diff --git a/lisa/trace.py b/lisa/trace.py
index d5bc53c64c10923da5cfbb67ce81aaad98aae74e..63ad022da7216f688a79c9908e6996247e3e7a94 100644
--- a/lisa/trace.py
+++ b/lisa/trace.py
@@ -28,6 +28,7 @@ import copy
 import io
 import os
 import os.path
+import stat
 import json
 import inspect
 import shlex
@@ -46,6 +47,8 @@ import functools
 import fnmatch
 import typing
 from difflib import get_close_matches
+import urllib.request
+import urllib.parse
 
 import numpy as np
 import pandas as pd
@@ -55,9 +58,9 @@ import pyarrow.parquet
 
 import devlib
 
-from lisa.utils import Loggable, HideExekallID, memoized, lru_memoized, deduplicate, take, deprecate, nullcontext, measure_time, checksum, newtype, groupby, PartialInit, kwargs_forwarded_to, kwargs_dispatcher, ComposedContextManager, get_nested_key, bothmethod
+from lisa.utils import Loggable, HideExekallID, memoized, lru_memoized, deduplicate, take, deprecate, nullcontext, measure_time, checksum, newtype, groupby, PartialInit, kwargs_forwarded_to, kwargs_dispatcher, ComposedContextManager, get_nested_key, bothmethod, DirCache
 from lisa.conf import SimpleMultiSrcConf, LevelKeyDesc, KeyDesc, TopLevelKeyDesc, Configurable
-from lisa.datautils import SignalDesc, df_add_delta, df_deduplicate, df_window, df_window_signals, series_convert
+from lisa.datautils import SignalDesc, df_add_delta, df_deduplicate, df_window, df_window_signals, series_convert, df_update_duplicates
 from lisa.version import VERSION_TOKEN
 from lisa._typeclass import FromString
 from lisa._kmod import LISAFtraceDynamicKmod
@@ -406,6 +409,162 @@ class MockTraceParser(TraceParserBase):
             super().get_metadata(key=key)
 
 
+class PerfettoTraceParser(TraceParserBase):
+    """
+    Bridge to parsing Perfetto traces.
+
+    :param path: Path to the Perfetto trace.
+    :type path: str
+
+    :param events: Events to parse eagerly. This is currently unused.
+    :type events: collections.abc.Iterable(str)
+
+    :param trace_processor_path: Path or URL to Perfetto's ``trace_processor``
+        binary. If a URL is passed, it will be downloaded and cached
+        automatically.
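+        The download is cached on disk using :class:`lisa.utils.DirCache`, so
+        it only happens the first time a given URL is used.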
+    :type trace_processor_path: str
+
+    :Variable keyword arguments: Forwarded to :class:`TraceParserBase`
+
+    """
+    @kwargs_forwarded_to(TraceParserBase.__init__)
+    def __init__(self, path=None, events=None, trace_processor_path='https://get.perfetto.dev/trace_processor', **kwargs):
+
+        if urllib.parse.urlparse(trace_processor_path).scheme:
+            bin_path = self._download_trace_processor(url=trace_processor_path)
+        else:
+            bin_path = trace_processor_path
+
+        self._bin_path = bin_path
+        tp = self._make_tp(path)
+
+        time_range, = tp.query("SELECT MIN(ts), MAX(ts) FROM raw")
+        time_range = (
+            time_range.__dict__['MIN(ts)'] / 1e9,
+            time_range.__dict__['MAX(ts)'] / 1e9,
+        )
+        available_events = set(
+            row.name
+            for row in tp.query("SELECT DISTINCT name FROM raw")
+        )
+
+        self._metadata = {
+            'available-events': available_events,
+            'time-range': time_range,
+        }
+
+        self._tp = tp
+        super().__init__(events=events, **kwargs)
+
+    @classmethod
+    def _download_trace_processor(cls, url):
+        def populate(key, path):
+            url, = key
+            dst = path / 'trace_processor'
+            cls.get_logger().info(f"Downloading Perfetto's trace_processor to {dst}")
+            urllib.request.urlretrieve(url, dst)
+            os.chmod(dst, dst.stat().st_mode | stat.S_IEXEC)
+
+        dir_cache = DirCache(
+            category='perfetto_trace_processor',
+            populate=populate,
+        )
+
+        cache_path = dir_cache.get_entry([url])
+        return cache_path / 'trace_processor'
+
+    def _make_tp(self, path):
+        from perfetto.trace_processor import TraceProcessor, TraceProcessorConfig
+
+        config = TraceProcessorConfig(
+            # Without this, perfetto will disallow querying for most events in
+            # the raw table
+            ingest_ftrace_in_raw=True,
+            bin_path=self._bin_path,
+        )
+        tp = TraceProcessor(trace=path, config=config)
+        return tp
+
+    def _query(self, query):
+        return self._tp.query(query)
+
+    def _df_event(self, event):
+        # List the fields of the event so we know their types and can query them
+        query = f"SELECT arg_set_id FROM raw WHERE name = '{event}' LIMIT 1"
+        arg_set_id, = map(attrgetter('arg_set_id'), self._query(query))
+
+        query = f"SELECT * FROM args WHERE arg_set_id = {arg_set_id}"
+        arg_fields = [
+            (field.key, field.value_type)
+            for field in self._query(query)
+        ]
+        arg_cols = [
+            f"(SELECT display_value FROM args WHERE key = '{key}' AND args.arg_set_id = raw.arg_set_id)"
+            for (key, typ) in arg_fields
+        ]
+        common_cols = [
+            'ts as Time',
+            'cpu as __cpu',
+            'utid as __pid',
+            '(SELECT name from thread where thread.utid = raw.utid) as __comm',
+        ]
+
+        extract = ', '.join(common_cols + arg_cols)
+        query = f"SELECT {extract} FROM raw WHERE name = '{event}'"
+        df = pd.DataFrame.from_records(
+            row.__dict__
+            for row in self._query(query)
+        )
+
+        dtype_map = {
+            'uint': 'uint64',
+            'int': 'int64',
+            'string': 'category',
+        }
+
+        df = df.rename(columns={
+            query: name
+            for (name, _), query in zip(arg_fields, arg_cols)
+        })
+
+        df = df.astype(
+            {
+                **{
+                    '__pid': 'uint32',
+                    '__cpu': 'uint32',
+                    '__comm': 'category',
+                    'Time': 'uint64',
+                },
+                **{
+                    field: dtype_map[typ]
+                    for (field, typ) in arg_fields
+                    if typ in dtype_map
+                },
+            }
+        )
+        df['Time'] /= 1e9
+
+        # Attempt to avoid getting duplicated index issues. We may still end up
+        # with the same timestamp for different events across 2 dataframes,
+        # which could create an issue if someone joins them. This can happen in
+        # practice, but there is not really any good solution here: fixing it
+        # would require adding a new fixed-up column to the DB.
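+        # df_update_duplicates() reworks duplicated 'Time' values so that the
+        # index set just below is unique within this dataframe.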
+        df_update_duplicates(df, col='Time', inplace=True)
+        df = df.set_index('Time')
+
+        return df
+
+    def parse_event(self, event):
+        try:
+            return self._df_event(event)
+        except Exception as e:
+            raise MissingTraceEventError(
+                [event],
+                available_events=self._metadata.get('available-events'),
+            ) from e
+
+    def get_metadata(self, key):
+        try:
+            return self._metadata[key]
+        except KeyError:
+            return super().get_metadata(key=key)
+
+
 class EventParserBase:
     """
     Base class for trace event parser.
@@ -3660,6 +3819,8 @@ class Trace(Loggable, TraceBase):
             parser = SysTraceParser.from_html
         elif extension == '.txt':
            parser = HRTxtTraceParser.from_txt_file
+        elif extension == '.perfetto-trace':
+            parser = PerfettoTraceParser
         else:
             parser = TxtTraceParser.from_dat
         self._parser = parser
diff --git a/setup.py b/setup.py
index f747205ab4fb2aa492be884d14b0443baa834bec..8a4ee00126cac5a8bf566f7ebc5f90e76f381a3e 100755
--- a/setup.py
+++ b/setup.py
@@ -145,6 +145,9 @@ if __name__ == "__main__":
             # For pandas.to_parquet() dataframe storage
             "pyarrow",
 
+            # For lisa.trace.PerfettoTraceParser
+            "perfetto",
+
             "ipython",
             "ipywidgets",