From 0f406d31da3230a67b69f6ab6998dcca042bfd88 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Thu, 13 Jun 2024 16:18:09 +0100 Subject: [PATCH 1/6] doc/trace_analysis.rst: General update --- doc/trace_analysis.rst | 17 +++++++++++++---- lisa/trace.py | 7 ++++++- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/doc/trace_analysis.rst b/doc/trace_analysis.rst index c7efc3fb4..0773643ac 100644 --- a/doc/trace_analysis.rst +++ b/doc/trace_analysis.rst @@ -16,10 +16,10 @@ Trace ===== Our :class:`~lisa.trace.Trace` takes an Ftrace ``trace.dat`` file as input -(Systrace ``trace.html`` are also accepted, but mileage may vary since it's an -intrinsically ambiguous format), and provides access to both the raw trace -events, as well as some new :class:`pandas.DataFrame` built from analysing and -aggregating trace events. +(other text-based formats are also accepted, but mileage may vary since they +are ambiguous), and provides access to both the raw trace events, as well as +some dataframes (:class:`polars.LazyFrame` and :class:`pandas.DataFrame`) built +from analysing and aggregating trace events. You can create one like so:: @@ -34,6 +34,14 @@ Whereas analysis dataframes can be obtained like that:: # trace.ana.. trace.ana.tasks.df_tasks_states() +Switching to :mod:`polars` can be done with:: + + trace = Trace("path/to/trace.dat") + trace = trace.get_view(df_fmt='polars-lazyframe') + +Then all the dataframe APIs will return :class:`polars.LazyFrame` instances +instead of :class:`pandas.DataFrame`. + .. seealso:: See the :class:`~lisa.trace.Trace` documentation for more details. Available analysis @@ -68,6 +76,7 @@ Trace .. autoclass:: lisa.trace.Trace :members: + :inherited-members: .. automodule:: lisa.trace :members: diff --git a/lisa/trace.py b/lisa/trace.py index e5df10592..48e982083 100644 --- a/lisa/trace.py +++ b/lisa/trace.py @@ -5932,7 +5932,12 @@ class Trace(TraceBase): * ``ana``: The analysis proxy used as an entry point to run analysis methods on the trace. See :class:`lisa.analysis._proxy.AnalysisProxy`. - :Supporting more events: + :Supporting more events in text parsers: + + .. note:: ``trace.dat`` parser can now fully infer the dataframe schema + from the binary trace.dat and does not require (nor allow) any + manual setting. + Subclasses of :class:`TraceParserBase` can usually auto-detect the event format, but there may be a number of reasons to pass a custom event parser: -- GitLab From fb6b4c1b4d206ea6d14121c409cd89aba8232166 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Thu, 13 Jun 2024 15:25:11 +0100 Subject: [PATCH 2/6] lisa.trace: Make Trace a context manager FEATURE lisa.trace.Trace objects have to manage some cache-related resources, which can require orderly cleanup in some situations (e.g. if the caller intends on removing the trace.dat folder after it is done with the Trace). Relying on __del__ for that does not work as cyclic references to the Trace instance make __del__ execute at an unknown point in the future. Fix that by providing a context manager API to Trace, which can be used in this sort of situation. The cleanup code will still attempt to run on __del__ if it has not run yet for backward compatibility and simpler situations. --- lisa/trace.py | 117 +++++++++++++++++++++++++++++++------------- tests/test_trace.py | 9 ++++ 2 files changed, 92 insertions(+), 34 deletions(-) diff --git a/lisa/trace.py b/lisa/trace.py index 48e982083..4e3808da6 100644 --- a/lisa/trace.py +++ b/lisa/trace.py @@ -2819,6 +2819,14 @@ class _InternalTraceBase(abc.ABC): def df_all_events(self, *args, **kwargs): return self.ana.notebook.df_all_events(*args, **kwargs) + @abc.abstractmethod + def __enter__(self): + return self + + @abc.abstractmethod + def __exit__(self, *args): + pass + # User-facing class TraceBase(_InternalTraceBase): @@ -3082,6 +3090,13 @@ class _TraceViewBase(_InternalTraceBase): self.base_trace = trace super().__init__() + def __enter__(self): + self.base_trace.__enter__() + return self + + def __exit__(self, *args): + return self.base_trace.__exit__(*args) + def __getattr__(self, name): return delegate_getattr(self, 'base_trace', name) @@ -4026,6 +4041,14 @@ class _TraceCache(Loggable): # Limit to one worker, as we will likely take the self._lock anyway self._thread_executor = ThreadPoolExecutor(max_workers=1) + self.__deallocator_callbacks = [ + # Ensure we block until all workers are finished. Otherwise, e.g. + # removing the swap area might fail because an worker is still creating + # the metadata file in there. + lambda: self._thread_executor.shutdown() + ] + + @property def swap_dir(self): if (swap_dir := self._swap_dir) is None: @@ -4037,43 +4060,33 @@ class _TraceCache(Loggable): @memoized def _hardlinks_base(self): path = Path(self.swap_dir) / 'hardlinks' / str(self._unique_id) - return path.resolve() - - @property - @memoized - def _hardlinks_base_deallocator(self): - path = self._hardlinks_base - f = functools.partial(_file_cleanup, paths=[path]) - return _Deallocator( - f=f, - on_del=False, - at_exit=True, - ) + path = path.resolve() + def cleanup(): + # Only try with rmdir first, so that we don't sabbotage existing + # LazyFrame that might still be alive. + try: + os.rmdir(path) + except Exception: + pass + with self._lock: + self.__deallocator_callbacks.append(cleanup) + return path def _hardlink_path(self, base, name): path = self._hardlinks_base / base path.mkdir(parents=True, exist_ok=True) - # Make sure the deallocator has been created - self._hardlinks_base_deallocator return ( path, path / name, ) - def __del__(self): - # Only try with rmdir first, so that we don't sabbotage existing - # LazyFrame that might still be alive. - try: - path = self._hardlinks_base - os.rmdir(path) - except Exception: - pass + def __enter__(self): + return self - # Ensure we block until all workers are finished. Otherwise, e.g. - # removing the swap area might fail because an worker is still creating - # the metadata file in there. - self._thread_executor.shutdown() + def __exit__(self, *args): + for cb in self.__deallocator_callbacks: + cb() def _parser_temp_path(self): try: @@ -4925,17 +4938,19 @@ class _Trace(Loggable, _InternalTraceBase): super().__init__() self._lock = threading.RLock() + stack = contextlib.ExitStack() + self._cm_stack = stack + self.__deallocator = _Deallocator( + f=lambda: self._cm_stack.__exit__(None, None, None), + on_del=True, + at_exit=True, + ) + # Make sure that we always operate with an active StringCache when # manipulating a trace object. This prevents issues with LazyFrame # built out of a DataFrame containing Categorical data, in places where # the user does not control the creation of the DataFrame. - str_cache = pl.StringCache() - str_cache.__enter__() - self.__string_cache_deallocator = _Deallocator( - f=lambda: str_cache.__exit__(None, None, None), - on_del=True, - at_exit=True, - ) + stack.enter_context(pl.StringCache()) trace_path = str(trace_path) if trace_path else None @@ -5000,7 +5015,7 @@ class _Trace(Loggable, _InternalTraceBase): # over when querying the trace-id. self._cache = _TraceCache() trace_id = self._get_trace_id() - self._cache = _TraceCache.from_swap_dir( + cache = _TraceCache.from_swap_dir( trace_path=trace_path, swap_dir=swap_dir, max_swap_size=max_swap_size, @@ -5008,6 +5023,9 @@ class _Trace(Loggable, _InternalTraceBase): trace_id=trace_id, metadata=self._cache._metadata, ) + stack.enter_context(cache) + self._cache = cache + # Initial scrub of the swap to discard unwanted data, honoring the # max_swap_size right from the beginning self._cache.scrub_swap() @@ -5029,6 +5047,12 @@ class _Trace(Loggable, _InternalTraceBase): # the Trace is almost fully initialized self.plat_info = plat_info.add_trace_src(self) + def __enter__(self): + return self + + def __exit__(self, *args): + self.__deallocator.run() + def _preload_metadata_cache(self): def fail(): raise ValueError('Fake metadata value') @@ -5987,6 +6011,13 @@ class Trace(TraceBase): self.__view = view self._df_fmt = df_fmt or 'pandas' + def __enter__(self): + self.__view.__enter__() + return self + + def __exit__(self, *args): + return self.__view.__exit__(*args) + def __init__(self, *args, df_fmt=None, **kwargs): view = self._view_from_user_kwargs(*args, **kwargs) self._init(view, df_fmt=df_fmt) @@ -6167,9 +6198,17 @@ class Trace(TraceBase): def __getattribute__(self, attr): raise RuntimeError('The trace instance can only be used after the end of the "with" statement.') + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + class _TraceProxy(TraceBase): def __init__(self, path): self.__base_trace = _TraceNotSet() + self.__path = path if path is not None: # Delete the file once we are done accessing it @@ -6182,6 +6221,16 @@ class Trace(TraceBase): def __getattr__(self, attr): return delegate_getattr(self, '_TraceProxy__base_trace', attr) + def __enter__(self): + self.__base_trace.__enter__() + return self + + def __exit__(self, *args): + try: + return self.__base_trace.__exit__(*args) + finally: + self.__deallocator.run() + @property def ana(self): return self.__base_trace.ana diff --git a/tests/test_trace.py b/tests/test_trace.py index d2adbb692..887c94eab 100644 --- a/tests/test_trace.py +++ b/tests/test_trace.py @@ -90,6 +90,11 @@ class TraceTestCase(StorageTestCase): path = os.path.join(trace_dir, 'plat_info.yml') return PlatformInfo.from_yaml_map(path) + def test_context_manager(self): + trace = self.get_trace('doc') + with trace: + trace.df_event('sched_switch') + class TestTrace(TraceTestCase): """Smoke tests for LISA's Trace class""" @@ -378,6 +383,10 @@ class TestTrace(TraceTestCase): class TestTraceView(TraceTestCase): + def get_trace(self, *args, **kwargs): + trace = super().get_trace(*args, **kwargs) + return trace.get_view() + def test_lower_slice(self): view = self.trace[81:] assert len(view.ana.status.df_overutilized()) == 2 -- GitLab From 795a3bae510a5e22c5781ee62a149581f278ec54 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Thu, 13 Jun 2024 15:48:49 +0100 Subject: [PATCH 3/6] tests: Factor out some trace to apply to TraceView as well Move some tests up the hierarchy so they are executed on both a normal Trace and the output of Trace.get_view() as well. --- tests/test_trace.py | 122 +++++++++++++++++++++++--------------------- 1 file changed, 65 insertions(+), 57 deletions(-) diff --git a/tests/test_trace.py b/tests/test_trace.py index 887c94eab..a016a4e1e 100644 --- a/tests/test_trace.py +++ b/tests/test_trace.py @@ -50,36 +50,45 @@ class TraceTestCase(StorageTestCase): super().__init__(*args, **kwargs) self.plat_info = self._get_plat_info() + def _wrap_trace(self, trace): + return trace + @property def trace(self): - return Trace( - os.path.join(self.traces_dir, 'trace.txt'), - plat_info=self.plat_info, - events=self.events, - normalize_time=False, - parser=TxtTraceParser.from_txt_file, + return self._wrap_trace( + Trace( + os.path.join(self.traces_dir, 'trace.txt'), + plat_info=self.plat_info, + events=self.events, + normalize_time=False, + parser=TxtTraceParser.from_txt_file, + ) ) def make_trace(self, in_data, plat_info=None, events=None): """ Get a trace from an embedded string of textual trace data """ - return Trace( - None, - plat_info=self.plat_info if plat_info is None else plat_info, - events=self.events if events is None else events, - normalize_time=False, - parser=TxtTraceParser.from_string(in_data), + return self._wrap_trace( + Trace( + None, + plat_info=self.plat_info if plat_info is None else plat_info, + events=self.events if events is None else events, + normalize_time=False, + parser=TxtTraceParser.from_string(in_data), + ) ) def get_trace(self, trace_name): """ Get a trace from a separate provided trace file """ - return Trace( - Path(self.traces_dir, trace_name, 'trace.dat'), - plat_info=self._get_plat_info(trace_name), - events=self.events, + return self._wrap_trace( + Trace( + Path(self.traces_dir, trace_name, 'trace.dat'), + plat_info=self._get_plat_info(trace_name), + events=self.events, + ) ) def _get_plat_info(self, trace_name=None): @@ -95,9 +104,41 @@ class TraceTestCase(StorageTestCase): with trace: trace.df_event('sched_switch') + def test_meta_event(self): + trace = self.get_trace('doc') + df = trace.df_event('userspace@rtapp_stats') + assert 'userspace@rtapp_stats' in trace.available_events + assert len(df) == 465 + + def test_meta_event_available(self): + trace = self.get_trace('doc') + assert 'userspace@rtapp_stats' in trace.available_events + + def _test_tasks_dfs(self, trace_name): + """Helper for smoke testing _dfg methods in tasks_analysis""" + trace = self.get_trace(trace_name) + + lt_df = trace.ana.load_tracking.df_tasks_signal('util') + columns = ['comm', 'pid', 'util', 'cpu'] + for column in columns: + msg = 'Task signals parsed from {} missing {} column'.format( + trace.trace_path, column) + assert column in lt_df, msg + + # Pick an arbitrary PID to try plotting signals for. + pid = lt_df['pid'].unique()[0] + # Call plot - although we won't check the results we can just check + # that things aren't totally borken. + trace.ana.load_tracking.plot_task_signals(pid) + + def test_sched_load_signals(self): + """Test parsing sched_load_se events from EAS upstream integration""" + self._test_tasks_dfs('sched_load') + + def test_sched_load_avg_signals(self): + """Test parsing sched_load_avg_task events from EAS1.2""" + self._test_tasks_dfs('sched_load_avg') -class TestTrace(TraceTestCase): - """Smoke tests for LISA's Trace class""" def test_get_task_id(self): for name, pid in [ @@ -306,31 +347,6 @@ class TestTrace(TraceTestCase): assert df.index.tolist() == [519.022643] assert df.cpu.tolist() == [2] - def _test_tasks_dfs(self, trace_name): - """Helper for smoke testing _dfg methods in tasks_analysis""" - trace = self.get_trace(trace_name) - - lt_df = trace.ana.load_tracking.df_tasks_signal('util') - columns = ['comm', 'pid', 'util', 'cpu'] - for column in columns: - msg = 'Task signals parsed from {} missing {} column'.format( - trace.trace_path, column) - assert column in lt_df, msg - - # Pick an arbitrary PID to try plotting signals for. - pid = lt_df['pid'].unique()[0] - # Call plot - although we won't check the results we can just check - # that things aren't totally borken. - trace.ana.load_tracking.plot_task_signals(pid) - - def test_sched_load_signals(self): - """Test parsing sched_load_se events from EAS upstream integration""" - self._test_tasks_dfs('sched_load') - - def test_sched_load_avg_signals(self): - """Test parsing sched_load_avg_task events from EAS1.2""" - self._test_tasks_dfs('sched_load_avg') - def df_peripheral_clock_effective_rate(self): """ TestTrace: getPeripheralClockInfo() returns proper effective rate info. @@ -370,21 +386,14 @@ class TestTrace(TraceTestCase): # Proxy check for detecting delta computation changes assert df.delta.sum() == pytest.approx(134.568219) - def test_meta_event(self): - trace = self.get_trace('doc') - df = trace.df_event('userspace@rtapp_stats') - assert 'userspace@rtapp_stats' in trace.available_events - assert len(df) == 465 - def test_meta_event_available(self): - trace = self.get_trace('doc') - assert 'userspace@rtapp_stats' in trace.available_events +class TestTrace(TraceTestCase): + """Smoke tests for LISA's Trace class""" + pass class TestTraceView(TraceTestCase): - - def get_trace(self, *args, **kwargs): - trace = super().get_trace(*args, **kwargs) + def _wrap_trace(self, trace): return trace.get_view() def test_lower_slice(self): @@ -414,9 +423,8 @@ class TestTraceView(TraceTestCase): class TestNestedTraceView(TestTraceView): - @property - def trace(self): - trace = super().trace + def _wrap_trace(self, trace): + trace = super()._wrap_trace(trace) return trace[trace.start:trace.end] -- GitLab From 8b78fe0fa97e254fba95e6ca27a8f0ecad0d65dd Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Thu, 13 Jun 2024 16:23:39 +0100 Subject: [PATCH 4/6] lisa.trace: Factor out _TraceProxy --- lisa/trace.py | 114 ++++++++++++++++++++++---------------------- tests/test_trace.py | 10 +++- 2 files changed, 67 insertions(+), 57 deletions(-) diff --git a/lisa/trace.py b/lisa/trace.py index 4e3808da6..de3bc42da 100644 --- a/lisa/trace.py +++ b/lisa/trace.py @@ -4877,6 +4877,63 @@ class _TraceCache(Loggable): } +class _TraceProxy(TraceBase): + class _TraceNotSet: + def __getattribute__(self, attr): + raise RuntimeError('The trace instance can only be used after the end of the "with" statement.') + + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + def __init__(self, path): + self.__base_trace = self._TraceNotSet() + self.__path = path + + if path is not None: + # Delete the file once we are done accessing it + self.__deallocator = _Deallocator( + f=functools.partial(_file_cleanup, paths=[path]), + on_del=True, + at_exit=True, + ) + + def __getattr__(self, attr): + return delegate_getattr(self, '_TraceProxy__base_trace', attr) + + def _set_trace(self, trace): + self.__base_trace = trace + + def __enter__(self): + self.__base_trace.__enter__() + return self + + def __exit__(self, *args): + try: + return self.__base_trace.__exit__(*args) + finally: + self.__deallocator.run() + + @property + def ana(self): + return self.__base_trace.ana + + @property + def analysis(self): + return self.__base_trace.analysis + + def df_event(self, *args, **kwargs): + return self.__base_trace.df_event(*args, **kwargs) + + def _internal_df_event(self, *args, **kwargs): + return self.__base_trace._internal_df_event(*args, **kwargs) + + def _preload_events(self, *args, **kwargs): + return self.__base_trace._preload_events(*args, **kwargs) + + class _Trace(Loggable, _InternalTraceBase): def _select_userspace(self, source_event, meta_event, df): # pylint: disable=unused-argument,no-self-use @@ -6194,60 +6251,6 @@ class Trace(TraceBase): plat_info = target.plat_info needs_temp = filepath is None - class _TraceNotSet: - def __getattribute__(self, attr): - raise RuntimeError('The trace instance can only be used after the end of the "with" statement.') - - def __enter__(self): - return self - - def __exit__(self, *args): - pass - - - class _TraceProxy(TraceBase): - def __init__(self, path): - self.__base_trace = _TraceNotSet() - self.__path = path - - if path is not None: - # Delete the file once we are done accessing it - self.__deallocator = _Deallocator( - f=functools.partial(_file_cleanup, paths=[path]), - on_del=True, - at_exit=True, - ) - - def __getattr__(self, attr): - return delegate_getattr(self, '_TraceProxy__base_trace', attr) - - def __enter__(self): - self.__base_trace.__enter__() - return self - - def __exit__(self, *args): - try: - return self.__base_trace.__exit__(*args) - finally: - self.__deallocator.run() - - @property - def ana(self): - return self.__base_trace.ana - - @property - def analysis(self): - return self.__base_trace.analysis - - def df_event(self, *args, **kwargs): - return self.__base_trace.df_event(*args, **kwargs) - - def _internal_df_event(self, *args, **kwargs): - return self.__base_trace._internal_df_event(*args, **kwargs) - - def _preload_events(self, *args, **kwargs): - return self.__base_trace._preload_events(*args, **kwargs) - if needs_temp: @contextlib.contextmanager def cm_func(): @@ -6272,8 +6275,7 @@ class Trace(TraceBase): **kwargs ) - # pylint: disable=attribute-defined-outside-init - proxy._TraceProxy__base_trace = trace + proxy._set_trace(trace) @classmethod def get_event_sources(cls, *args, **kwargs): diff --git a/tests/test_trace.py b/tests/test_trace.py index a016a4e1e..b287e1ac8 100644 --- a/tests/test_trace.py +++ b/tests/test_trace.py @@ -28,7 +28,7 @@ import pandas as pd from devlib.target import KernelVersion -from lisa.trace import Trace, TxtTraceParser, MockTraceParser +from lisa.trace import Trace, TxtTraceParser, MockTraceParser, _TraceProxy from lisa.analysis.tasks import TaskID from lisa.datautils import df_squash from lisa.platforms.platinfo import PlatformInfo @@ -392,6 +392,14 @@ class TestTrace(TraceTestCase): pass +class TestTraceProxy(TraceTestCase): + """Smoke tests for LISA's Trace class""" + def _wrap_trace(self, trace): + proxy = _TraceProxy(None) + proxy._set_trace(trace) + return proxy + + class TestTraceView(TraceTestCase): def _wrap_trace(self, trace): return trace.get_view() -- GitLab From bdd8e46598c57cb84ecc1522ad3949d5e0554430 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Thu, 13 Jun 2024 16:26:21 +0100 Subject: [PATCH 5/6] lisa.trace: Ensure _TraceProxy has a __deallocator attribute FIX --- lisa/trace.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lisa/trace.py b/lisa/trace.py index de3bc42da..91aeacd32 100644 --- a/lisa/trace.py +++ b/lisa/trace.py @@ -94,6 +94,11 @@ def _dealloc_all(): atexit.register(_dealloc_all) def _file_cleanup(paths): + paths = [ + path + for path in paths + if path is not None + ] for path in paths: try: shutil.rmtree(path) @@ -4891,14 +4896,12 @@ class _TraceProxy(TraceBase): def __init__(self, path): self.__base_trace = self._TraceNotSet() self.__path = path - - if path is not None: + self.__deallocator = _Deallocator( # Delete the file once we are done accessing it - self.__deallocator = _Deallocator( - f=functools.partial(_file_cleanup, paths=[path]), - on_del=True, - at_exit=True, - ) + f=functools.partial(_file_cleanup, paths=[path]), + on_del=True, + at_exit=True, + ) def __getattr__(self, attr): return delegate_getattr(self, '_TraceProxy__base_trace', attr) -- GitLab From bc486b14f23205bd9b5cb48aa1aef1dd35d4863b Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Thu, 13 Jun 2024 11:37:44 +0100 Subject: [PATCH 6/6] lisa.trace: Make trace format part of trace-id Add the format of the trace to trace-id if it is available. --- lisa/trace.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lisa/trace.py b/lisa/trace.py index 91aeacd32..91956daf6 100644 --- a/lisa/trace.py +++ b/lisa/trace.py @@ -851,6 +851,11 @@ class TraceDumpTraceParser(TraceParserBase): @classmethod def _process_metadata(cls, meta): + try: + meta['trace-id'] = f'trace.dat-{meta["trace-id"]}' + except KeyError: + pass + try: start, end = meta['time-range'] except KeyError: -- GitLab