diff --git a/lisa/trace.py b/lisa/trace.py index 82ddcc239cac9cf6a24abbd04763ff17b192a238..fe8deec5090197d58e71da0596946eaba82a73cd 100644 --- a/lisa/trace.py +++ b/lisa/trace.py @@ -62,7 +62,7 @@ import polars as pl import devlib -from lisa.utils import Loggable, HideExekallID, memoized, lru_memoized, deduplicate, take, deprecate, nullcontext, measure_time, checksum, newtype, groupby, PartialInit, kwargs_forwarded_to, kwargs_dispatcher, ComposedContextManager, get_nested_key, unzip_into, order_as, delegate_getattr +from lisa.utils import Loggable, HideExekallID, memoized, lru_memoized, deduplicate, take, deprecate, nullcontext, measure_time, checksum, newtype, groupby, PartialInit, kwargs_forwarded_to, kwargs_dispatcher, ComposedContextManager, get_nested_key, unzip_into, order_as, delegate_getattr, DirCache from lisa.conf import SimpleMultiSrcConf, LevelKeyDesc, KeyDesc, TopLevelKeyDesc, Configurable from lisa.datautils import SignalDesc, df_add_delta, df_deduplicate, df_window, df_window_signals, series_convert, df_update_duplicates, _polars_duration_expr, _df_to, _polars_df_in_memory, Timestamp, _pandas_cleanup_df from lisa.version import VERSION_TOKEN @@ -5032,6 +5032,31 @@ class _Trace(Loggable, _InternalTraceBase): stack.enter_context(pl.StringCache()) trace_path = str(trace_path) if trace_path else None + self.trace_path = trace_path + + if parser is None: + if not trace_path: + raise ValueError('A trace path must be provided') + + url = urlparse(trace_path) + scheme = url.scheme + if scheme == 'lisatrace': + parser = ClientTraceParser.from_trace_url(trace_path) + elif scheme in ('file', ''): + _, extension = os.path.splitext(url.path) + + if extension == '.html': + parser = SysTraceParser.from_html + elif extension == '.txt': + parser = HRTxtTraceParser.from_txt_file + else: + parser = TraceDumpTraceParser.from_dat + self._parser = parser + + # No-op cache so that the cacheable metadata machinery does not fall + # over when querying the trace-id. + self._cache = _TraceCache() + trace_id = self._get_trace_id() if enable_swap: if trace_path and os.path.exists(trace_path): @@ -5044,7 +5069,10 @@ class _Trace(Loggable, _InternalTraceBase): try: os.makedirs(swap_dir, exist_ok=True) except OSError: - swap_dir = None + dir_cache = DirCache( + category='trace_swap', + ) + swap_dir = str(dir_cache.get_entry(trace_id)) if max_swap_size is None: trace_size = os.stat(trace_path).st_size @@ -5056,7 +5084,6 @@ class _Trace(Loggable, _InternalTraceBase): swap_dir = None max_swap_size = None - self.trace_path = trace_path self._parseable_events = {} if parser is None: @@ -5090,10 +5117,6 @@ class _Trace(Loggable, _InternalTraceBase): plots_dir = os.path.dirname(trace_path) self.plots_dir = plots_dir - # No-op cache so that the cacheable metadata machinery does not fall - # over when querying the trace-id. - self._cache = _TraceCache() - trace_id = self._get_trace_id() cache = _TraceCache.from_swap_dir( trace_path=trace_path, swap_dir=swap_dir, diff --git a/lisa/utils.py b/lisa/utils.py index f74b3fdf5f7819993d3a085708b79730e1372de4..c1f10c6e47931e63aac009d8a3513f23420562a5 100644 --- a/lisa/utils.py +++ b/lisa/utils.py @@ -3761,9 +3761,9 @@ class DirCache(Loggable): The cache is managed in a process-safe way, so that there can be no race between concurrent processes or threads. """ - def __init__(self, category, populate): + def __init__(self, category, populate=None): self._base = Path(LISA_CACHE_HOME, category) - self._populate = populate + self._populate = populate or (lambda *args, **kwargs: None) self._category = category def get_key_token(self, key): @@ -3860,13 +3860,24 @@ class DirCache(Loggable): logger.debug(f'Populating {self._category} cache at: {path}') base.mkdir(parents=True, exist_ok=True) + @contextlib.contextmanager + def temp_dir(base): + with tempfile.TemporaryDirectory(dir=base, delete=False) as path: + delete = True + def enable_cleanup(enable): + nonlocal delete + delete = enable + + try: + yield (path, enable_cleanup) + finally: + if delete: + shutil.rmtree(path) + # Create the cache entry under a temp name, so we can # atomically rename it and fix races with other # processes - with contextlib.ExitStack() as stack: - temp_path = stack.enter_context( - tempfile.TemporaryDirectory(dir=base) - ) + with temp_dir(base) as (temp_path, enable_cleanup): temp_path = Path(temp_path) populated_path = self._populate(key, temp_path) or temp_path @@ -3885,9 +3896,10 @@ class DirCache(Loggable): except OSError: log_found() else: - # Do not cleanup the temp_base, as it has been - # renamed and cleanup would fail. - stack.pop_all() + if temp_path == populated_path: + # Do not cleanup the temp_base, as it has been + # renamed and cleanup would fail. + enable_cleanup(False) else: log_found()