From 5f4321272994fb6480250c3a715b5eea13535482 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Mon, 25 Mar 2024 14:49:25 +0000 Subject: [PATCH] WIP lisa.trace: Add PerfettoTraceCollector FEATURE --- lisa/trace.py | 167 ++++++++++++++++++++++++++++++++++++++++++++++++++ setup.py | 3 + 2 files changed, 170 insertions(+) diff --git a/lisa/trace.py b/lisa/trace.py index 801d9c602..bb792ac7a 100644 --- a/lisa/trace.py +++ b/lisa/trace.py @@ -53,6 +53,9 @@ import warnings from concurrent.futures import ThreadPoolExecutor from urllib.parse import urlparse import types +import stat +import urllib.request +import urllib.parse import numpy as np import pandas as pd @@ -798,6 +801,168 @@ class MockTraceParser(TraceParserBase): super().get_metadata(key=key) +class PerfettoTraceParser(TraceParserBase): + """ + Bridge to parsing Perfetto traces. + + :param path: Path to the Perfetto trace. + :type path: str + + :param events: Events to parse eagerly. This is currently unused. + :param events: collections.abc.Iterable(str) + + :param trace_processor_path: Path or URL to Perfetto's ``trace_processor`` + binary. If a URL is passed, it will be downloaded and cached + automatically. + :type trace_processor_path: str + + :Variable keyword arguments: Forwarded to :class:`TraceParserBase` + + """ + @kwargs_forwarded_to(TraceParserBase.__init__) + def __init__(self, path=None, events=None, trace_processor_path='https://get.perfetto.dev/trace_processor', needed_metadata=None, **kwargs): + + if urllib.parse.urlparse(trace_processor_path).scheme: + bin_path = self._download_trace_processor(url=trace_processor_path) + else: + bin_path = trace_processor_path + + self._bin_path = bin_path + self._trace_path = path + + meta = {} + needed = needed_metadata or set() + + if 'available-events' in needed: + meta['available-events'] = set( + row.name + for row in self._query("SELECT DISTINCT name FROM raw") + ) + + if 'time-range' in needed: + time_range, = self._query("SELECT MIN(ts), MAX(ts) FROM raw") + meta['time-range'] = ( + time_range.__dict__['MIN(ts)'] / 1e9, + time_range.__dict__['MAX(ts)'] / 1e9, + ) + + self._metadata = meta + super().__init__(events=events, needed_metadata=needed_metadata, **kwargs) + + @property + @memoized + def _tp(self): + from perfetto.trace_processor import TraceProcessor, TraceProcessorConfig + + config = TraceProcessorConfig( + # Without that, perfetto will disallow querying for most events in + # the raw table + ingest_ftrace_in_raw=True, + bin_path=self._bin_path, + ) + tp = TraceProcessor(trace=self._trace_path, config=config) + return tp + + @classmethod + def _download_trace_processor(cls, url): + def populate(key, path): + url, = key + dst = path / 'trace_processor' + cls.get_logger().info(f"Downloading Perfetto's trace_processor at {dst}") + urllib.request.urlretrieve(url, dst) + os.chmod(dst, dst.stat().st_mode | stat.S_IEXEC) + + dir_cache = DirCache( + category='perfetto_trace_processor', + populate=populate, + ) + + cache_path = dir_cache.get_entry([url]) + return cache_path / 'trace_processor' + + def _query(self, query): + return self._tp.query(query) + + def _df_event(self, event): + # List the fields of the event so we know their types and can query them + query = f"SELECT arg_set_id FROM raw WHERE name = '{event}' LIMIT 1" + arg_set_id, = map(attrgetter('arg_set_id'), self._query(query)) + + query = f"SELECT * FROM args WHERE arg_set_id = {arg_set_id}" + arg_fields = [ + (field.key, field.value_type) + for field in self._query(query) + ] + + def pick_col(typ): + return { + 'uint': 'int_value', + 'int': 'int_value', + 'string': 'string_value', + }[typ] + + arg_cols = [ + f"(SELECT {pick_col(typ)} FROM args WHERE key = '{key}' AND args.arg_set_id = raw.arg_set_id)" + for (key, typ) in arg_fields + ] + common_cols = ['ts as Time', 'cpu as __cpu', 'utid as __pid', '(SELECT name from thread where thread.utid = raw.utid) as __comm'] + + def translate_type(typ): + return { + 'uint': pl.UInt64, + 'int': pl.Int64, + 'string': pl.Categorical(), + }[typ] + + schema = { + 'Time': pl.UInt64, + '__cpu': pl.UInt32, + '__pid': pl.UInt32, + '__comm': pl.Categorical, + **{ + query: translate_type(typ) + for (_, typ), query in zip(arg_fields, arg_cols) + } + } + + extract = ', '.join(common_cols + arg_cols) + query = f"SELECT {extract} FROM raw WHERE name = '{event}'" + + df = pl.LazyFrame( + ( + row.__dict__ + for row in self._query(query) + ), + orient='row', + schema=schema, + ) + df = df.rename({ + query: name + for (name, _), query in zip(arg_fields, arg_cols) + }) + df = df.select(order_as(df.columns, ['Time', '__cpu', '__pid', '__comm'])) + + # Collect the data to expose to the caller that everything sits in + # memory + df = df.collect() + return df + + def parse_event(self, event): + try: + return self._df_event(event) + except Exception as e: + raise MissingTraceEventError( + [event], + available_events=self._metadata.get('available-events'), + ) from e + + def get_metadata(self, key): + try: + return self._metadata[key] + except KeyError: + return super().get_metadata(key=key) + + class TraceDumpError(Exception): """ Exception containing errors forwarded from the trace-dump parser @@ -5264,6 +5429,8 @@ class _Trace(Loggable, _InternalTraceBase): parser = SysTraceParser.from_html elif extension == '.txt': parser = HRTxtTraceParser.from_txt_file + elif extension == '.perfetto-trace': + parser = PerfettoTraceParser else: parser = TraceDumpTraceParser.from_dat self._parser = parser diff --git a/setup.py b/setup.py index bc67560ce..5d60bd1e6 100755 --- a/setup.py +++ b/setup.py @@ -172,6 +172,9 @@ if __name__ == "__main__": "cffi", # unshare syscall "typeguard", + + # For lisa.trace.PerfettoTraceParser + "perfetto", ], extras_require=extras_require, -- GitLab