diff --git a/lisa/_git.py b/lisa/_git.py index 4fc0f8c84afda034eff72b392c8291c7de12a5f7..8ca29f75c1e629ae3f54e3b613a9851ebcb6897d 100644 --- a/lisa/_git.py +++ b/lisa/_git.py @@ -25,7 +25,7 @@ def git(repo, *args): Call git in the given repo with the given arguments """ - return subprocess.check_output(['git', '-C', repo, *args]).decode() + return subprocess.check_output(['git', '-C', repo, *args]).decode(errors='backslashreplace') def find_shortest_symref(repo_path, sha1): """ @@ -61,11 +61,15 @@ def get_sha1(repo, ref='HEAD'): """ return git(repo, 'rev-list', '-1', ref).strip() -def get_uncommited_patch(repo): +def get_uncommited_patch(repo, include_binary=False): """ Return the patch of non commited changes, both staged and not staged yet. + + :param include_binary: If ``True``, include the diff for binary files. + :type include_binary: bool """ - return git(repo, 'diff', 'HEAD') + text = ['--text'] if include_binary else [] + return git(repo, 'diff', *text, 'HEAD') def get_commit_message(repo, ref='HEAD', notes_ref='refs/notes/commits', format='%s'): """ diff --git a/lisa/analysis/base.py b/lisa/analysis/base.py index a45ec91255815f7eec65425d3be71af676b8eddc..7d805b3051b957cdc1fcee37eadfbf26d3b723eb 100644 --- a/lisa/analysis/base.py +++ b/lisa/analysis/base.py @@ -43,8 +43,8 @@ import panel as pn import panel.widgets -from lisa.utils import Loggable, deprecate, get_doc_url, get_short_doc, get_subclasses, guess_format, is_running_ipython, measure_time, memoized, update_wrapper_doc, _import_all_submodules -from lisa.trace import PandasDataDesc +from lisa.utils import Loggable, deprecate, get_doc_url, get_short_doc, get_subclasses, guess_format, is_running_ipython, measure_time, memoized, update_wrapper_doc, _import_all_submodules, optional_kwargs +from lisa.trace import _CacheDataDesc from lisa.notebook import _hv_fig_to_pane, _hv_link_dataframes, axis_cursor_delta, axis_link_dataframes, make_figure from lisa._generic import TypedList @@ -1100,23 +1100,49 @@ class TraceAnalysisBase(AnalysisHelpers): self.trace = trace self.ana = proxy or trace.ana + @optional_kwargs @classmethod - def cache(cls, f): + def cache(cls, f, fmt='parquet', ignored_params=None): """ Decorator to enable caching of the output of dataframe getter function in the trace cache. - This will write the dataframe to the swap as well, so processing can be + This will write the return data to the swap as well, so processing can be skipped completely when possible. + + :param fmt: Format of the data to write to the cache. This will + influence the extension of the cache file created. If ``disk-only`` + format is chosen, the data is not retained in memory and the path + to the allocated cache file is passed as first parameter to the + wrapped function. This allows manual management of the file's + content, as well having a path to a file to pass to external tools + if they can consume the data directly. + :type fmt: str + + :param ignored_params: Parameters to ignore when trying to hit the + cache. 
+ :type ignored_params: list(str) """ sig = inspect.signature(f) + parameter_names = list(sig.parameters.keys()) ignored_kwargs = { # self - list(sig.parameters.keys())[0], + parameter_names[0], + *(ignored_params or []), } + memory_cache = fmt != 'disk-only' + + if not memory_cache: + path_param = parameter_names[1] + ignored_kwargs.add(path_param) + @functools.wraps(f) def wrapper(self, *args, **kwargs): + # Make some room for the argument we will fill later + if not memory_cache: + args = (None,) + args + # Express the arguments as kwargs-only params = sig.bind(self, *args, **kwargs) params.apply_defaults() @@ -1129,7 +1155,7 @@ class TraceAnalysisBase(AnalysisHelpers): # Include the trace window in the spec since that influences # what the analysis was seeing trace_state=trace.trace_state, - # Make a deepcopy as it is critical that the PandasDataDesc is + # Make a deepcopy as it is critical that the _CacheDataDesc is # not modified under the hood once inserted in the cache kwargs=copy.deepcopy({ k: v @@ -1137,19 +1163,39 @@ class TraceAnalysisBase(AnalysisHelpers): if k not in ignored_kwargs }), ) - pd_desc = PandasDataDesc(spec=spec) - + cache_desc = _CacheDataDesc(spec=spec, fmt=fmt) cache = trace._cache - write_swap = trace._write_swap - try: - df = cache.fetch(pd_desc) - except KeyError: + + def call_f(): + if not memory_cache: + try: + swap_path = cache._cache_desc_swap_path(cache_desc, create=True) + except Exception as e: + swap_path = None + kwargs[path_param] = swap_path + with measure_time() as measure: - df = f(**kwargs) - compute_cost = measure.exclusive_delta - cache.insert(pd_desc, df, compute_cost=compute_cost, write_swap=write_swap) + data = f(**kwargs) + + if memory_cache: + write_swap = trace._write_swap + compute_cost = measure.exclusive_delta + else: + write_swap = True + compute_cost = None + + cache.insert(cache_desc, data, compute_cost=compute_cost, write_swap=write_swap) + return data + + if memory_cache: + try: + data = cache.fetch(cache_desc) + except KeyError: + data = call_f() + else: + data = call_f() - return df + return data return wrapper diff --git a/lisa/analysis/tasks.py b/lisa/analysis/tasks.py index 11b3ddc07695f3d49dd73149687166e777578fd7..bb314a14b45f3ac50718ad02f089baf1db03103f 100644 --- a/lisa/analysis/tasks.py +++ b/lisa/analysis/tasks.py @@ -513,9 +513,13 @@ class TasksAnalysis(TraceAnalysisBase): df["state_str"] = stringify_task_state_series(df["state"]) """ def stringify_state(state): - try: - return TaskState(state).char - except ValueError: + # Same logic as in sched_switch format string + if state & 0xff: + try: + return TaskState(state).char + except ValueError: + return TaskState.sched_switch_str(state) + else: return TaskState.sched_switch_str(state) return series.apply(stringify_state) diff --git a/lisa/datautils.py b/lisa/datautils.py index a4011e256f90d6d17ede12781ab348477156d1c6..6c95b2c9debbd736b51dafd7c3103d72578388d1 100644 --- a/lisa/datautils.py +++ b/lisa/datautils.py @@ -1302,15 +1302,12 @@ def df_deduplicate(df, keep, consecutives, cols=None, all_col=True): @DataFrameAccessor.register_accessor -def df_update_duplicates(df, col=None, func=None, inplace=False): +def series_update_duplicates(series, func=None): """ - Update a given column to avoid duplicated values. + Update a given series to avoid duplicated values. - :param df: Dataframe to act on. - :type df: pandas.DataFrame - - :param col: Column to update. If ``None``, the index is used. - :type col: str or None + :param series: Series to act on. 
+ :type series: pandas.Series :param func: The function used to update the column. It must take a :class:`pandas.Series` of duplicated entries to update as parameters, @@ -1318,12 +1315,8 @@ def df_update_duplicates(df, col=None, func=None, inplace=False): long as there are remaining duplicates. If ``None``, the column is assumed to be floating point and duplicated values will be incremented by the smallest amount possible. - :type func: collections.abc.Callable - - :param inplace: If ``True``, the passed dataframe will be modified. - :type inplace: bool + :type func: collections.abc.Callable or None """ - def increment(series): return np.nextafter(series, math.inf) @@ -1332,9 +1325,6 @@ def df_update_duplicates(df, col=None, func=None, inplace=False): locs = series.duplicated(keep='first') return locs, series.loc[locs] - use_index = col is None - - series = df.index.to_series() if use_index else df[col].copy() func = func if func else increment # Update the values until there is no more duplication @@ -1346,6 +1336,31 @@ def df_update_duplicates(df, col=None, func=None, inplace=False): series.loc[duplicated_locs] = updated duplicated_locs, duplicated = get_duplicated(series) + return series + + +@DataFrameAccessor.register_accessor +def df_update_duplicates(df, col=None, func=None, inplace=False): + """ + Same as :func:`series_update_duplicates` but on a :class:`pandas.DataFrame`. + + :param df: Dataframe to act on. + :type df: pandas.DataFrame + + :param col: Column to update. If ``None``, the index is used. + :type col: str or None + + :param func: See :func:`series_update_duplicates`. + :type func: collections.abc.Callable or None + + :param inplace: If ``True``, the passed dataframe will be modified. + :type inplace: bool + """ + use_index = col is None + + series = df.index.to_series() if use_index else df[col].copy() + series = series_update_duplicates(series, func=func) + df = df if inplace else df.copy() if use_index: df.index = series diff --git a/lisa/trace.py b/lisa/trace.py index 3f546f2a9529624c096977605c5b5e88aeab04f9..2a154b450761f53087e22071ad71f5b3f80c63f4 100644 --- a/lisa/trace.py +++ b/lisa/trace.py @@ -1607,43 +1607,10 @@ class TxtTraceParser(TxtTraceParserBase): be reused in another context (cached on disk), and the set of events in a :class:`Trace` object can be expanded dynamically. """ - bin_ = get_bin('trace-cmd') - - if not os.path.exists(path): - raise FileNotFoundError(f'Unable to locate specified trace file: {path}') - needed_metadata = set(needed_metadata or []) events = set(events) default_event_parser_cls, event_parsers = cls._resolve_event_parsers(event_parsers, default_event_parser_cls) - - def use_raw(event): - try: - parser = event_parsers[event] - except KeyError: - # If we don't have a known parser, use the raw output by - # default, since it will be either the same as human readable, - # or unparseable without a dedicated parser. 
- return True - else: - return parser.raw - - raw_events = list(itertools.chain.from_iterable( - ('-r', event) if use_raw(event) else [] - for event in events - )) - - # Make sure we only ask to trace-cmd events that can exist, otherwise - # it might bail out and give nothing at all, especially with -F - kernel_events = { - event.split(':', 1)[1] - for event in subprocess.check_output( - [bin_, 'report', '-N', '-E', '--', path], - stderr=subprocess.DEVNULL, - universal_newlines=True, - ).splitlines() - if not event.startswith('version =') - } - events = [event for event in events if event in kernel_events] + event_parsers = event_parsers.values() pre_filled_metadata = {} @@ -1686,10 +1653,77 @@ class TxtTraceParser(TxtTraceParserBase): kwargs.update( events=events, needed_metadata=needed_metadata, - event_parsers=event_parsers.values(), + event_parsers=event_parsers, default_event_parser_cls=default_event_parser_cls, pre_filled_metadata=pre_filled_metadata, ) + + cmd = cls._tracecmd_report( + path=path, + events=events, + event_parsers=event_parsers, + default_event_parser_cls=default_event_parser_cls, + # We unfortunately need to parse every single line in order to + # ensure each event has a unique timestamp in the trace, as pandas + # cannot deal with duplicated timestamps. Having unique timestamps + # inside an event dataframe is not enough as dataframes of + # different events can be combined. + filter_events=False, + ) + + # A fairly large buffer reduces the interaction overhead + bufsize = 10 * 1024 * 1024 + with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=bufsize) as p: + # Consume the lines as they come straight from the stdout object to + # avoid the memory overhead of storing the whole output in one + # gigantic string + return cls(lines=p.stdout, **kwargs) + + @classmethod + def _tracecmd_report(cls, path, events, event_parsers=None, default_event_parser_cls=None, filter_events=True): + events = set(events) + bin_ = get_bin('trace-cmd') + + if not os.path.exists(path): + raise FileNotFoundError(f'Unable to locate specified trace file: {path}') + + # Make sure we only ask to trace-cmd events that can exist, otherwise + # it might bail out and give nothing at all, especially with -F + kernel_events = { + event.split(':', 1)[1] + for event in subprocess.check_output( + [bin_, 'report', '-N', '-E', '--', path], + stderr=subprocess.DEVNULL, + universal_newlines=True, + ).splitlines() + if not event.startswith('version =') + } + events = events & kernel_events + + default_event_parser_cls, event_parsers = cls._resolve_event_parsers(event_parsers, default_event_parser_cls) + def use_raw(event): + try: + parser = event_parsers[event] + except KeyError: + # If we don't have a known parser, use the raw output by + # default, since it will be either the same as human readable, + # or unparseable without a dedicated parser. 
+ return True + else: + return parser.raw + + if filter_events: + filter_ = list(itertools.chain.from_iterable( + ('-F', event) + for event in events + )) + else: + filter_ = [] + + raw_events = list(itertools.chain.from_iterable( + ('-r', event) if use_raw(event) else [] + for event in events + )) cmd = [ bin_, 'report', @@ -1697,17 +1731,13 @@ class TxtTraceParser(TxtTraceParserBase): '-N', # Full accuracy on timestamp '-t', + # Event filter + *filter_, # All events in raw format *raw_events, '--', path ] - # A fairly large buffer reduces the interaction overhead - bufsize = 10 * 1024 * 1024 - with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=bufsize) as p: - # Consume the lines as they come straight from the stdout object to - # avoid the memory overhead of storing the whole output in one - # gigantic string - return cls(lines=p.stdout, **kwargs) + return cmd def _get_skeleton_regex(self, need_fields): regex = r'\] +(?P<__timestamp>{floating}): *(?P<__event>{identifier}):'.format(**TxtEventParser.PARSER_REGEX_TERMINALS) @@ -2327,9 +2357,9 @@ class _AvailableTraceEventsSet: return str(self._available_events) -class PandasDataDesc(Mapping): +class _CacheDataDesc(Mapping): """ - Pandas data descriptor. + Cached data descriptor. :param spec: Specification of the data as a key/value mapping. @@ -2348,9 +2378,10 @@ class PandasDataDesc(Mapping): implemented by comparing this attribute. """ - def __init__(self, spec): + def __init__(self, spec, fmt): + self.fmt = fmt self.spec = spec - self.normal_form = PandasDataDescNF.from_spec(self.spec) + self.normal_form = _CacheDataDescNF.from_spec(self.spec, fmt) def __getitem__(self, key): return self.spec[key] @@ -2361,14 +2392,6 @@ class PandasDataDesc(Mapping): def __len__(self): return len(self.spec) - @classmethod - def from_kwargs(cls, **kwargs): - """ - Build a :class:`PandasDataDesc` with the specifications as keyword - arguments. - """ - return cls(spec=kwargs) - def __repr__(self): return '{}({})'.format( self.__class__.__name__, @@ -2388,23 +2411,25 @@ class PandasDataDesc(Mapping): return hash(self.normal_form) -class PandasDataDescNF: +class _CacheDataDescNF: """ - Normal form of :class:`PandasDataDesc`. + Normal form of :class:`_CacheDataDesc`. The normal form of the descriptor allows removing any possible differences in shape of values, and is serializable to JSON. The serialization is allowed to destroy some information (type mainly), as long as it does make two descriptors wrongly equal. """ - def __init__(self, nf): + def __init__(self, nf, fmt): + assert fmt != _CacheDataSwapEntry.META_EXTENSION + self._fmt = fmt self._nf = nf # Since it's going to be inserted in dict for sure, precompute the hash # once and for all. self._hash = hash(self._nf) @classmethod - def from_spec(cls, spec): + def from_spec(cls, spec, fmt): """ Build from a spec that can include any kind of Python objects. 
""" @@ -2412,7 +2437,7 @@ class PandasDataDescNF: (key, cls._coerce(val)) for key, val in spec.items() )) - return cls(nf=nf) + return cls(nf=nf, fmt=fmt) @classmethod def _coerce(cls, val): @@ -2448,7 +2473,7 @@ class PandasDataDescNF: def __eq__(self, other): if isinstance(other, self.__class__): - return self._nf == other._nf + return self._fmt == other._fmt and self._nf == other._nf else: return False @@ -2456,7 +2481,10 @@ class PandasDataDescNF: return self._hash def to_json_map(self): - return dict(self._nf) + return dict( + fmt=self._fmt, + data=self._nf, + ) @classmethod def _coerce_json(cls, x): @@ -2479,47 +2507,54 @@ class PandasDataDescNF: JSON does not preserve tuples for example, so they need to be converted back. """ + fmt = mapping['fmt'] + data = dict(mapping['data']) nf = tuple(sorted( (key, cls._coerce_json(val)) - for key, val in mapping.items() + for key, val in data.items() )) - return cls(nf=nf) + return cls(nf=nf, fmt=fmt) -class PandasDataSwapEntry: + +class _CacheDataSwapEntry: """ - Entry in the pandas data swap area of :class:`Trace`. + Entry in the data swap area of :class:`Trace`. - :param pd_desc_nf: Normal form descriptor describing what the entry + :param cache_desc_nf: Normal form descriptor describing what the entry contains. - :type pd_desc_nf: PandasDataDescNF + :type cache_desc_nf: _CacheDataDescNF :param name: Name of the entry. If ``None``, a random UUID will be generated. :type name: str or None + + :param written: ``True`` if the swap entry is already written on-disk. + :type written: bool """ - META_EXTENSION = '.meta' + META_EXTENSION = 'meta' """ Extension used by the metadata file of the swap entry in the swap. """ - def __init__(self, pd_desc_nf, name=None): - self.pd_desc_nf = pd_desc_nf + def __init__(self, cache_desc_nf, name=None, written=False): + self.cache_desc_nf = cache_desc_nf self.name = name or uuid.uuid4().hex + self.written = written @property def meta_filename(self): """ Filename of the metadata file in the swap. """ - return f'{self.name}{self.META_EXTENSION}' + return f'{self.name}.{self.META_EXTENSION}' @property def data_filename(self): """ - Filename of the pandas data file in the swap. + Filename of the data file in the swap. """ - return f'{self.name}{TraceCache.DATAFRAME_SWAP_EXTENSION}' + return f'{self.name}.{self.cache_desc_nf._fmt}' def to_json_map(self): """ @@ -2528,20 +2563,20 @@ class PandasDataSwapEntry: return { 'version-token': VERSION_TOKEN, 'name': self.name, - 'desc': self.pd_desc_nf.to_json_map(), + 'desc': self.cache_desc_nf.to_json_map(), } @classmethod - def from_json_map(cls, mapping): + def from_json_map(cls, mapping, written=False): """ Create an instance with a mapping created using :meth:`to_json_map`. """ if mapping['version-token'] != VERSION_TOKEN: raise TraceCacheSwapVersionError('Version token differ') - pd_desc_nf = PandasDataDescNF.from_json_map(mapping['desc']) + cache_desc_nf = _CacheDataDescNF.from_json_map(mapping['desc']) name = mapping['name'] - return cls(pd_desc_nf=pd_desc_nf, name=name) + return cls(cache_desc_nf=cache_desc_nf, name=name, written=written) def to_path(self, path): """ @@ -2560,7 +2595,7 @@ class PandasDataSwapEntry: with open(path) as f: mapping = json.load(f) - return cls.from_json_map(mapping) + return cls.from_json_map(mapping, written=True) class TraceCacheSwapVersionError(ValueError): @@ -2599,11 +2634,12 @@ class TraceCache(Loggable): :type metadata: dict or None :param swap_content: Initial content of the swap area. 
- :type swap_content: dict(PandasDataDescNF, PandasDataSwapEntry) or None + :type swap_content: dict(_CacheDataDescNF, _CacheDataSwapEntry) or None The cache manages both the :class:`pandas.DataFrame` and :class:`pandas.Series` generated in memory and a swap area used to evict - them, and to reload them quickly. + them, and to reload them quickly. Some other data (typically JSON) can also + be stored in the cache by analysis method. """ INIT_SWAP_COST = 1e-7 @@ -2624,23 +2660,18 @@ class TraceCache(Loggable): Data storage format used to swap. """ - DATAFRAME_SWAP_EXTENSION = f'.{DATAFRAME_SWAP_FORMAT}' - """ - File extension of the data swap format. - """ - def __init__(self, max_mem_size=None, trace_path=None, trace_md5=None, swap_dir=None, max_swap_size=None, swap_content=None, metadata=None): self._cache = {} self._data_cost = {} self._swap_content = swap_content or {} - self._pd_desc_swap_filename = {} + self._cache_desc_swap_filename = {} self.swap_cost = self.INIT_SWAP_COST self.swap_dir = swap_dir self.max_swap_size = max_swap_size if max_swap_size is not None else math.inf self._swap_size = self._get_swap_size() self.max_mem_size = max_mem_size if max_mem_size is not None else math.inf - self._data_mem_swap_ratio = 7 + self._data_mem_swap_ratio = 1 self._metadata = metadata or {} self.trace_path = os.path.abspath(trace_path) if trace_path else trace_path @@ -2658,7 +2689,7 @@ class TraceCache(Loggable): def get_size(nr_col): df = make_df(nr_col) buffer = io.BytesIO() - self._write_data(df, buffer) + self._write_data('parquet', df, buffer) return buffer.getbuffer().nbytes size1 = get_size(1) @@ -2680,16 +2711,19 @@ class TraceCache(Loggable): .. note:: This model seems to work pretty well for parquet format. """ - file_overhead, col_overhead = self._swap_size_overhead - # DataFrame - try: - nr_columns = data.shape[1] - # Series - except IndexError: - nr_columns = 1 + if isinstance(data, (pd.DataFrame, pd.Series)): + file_overhead, col_overhead = self._swap_size_overhead + # DataFrame + try: + nr_columns = data.shape[1] + # Series + except IndexError: + nr_columns = 1 - size = size - file_overhead - nr_columns * col_overhead - return size + size = size - file_overhead - nr_columns * col_overhead + return size + else: + return self._data_mem_usage(data) @property def trace_md5(self): @@ -2790,19 +2824,19 @@ class TraceCache(Loggable): swap_entry_filenames = { filename for filename in os.listdir(swap_dir) - if filename.endswith(PandasDataSwapEntry.META_EXTENSION) + if filename.endswith(f'.{_CacheDataSwapEntry.META_EXTENSION}') } for filename in swap_entry_filenames: path = os.path.join(swap_dir, filename) try: - swap_entry = PandasDataSwapEntry.from_path(path) + swap_entry = _CacheDataSwapEntry.from_path(path) # If there is any issue with that entry, just ignore it # pylint: disable=broad-except except Exception: continue else: - yield (swap_entry.pd_desc_nf, swap_entry) + yield (swap_entry.cache_desc_nf, swap_entry) swap_content = dict(load_swap_content(swap_dir)) @@ -2860,22 +2894,46 @@ class TraceCache(Loggable): @staticmethod def _data_mem_usage(data): - mem = data.memory_usage() - try: - return mem.sum() - except AttributeError: - return mem + if data is None: + return 1 + elif isinstance(data, (pd.DataFrame, pd.Series)): + mem = data.memory_usage() + try: + return mem.sum() + except AttributeError: + return mem + else: + return sys.getsizeof(data) - def _should_evict_to_swap(self, pd_desc, data): + def _should_evict_to_swap(self, cache_desc, data): # If we don't have any cost 
info, assume it is expensive to compute - compute_cost = self._data_cost.get(pd_desc, math.inf) + compute_cost = self._data_cost.get(cache_desc, math.inf) swap_cost = self._estimate_data_swap_cost(data) return swap_cost <= compute_cost - def _swap_path_of(self, pd_desc): + def _path_of_swap_entry(self, swap_entry): + return os.path.join(self.swap_dir, swap_entry.meta_filename) + + def _cache_desc_swap_path(self, cache_desc, create=False): if self.swap_dir: - pd_desc_nf = pd_desc.normal_form - swap_entry = self._swap_content[pd_desc_nf] + cache_desc_nf = cache_desc.normal_form + + if create and not self._is_written_to_swap(cache_desc): + self.insert( + cache_desc, + data=None, + compute_cost=None, + write_swap=True, + force_write_swap=True, + # We do not write the swap_entry meta file, so that the + # user can write the data file before the swap entry is + # added. This way, another process will not be tricked into + # believing the data is available whereas in fact it's in + # the process of being populated. + write_meta=False, + ) + + swap_entry = self._swap_content[cache_desc_nf] filename = swap_entry.data_filename return os.path.join(self.swap_dir, filename) else: @@ -2886,7 +2944,10 @@ class TraceCache(Loggable): # Take out from the swap cost the time it took to write the overhead # that comes with the file format, assuming the cost is # proportional to amount of data written in the swap. - swap_cost *= unbiased_swap_size / swap_size + if swap_size: + swap_cost *= unbiased_swap_size / swap_size + else: + swap_cost = 0 new_cost = swap_cost / mem_usage @@ -2894,8 +2955,13 @@ class TraceCache(Loggable): # EWMA to keep a relatively stable cost self._update_ewma('swap_cost', new_cost, override=override) - def _is_written_to_swap(self, pd_desc): - return pd_desc.normal_form in self._swap_content + def _is_written_to_swap(self, cache_desc): + try: + swap_entry = self._swap_content[cache_desc.normal_form] + except KeyError: + return False + else: + return swap_entry.written @staticmethod def _data_to_parquet(data, path, **kwargs): @@ -2938,50 +3004,89 @@ class TraceCache(Loggable): return data @classmethod - def _write_data(cls, data, path): - if cls.DATAFRAME_SWAP_FORMAT == 'parquet': + def _write_data(cls, fmt, data, path): + if fmt == 'disk-only': + return + elif fmt == 'parquet': # Snappy compression seems very fast cls._data_to_parquet(data, path, compression='snappy') + elif fmt == 'json': + with open(path, 'wt') as f: + try: + json.dump(data, f, separators=(',', ':')) + except Exception as e: + raise ValueError(f'Does not know how to write data type {data.__class__} to the cache: {e}') from e + else: + raise ValueError(f'Does not know how to dump to disk format: {fmt}') + + + @classmethod + def _load_data(cls, fmt, path): + if fmt == 'disk-only': + data = None + elif fmt == 'parquet': + data = cls._data_from_parquet(path) + elif fmt == 'json': + with open(path, 'rt') as f: + data = json.load(f) else: - raise ValueError(f'Dataframe swap format "{cls.DATAFRAME_SWAP_FORMAT}" not handled') + raise ValueError(f'File format not supported "{fmt}" at path: {path}') - def _write_swap(self, pd_desc, data): + return data + + def _write_swap(self, cache_desc, data, write_meta=True): if not self.swap_dir: return else: - if self._is_written_to_swap(pd_desc): + # TODO: this is broken for disk-only format, as we have the swap + # entry in _swap_content[] in order to match it again but the meta + # file has not been written to the disk yet. 
+ if self._is_written_to_swap(cache_desc): return - pd_desc_nf = pd_desc.normal_form - swap_entry = PandasDataSwapEntry(pd_desc_nf) + cache_desc_nf = cache_desc.normal_form + # We may already have a swap entry if we used the None data + # placeholder. This would have allowed the user to reserve the swap + # data file in advance so they can write to it directly, instead of + # managing the data in the memory cache. + try: + swap_entry = self._swap_content[cache_desc_nf] + except KeyError: + swap_entry = _CacheDataSwapEntry(cache_desc_nf) - df_path = os.path.join(self.swap_dir, swap_entry.data_filename) + data_path = os.path.join(self.swap_dir, swap_entry.data_filename) # If that would make the swap dir too large, try to do some cleanup if self._estimate_data_swap_size(data) + self._swap_size > self.max_swap_size: self.scrub_swap() def log_error(e): - self.logger.error(f'Could not write {pd_desc} to swap: {e}') + self.logger.error(f'Could not write {cache_desc} to swap: {e}') # Write the Parquet file and update the write speed try: with measure_time() as measure: - self._write_data(data, df_path) + self._write_data(cache_desc.fmt, data, data_path) # PyArrow fails to save dataframes containing integers > 64bits except OverflowError as e: log_error(e) else: - # Update the swap - swap_entry_path = os.path.join(self.swap_dir, swap_entry.meta_filename) - swap_entry.to_path(swap_entry_path) - self._swap_content[swap_entry.pd_desc_nf] = swap_entry + # Update the swap entry on disk + if write_meta: + swap_entry.to_path( + self._path_of_swap_entry(swap_entry) + ) + swap_entry.written = True + self._swap_content[swap_entry.cache_desc_nf] = swap_entry # Assume that reading from the swap will take as much time as # writing to it. We cannot do better anyway, but that should # mostly bias to keeping things in memory if possible. swap_cost = measure.exclusive_delta - data_swapped_size = os.stat(df_path).st_size + try: + data_swapped_size = os.stat(data_path).st_size + except FileNotFoundError: + data_swapped_size = 0 mem_usage = self._data_mem_usage(data) if mem_usage: @@ -2997,7 +3102,7 @@ class TraceCache(Loggable): for dir_entry in os.scandir(self.swap_dir) ) else: - return 0 + return 1 def scrub_swap(self): """ @@ -3025,9 +3130,12 @@ class TraceCache(Loggable): non_stale_files = data_files.keys() | metadata_files stale_files = stats.keys() - non_stale_files for filename in stale_files: - del stats[filename] + stats.pop(filename, None) path = os.path.join(self.swap_dir, filename) - os.unlink(path) + try: + os.unlink(path) + except Exception: + pass def by_mtime(path_stat): _, stat = path_stat @@ -3049,64 +3157,60 @@ class TraceCache(Loggable): # Update the swap content for swap_entry in discarded_swap_entries: - del self._swap_content[swap_entry.pd_desc_nf] - del stats[swap_entry.data_filename] + del self._swap_content[swap_entry.cache_desc_nf] + stats.pop(swap_entry.data_filename, None) for filename in (swap_entry.meta_filename, swap_entry.data_filename): path = os.path.join(self.swap_dir, filename) - os.unlink(path) + try: + os.unlink(path) + except Exception: + pass self._swap_size = sum( stats[swap_entry.data_filename].st_size for swap_entry in self._swap_content.values() + if swap_entry.data_filename in stats ) - def fetch(self, pd_desc, insert=True): + def fetch(self, cache_desc, insert=True): """ Fetch an entry from the cache or the swap. - :param pd_desc: Descriptor to look for. - :type pd_desc: PandasDataDesc + :param cache_desc: Descriptor to look for. 
+ :type cache_desc: _CacheDataDesc :param insert: If ``True`` and if the fetch succeeds by loading the swap, the data is inserted in the cache. :type insert: bool """ try: - return self._cache[pd_desc] + return self._cache[cache_desc] except KeyError as e: # pylint: disable=raise-missing-from try: - path = self._swap_path_of(pd_desc) + path = self._cache_desc_swap_path(cache_desc) # If there is no swap, bail out except (ValueError, KeyError): - raise e + raise KeyError(f'Could not find swap entry for: {cache_desc}') else: - # Try to load the dataframe from that path - try: - if self.DATAFRAME_SWAP_FORMAT == 'parquet': - data = self._data_from_parquet(path) - else: - raise ValueError(f'Dataframe swap format "{self.DATAFRAME_SWAP_FORMAT}" not handled') - except (OSError, pyarrow.lib.ArrowIOError): - raise e - else: - if insert: - # We have no idea of the cost of something coming from - # the cache - self.insert(pd_desc, data, write_swap=False, compute_cost=None) + data = self._load_data(cache_desc.fmt, path) + if insert: + # We have no idea of the cost of something coming from + # the cache + self.insert(cache_desc, data, write_swap=False, compute_cost=None) - return data + return data - def insert(self, pd_desc, data, compute_cost=None, write_swap=False, force_write_swap=False): + def insert(self, cache_desc, data, compute_cost=None, write_swap=False, force_write_swap=False, write_meta=True): """ Insert an entry in the cache. - :param pd_desc: Descriptor of the data to insert. - :type pd_desc: PandasDataDesc + :param cache_desc: Descriptor of the data to insert. + :type cache_desc: _CacheDataDesc - :param data: Pandas data to insert. - :type data: pandas.DataFrame or pandas.Series + :param data: data to insert. + :type data: object :param compute_cost: Time spent to compute the data in seconds. :type compute_cost: float or None @@ -3120,13 +3224,21 @@ class TraceCache(Loggable): :param force_write_swap: If ``True``, bypass the computation vs swap cost comparison. :type force_write_swap: bool + + :param write_meta: If ``True``, the swap entry metadata will be written + on disk if the data are. Otherwise, no swap entry is written to disk. 
+ :type write_meta: bool """ - self._cache[pd_desc] = data + self._cache[cache_desc] = data if compute_cost is not None: - self._data_cost[pd_desc] = compute_cost + self._data_cost[cache_desc] = compute_cost if write_swap: - self.write_swap(pd_desc, force=force_write_swap) + self.write_swap( + cache_desc, + force=force_write_swap, + write_meta=write_meta + ) self._scrub_mem() @@ -3145,17 +3257,17 @@ class TraceCache(Loggable): # accurate refcount possible gc.collect() refcounts = { - pd_desc: sys.getrefcount(data) - for pd_desc, data in self._cache.items() + cache_desc: sys.getrefcount(data) + for cache_desc, data in self._cache.items() } min_refcount = min(refcounts.values()) # Low retention score means it's more likely to be evicted - def retention_score(pd_desc_and_data): - pd_desc, data = pd_desc_and_data + def retention_score(cache_desc_and_data): + cache_desc, data = cache_desc_and_data # If we don't know the computation cost, assume it can be evicted cheaply - compute_cost = self._data_cost.get(pd_desc, 0) + compute_cost = self._data_cost.get(cache_desc, 0) if not compute_cost: score = 0 @@ -3163,7 +3275,7 @@ class TraceCache(Loggable): swap_cost = self._estimate_data_swap_cost(data) # If it's already written back, make it cheaper to evict since # the eviction itself is going to be cheap - if self._is_written_to_swap(pd_desc): + if self._is_written_to_swap(cache_desc): swap_cost /= 2 if swap_cost: @@ -3178,57 +3290,61 @@ class TraceCache(Loggable): # # Normalize to the minimum refcount, so that the _cache and other # structures where references are stored are discounted for sure. - return (refcounts[pd_desc] - min_refcount + 1) * score + return (refcounts[cache_desc] - min_refcount + 1) * score new_mem_usage = 0 - for pd_desc, data in sorted(self._cache.items(), key=retention_score): + for cache_desc, data in sorted(self._cache.items(), key=retention_score): new_mem_usage += self._data_mem_usage(data) if new_mem_usage > self.max_mem_size: - self.evict(pd_desc) + self.evict(cache_desc) - def evict(self, pd_desc): + def evict(self, cache_desc): """ Evict the given descriptor from memory. - :param pd_desc: Descriptor to evict. - :type pd_desc: PandasDataDesc + :param cache_desc: Descriptor to evict. + :type cache_desc: _CacheDataDesc If it would be cheaper to reload the data than to recompute them, they will be written to the swap area. """ - self.write_swap(pd_desc) + self.write_swap(cache_desc) try: - del self._cache[pd_desc] + del self._cache[cache_desc] except KeyError: pass - def write_swap(self, pd_desc, force=False): + def write_swap(self, cache_desc, force=False, write_meta=True): """ Write the given descriptor to the swap area if that would be faster to reload the data rather than recomputing it. If the descriptor is not in the cache or if there is no swap area, ignore it. - :param pd_desc: Descriptor of the data to write to swap. - :type pd_desc: PandasDataDesc + :param cache_desc: Descriptor of the data to write to swap. + :type cache_desc: _CacheDataDesc :param force: If ``True``, bypass the compute vs swap cost comparison. :type force: bool + + :param write_meta: If ``True``, the swap entry metadata will be written + on disk if the data are. Otherwise, no swap entry is written to disk. 
+ :type write_meta: bool """ try: - data = self._cache[pd_desc] + data = self._cache[cache_desc] except KeyError: pass else: - if force or self._should_evict_to_swap(pd_desc, data): - self._write_swap(pd_desc, data) + if force or self._should_evict_to_swap(cache_desc, data): + self._write_swap(cache_desc, data, write_meta) def write_swap_all(self): """ Attempt to write all cached data to the swap. """ - for pd_desc in self._cache.keys(): - self.write_swap(pd_desc) + for cache_desc in self._cache.keys(): + self.write_swap(cache_desc) def clear_event(self, event, raw=None): """ @@ -3243,13 +3359,13 @@ class TraceCache(Loggable): :type raw: bool or None """ self._cache = { - pd_desc: data - for pd_desc, data in self._cache.items() + cache_desc: data + for cache_desc, data in self._cache.items() if not ( - pd_desc.get('event') == event + cache_desc.get('event') == event and ( raw is None - or pd_desc.get('raw') == raw + or cache_desc.get('raw') == raw ) ) } @@ -3259,14 +3375,14 @@ class TraceCache(Loggable): Same as :meth:`clear_event` but works on all events at once. """ self._cache = { - pd_desc: data - for pd_desc, data in self._cache.items() + cache_desc: data + for cache_desc, data in self._cache.items() if ( # Cache entries can be associated to something else than events - 'event' not in pd_desc or + 'event' not in cache_desc or # Either we care about raw and we check, or blanket clear raw is None or - pd_desc.get('raw') == raw + cache_desc.get('raw') == raw ) } @@ -3513,7 +3629,10 @@ class Trace(Loggable, TraceBase): if max_swap_size is None: trace_size = os.stat(trace_path).st_size - max_swap_size = trace_size + # Use 10 times the size of the trace so that there is + # enough room to store large artifacts like a JSON dump of + # the trace + max_swap_size = trace_size * 10 else: swap_dir = None max_swap_size = None @@ -4012,7 +4131,7 @@ class Trace(Loggable, TraceBase): if raw: # Make sure all raw descriptors are made the same way, to avoid # missed sharing opportunities - spec = self._make_raw_pd_desc_spec(event) + spec = self._make_raw_cache_desc_spec(event) else: spec = dict( event=event, @@ -4039,12 +4158,12 @@ class Trace(Loggable, TraceBase): sanitization=sanitization_f.__qualname__ if sanitization_f else None, ) - pd_desc = PandasDataDesc(spec=spec) + cache_desc = _CacheDataDesc(spec=spec, fmt=TraceCache.DATAFRAME_SWAP_FORMAT) try: - df = self._cache.fetch(pd_desc, insert=True) + df = self._cache.fetch(cache_desc, insert=True) except KeyError: - df = self._load_df(pd_desc, sanitization_f=sanitization_f, write_swap=write_swap) + df = self._load_df(cache_desc, sanitization_f=sanitization_f, write_swap=write_swap) if df.empty: raise MissingTraceEventError( @@ -4058,19 +4177,19 @@ class Trace(Loggable, TraceBase): df.attrs['name'] = event return df - def _make_raw_pd_desc(self, event): - spec = self._make_raw_pd_desc_spec(event) - return PandasDataDesc(spec=spec) + def _make_raw_cache_desc(self, event): + spec = self._make_raw_cache_desc_spec(event) + return _CacheDataDesc(spec=spec, fmt=TraceCache.DATAFRAME_SWAP_FORMAT) - def _make_raw_pd_desc_spec(self, event): + def _make_raw_cache_desc_spec(self, event): return dict( event=event, raw=True, trace_state=self.trace_state, ) - def _load_df(self, pd_desc, sanitization_f=None, write_swap=None): - event = pd_desc['event'] + def _load_df(self, cache_desc, sanitization_f=None, write_swap=None): + event = cache_desc['event'] # Do not even bother loading the event if we know it cannot be # there. 
This avoids some OSError in case the trace file has @@ -4086,14 +4205,14 @@ class Trace(Loggable, TraceBase): if sanitization_f: # Evict the raw dataframe once we got the sanitized version, since # we are unlikely to reuse it again - self._cache.evict(self._make_raw_pd_desc(event)) + self._cache.evict(self._make_raw_cache_desc(event)) # We can ask to sanitize various aspects of the dataframe. # Adding a new aspect can be done without modifying existing # sanitization functions, as long as the default is the # previous behavior aspects = dict( - rename_cols=pd_desc['rename_cols'], + rename_cols=cache_desc['rename_cols'], ) with measure_time() as measure: df = sanitization_f(self, event, df, aspects=aspects) @@ -4101,11 +4220,11 @@ class Trace(Loggable, TraceBase): else: sanitization_time = 0 - window = pd_desc.get('window') + window = cache_desc.get('window') if window is not None: - signals_init = pd_desc['signals_init'] - compress_signals_init = pd_desc['compress_signals_init'] - cols_list = pd_desc['signals'] + signals_init = cache_desc['signals_init'] + compress_signals_init = cache_desc['compress_signals_init'] + cols_list = cache_desc['signals'] signals = [SignalDesc(event, cols) for cols in cols_list] with measure_time() as measure: @@ -4119,7 +4238,7 @@ class Trace(Loggable, TraceBase): windowing_time = 0 compute_cost = sanitization_time + windowing_time - self._cache.insert(pd_desc, df, compute_cost=compute_cost, write_swap=write_swap) + self._cache.insert(cache_desc, df, compute_cost=compute_cost, write_swap=write_swap) return df def _load_cache_raw_df(self, event_checker, write_swap, allow_missing_events=False): @@ -4133,15 +4252,15 @@ class Trace(Loggable, TraceBase): # Get the raw dataframe from the cache if possible def try_from_cache(event): - pd_desc = self._make_raw_pd_desc(event) + cache_desc = self._make_raw_cache_desc(event) try: # The caller is responsible of inserting in the cache if # necessary - df = self._cache.fetch(pd_desc, insert=False) + df = self._cache.fetch(cache_desc, insert=False) except KeyError: return None else: - self._cache.insert(pd_desc, df, **insert_kwargs) + self._cache.insert(cache_desc, df, **insert_kwargs) return df from_cache = { @@ -4160,8 +4279,8 @@ class Trace(Loggable, TraceBase): from_trace = self._load_raw_df(events_to_load) for event, df in from_trace.items(): - pd_desc = self._make_raw_pd_desc(event) - self._cache.insert(pd_desc, df, **insert_kwargs) + cache_desc = self._make_raw_cache_desc(event) + self._cache.insert(cache_desc, df, **insert_kwargs) df_map = {**from_cache, **from_trace} try: diff --git a/lisa/utils.py b/lisa/utils.py index 25b3d5b1c24ce69a92ac8bc5a43a4ad3d7c28a2a..c96d3b95193a95eb97f672a47bd5896821433160 100644 --- a/lisa/utils.py +++ b/lisa/utils.py @@ -63,6 +63,7 @@ import tempfile import shutil import platform import subprocess +import multiprocessing import ruamel.yaml from ruamel.yaml import YAML @@ -77,6 +78,7 @@ except ImportError: import lisa from lisa.version import parse_version, format_version, VERSION_TOKEN +from lisa._unshare import _empty_main # Do not infer the value using __file__, since it will break later on when @@ -1299,8 +1301,9 @@ def group_by_value(mapping, key_sort=lambda x: x): """ Group a mapping by its values - :param mapping: Mapping to reverse - :type mapping: collections.abc.Mapping + :param mapping: Mapping to reverse. If a sequence is passed, it is assumed + to contain key/value subsequences. 
+ :type mapping: collections.abc.Mapping or collections.abc.Sequence :param key_sort: The ``key`` parameter to a :func:`sorted` call on the mapping keys @@ -1317,6 +1320,9 @@ def group_by_value(mapping, key_sort=lambda x: x): >>> group_by_value({0: 42, 1: 43, 2: 42}) OrderedDict([(42, [0, 2]), (43, [1])]) """ + if isinstance(mapping, Mapping): + mapping = mapping.items() + if not key_sort: # Just conserve the order def key_sort(_): @@ -1324,7 +1330,7 @@ def group_by_value(mapping, key_sort=lambda x: x): return OrderedDict( (val, sorted((k for k, v in key_group), key=key_sort)) - for val, key_group in groupby(mapping.items(), key=operator.itemgetter(1)) + for val, key_group in groupby(mapping, key=operator.itemgetter(1)) ) @@ -3886,4 +3892,25 @@ class LazyMapping(Mapping): return len(self._closures) +def mp_spawn_pool(import_main=False, **kwargs): + """ + Create a context manager wrapping :class:`multiprocessing.Pool` using the + ``spawn`` method, which is safe even in multithreaded applications. + + :param import_main: If ``True``, let the spawned process import the + ``__main__`` module. This is usually not necessary when the functions + executed in the pool are small and not importing ``__main__`` saves a + *lot* of time (actually, unbounded amount of time). + :type import_main: bool + + :Variable keyword arguments: Forwarded to :meth:`multiprocessing.Pool`. + """ + ctx = multiprocessing.get_context(method='spawn') + empty_main = nullcontext if import_main else _empty_main + + with empty_main(): + pool = ctx.Pool(**kwargs) + + return pool + # vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab diff --git a/lisa/version.py b/lisa/version.py index 979d1cbb18ad1a09c62dda92eec860f2ea525980..e7252d0b1fa642065101ea36e717ca2bddff4452 100644 --- a/lisa/version.py +++ b/lisa/version.py @@ -48,7 +48,7 @@ def _compute_version_token(): try: sha1 = get_sha1(repo) - patch = get_uncommited_patch(repo) + patch = get_uncommited_patch(repo, include_binary=True) # Git is not installed, just use the regular version except (FileNotFoundError, CalledProcessError): return plain_version_token
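
Usage sketch (not part of the patch): the snippet below illustrates how an analysis method could use the reworked ``TraceAnalysisBase.cache`` decorator added in ``lisa/analysis/base.py`` above. The ``ExampleAnalysis`` class, its method names and the ``trace.dat`` path are hypothetical; the sketch only assumes the behaviour documented in the new docstring (``fmt`` selects the cache file format, ``disk-only`` hands the allocated swap path to the first parameter after ``self``, and ``ignored_params`` keeps the listed parameters out of the cache key) plus the public ``Trace``/``df_event`` API.

    # Hypothetical sketch, assuming the decorator behaviour documented above.
    from lisa.trace import Trace
    from lisa.analysis.base import TraceAnalysisBase


    class ExampleAnalysis(TraceAnalysisBase):
        """
        Illustrative analysis class (made up for this example).
        """
        name = 'example'

        @TraceAnalysisBase.cache(fmt='json')
        def switch_count(self):
            # fmt='json' means the returned value must be JSON-serializable,
            # so it can be evicted to the swap area as a .json file.
            df = self.trace.df_event('sched_switch')
            return {'nr_sched_switch': int(len(df))}

        @TraceAnalysisBase.cache(fmt='disk-only', ignored_params=['sep'])
        def dump_switches(self, path, sep=','):
            # With fmt='disk-only', the first parameter after "self" receives
            # the path of the cache file allocated in the swap area (None if
            # there is no swap directory). The function writes that file
            # itself and nothing is retained in the in-memory cache. 'sep' is
            # excluded from the cache key via ignored_params.
            if path is not None:
                self.trace.df_event('sched_switch').to_csv(path, sep=sep)
            return path


    if __name__ == '__main__':
        # Assumes a local trace.dat containing sched_switch events.
        trace = Trace('trace.dat', events=['sched_switch'])
        ana = ExampleAnalysis(trace)
        print(ana.switch_count())
        print(ana.dump_switches())

The ``disk-only`` path is what lets large artifacts (such as a JSON dump of the trace) be handed straight to external tools without ever being materialized in memory, which is also why the default ``max_swap_size`` is raised to ten times the trace size in ``lisa/trace.py``.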