From 449d14541431058e772f010c97f26d6b66b2d2f9 Mon Sep 17 00:00:00 2001
From: Douglas Raillard
Date: Mon, 10 Jun 2024 12:03:27 +0100
Subject: [PATCH 1/7] lisa.datautils: Fix pandas/polars dataframe conversion
 index handling

FIX
---
 lisa/datautils.py | 39 ++++++++++++++++++++++++++++-----------
 1 file changed, 28 insertions(+), 11 deletions(-)

diff --git a/lisa/datautils.py b/lisa/datautils.py
index b541e41f9..f65e2ea9c 100644
--- a/lisa/datautils.py
+++ b/lisa/datautils.py
@@ -279,7 +279,7 @@ def _df_to_polars(df, index):
         df = df.lazy()
         df = _df_to_polars(df, index=index)
     elif isinstance(df, pd.DataFrame):
-        df = pl.from_pandas(df, include_index=True)
+        df = pl.from_pandas(df, include_index=index is not NO_INDEX)
         df = _df_to_polars(df, index=index)
     else:
         raise ValueError(f'{df.__class__} not supported')
@@ -295,9 +295,10 @@ def _df_to_pandas(df, index):
         return df
     else:
         assert isinstance(df, pl.LazyFrame)
         index = _polars_index_col(df, index)

-        if index == 'Time' and df.schema[index].is_temporal():
+        has_time_index = index == 'Time' and df.schema[index].is_temporal()
+        if has_time_index:
             df = df.with_columns(
                 pl.col(index).dt.total_nanoseconds() * 1e-9
             )
@@ -318,9 +319,7 @@
             pyarrow.string(): pd.StringDtype(),
         }
         df = df.to_pandas(types_mapper=dtype_mapping.get)
-        if index is None:
-            df.reset_index(inplace=True)
-        else:
+        if index is not None:
             df.set_index(index, inplace=True)

         # Nullable dtypes are still not supported everywhere, so cast back to a
@@ -337,16 +336,34 @@
         }
         df = df.astype(dtypes, copy=False)

-        # Round trip polars -> pandas -> polars can be destructive as polars will
-        # store timestamps at nanosecond precision in an integer. This will wipe
-        # any sub-nanosecond difference in values, possibly leading to duplicate
-        # timestamps.
-        df.index = series_update_duplicates(df.index.to_series())
+        if has_time_index:
+            # Round trip polars -> pandas -> polars can be destructive as
+            # polars will store timestamps at nanosecond precision in an
+            # integer. This will wipe any sub-nanosecond difference in values,
+            # possibly leading to duplicate timestamps.
+            df.index = series_update_duplicates(df.index.to_series())

         return df


 def _df_to(df, fmt, index=None):
+
+    # Ensure we only have string column names, as it is the only type that
+    # will survive library conversions and serialization to parquet
+    if isinstance(df, pd.DataFrame):
+        if index is None:
+            index = df.index.name
+            # Default index in pandas, e.g. when using reset_index(). In that
+            # case, there is no reason to include that index in anything we
+            # convert it to.
+            if index is None and isinstance(df.index, pd.RangeIndex):
+                index = NO_INDEX
+        elif index is NO_INDEX:
+            assert df.index.name is None
+            assert isinstance(df.index, pd.RangeIndex)
+        else:
+            assert index == df.index.name
+
     if fmt == 'pandas':
         return _df_to_pandas(df, index=index)
     elif fmt == 'polars-lazyframe':
--
GitLab


From ca2ce34a995551b131175890171694398ef57037 Mon Sep 17 00:00:00 2001
From: Douglas Raillard
Date: Mon, 10 Jun 2024 12:07:33 +0100
Subject: [PATCH 2/7] lisa.analysis: Fix NO_INDEX usage

FIX

Ensure the index has a name and that conversions to polars are handled
properly.
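For instance, a dataframe whose index has no name silently loses the
information of which column was the index when converted (an
illustrative sketch only, not code from this series; the exact column
naming polars picks for an unnamed index is an assumption):

    import pandas as pd
    import polars as pl

    df = pd.DataFrame({'task': ['a', 'b'], 'runtime': [1.0, 2.0]})
    df = df.set_index('task')
    df.index.name = None

    # The index values survive, but without a name the resulting column
    # cannot be identified as the index anymore by downstream code,
    # hence the explicit df.index.name = ... assignments in this patch.
    ldf = pl.from_pandas(df, include_index=True)

Naming the index (e.g. 'Time' or 'task') before conversion removes the
ambiguity.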
---
 lisa/analysis/base.py      |  4 ++++
 lisa/analysis/cpus.py      |  4 ++--
 lisa/analysis/frequency.py |  8 ++++----
 lisa/analysis/idle.py      |  6 +++---
 lisa/analysis/latency.py   | 12 ++++++------
 lisa/analysis/rta.py       | 11 +++++++----
 lisa/analysis/tasks.py     |  7 ++++---
 7 files changed, 30 insertions(+), 22 deletions(-)

diff --git a/lisa/analysis/base.py b/lisa/analysis/base.py
index f05b5238f..e3c6a9e3f 100644
--- a/lisa/analysis/base.py
+++ b/lisa/analysis/base.py
@@ -1183,6 +1183,10 @@ class TraceAnalysisBase(AnalysisHelpers):
             # they are collect()'ed in f(), they will be created using a common
             # StringCache so Categorical columns can be concatenated and such.
             with pl.StringCache():
+
+                # We might get different types based on whether the content
+                # comes from the function directly (could be a pandas object)
+                # or from the cache (polars LazyFrame).
                 df = cached_f(self, *args, **kwargs)

             assert isinstance(df, (pd.DataFrame, pl.DataFrame, pl.LazyFrame))
diff --git a/lisa/analysis/cpus.py b/lisa/analysis/cpus.py
index a6bbeecc4..cac17e793 100644
--- a/lisa/analysis/cpus.py
+++ b/lisa/analysis/cpus.py
@@ -22,7 +22,7 @@ import holoviews as hv

 from lisa.analysis.base import TraceAnalysisBase
 from lisa.trace import requires_events, CPU
-from lisa.datautils import df_window, NO_INDEX
+from lisa.datautils import df_window


 class CpusAnalysis(TraceAnalysisBase):
@@ -36,7 +36,7 @@ class CpusAnalysis(TraceAnalysisBase):
     # DataFrame Getter Methods
     ###############################################################################

-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @requires_events('sched_switch')
     def df_context_switches(self):
         """
diff --git a/lisa/analysis/frequency.py b/lisa/analysis/frequency.py
index 42fbd88e3..ff58b0adf 100644
--- a/lisa/analysis/frequency.py
+++ b/lisa/analysis/frequency.py
@@ -28,7 +28,7 @@ import holoviews as hv

 from lisa.analysis.base import TraceAnalysisBase
 from lisa.trace import requires_events, requires_one_event_of, CPU, MissingTraceEventError
-from lisa.datautils import series_integrate, df_refit_index, series_refit_index, series_deduplicate, df_add_delta, series_mean, df_window, df_merge, SignalDesc, NO_INDEX
+from lisa.datautils import series_integrate, df_refit_index, series_refit_index, series_deduplicate, df_add_delta, series_mean, df_window, df_merge, SignalDesc
 from lisa.notebook import plot_signal, _hv_neutral


@@ -238,7 +238,7 @@ class FrequencyAnalysis(TraceAnalysisBase):

         return time_df

-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @_get_frequency_residency.used_events
     def df_cpu_frequency_residency(self, cpu):
         """
@@ -258,7 +258,7 @@ class FrequencyAnalysis(TraceAnalysisBase):
         else:
             raise TypeError('Input CPU parameter must be an integer')

-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @_get_frequency_residency.used_events
     def df_domain_frequency_residency(self, cpu):
         """
@@ -285,7 +285,7 @@ class FrequencyAnalysis(TraceAnalysisBase):
         domain, = domains
         return self._get_frequency_residency(tuple(domain))

-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @df_cpu_frequency.used_events
     def df_cpu_frequency_transitions(self, cpu):
         """
diff --git a/lisa/analysis/idle.py b/lisa/analysis/idle.py
index 9e6981d34..86fb35999 100644
--- a/lisa/analysis/idle.py
+++ b/lisa/analysis/idle.py
@@ -23,7 +23,7 @@ import typing
 import pandas as pd
 import holoviews as hv

-from lisa.datautils import df_add_delta, df_refit_index, df_split_signals, NO_INDEX
+from lisa.datautils import df_add_delta, df_refit_index, df_split_signals
 from lisa.analysis.base import TraceAnalysisBase
 from lisa.trace import requires_events, CPU
 from lisa.analysis.base import TraceAnalysisBase
@@ -158,7 +158,7 @@ class IdleAnalysis(TraceAnalysisBase):
             'cpu': pd.concat(map(make_series, cpus))
         }).sort_index()

-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @df_cpu_idle.used_events
     def df_cpu_idle_state_residency(self, cpu):
         """
@@ -189,7 +189,7 @@ class IdleAnalysis(TraceAnalysisBase):
         df.index.name = 'idle_state'
         return df

-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @df_cpu_idle.used_events
     def df_cluster_idle_state_residency(self, cluster):
         """
diff --git a/lisa/analysis/latency.py b/lisa/analysis/latency.py
index 845b931cc..0f7b52910 100644
--- a/lisa/analysis/latency.py
+++ b/lisa/analysis/latency.py
@@ -22,7 +22,7 @@ import holoviews as hv
 from lisa.analysis.base import TraceAnalysisBase
 from lisa.notebook import COLOR_CYCLE, _hv_neutral
 from lisa.analysis.tasks import TaskState, TasksAnalysis, TaskID
-from lisa.datautils import df_refit_index, NO_INDEX
+from lisa.datautils import df_refit_index
 from lisa.trace import MissingTraceEventError


@@ -51,11 +51,11 @@ class LatencyAnalysis(TraceAnalysisBase):
             (pl.col('curr_state') == curr_state) &
             (pl.col('next_state') == next_state)
         )
-        df = df.select(["delta", "cpu", "target_cpu"])
+        df = df.select(['Time', 'delta', 'cpu', 'target_cpu'])
         df = df.rename({'delta': name})
         return df

-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @_df_latency.used_events
     def df_latency_wakeup(self, task):
         """
@@ -77,7 +77,7 @@ class LatencyAnalysis(TraceAnalysisBase):
             TaskState.TASK_ACTIVE,
         )

-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @_df_latency.used_events
     def df_latency_preemption(self, task):
         """
@@ -96,7 +96,7 @@ class LatencyAnalysis(TraceAnalysisBase):
             'preempt_latency',
             TaskState.TASK_RUNNING,
             TaskState.TASK_ACTIVE
-        ).select(['preempt_latency', 'cpu'])
+        ).select(['Time', 'preempt_latency', 'cpu'])

     @TraceAnalysisBase.df_method
     @TasksAnalysis.df_task_states.used_events
@@ -274,7 +274,7 @@ class LatencyAnalysis(TraceAnalysisBase):

         return series, above, below

-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method()
     @df_latency_wakeup.used_events
     @df_latency_preemption.used_events
     def _get_latencies_df(self, task, wakeup, preempt):
diff --git a/lisa/analysis/rta.py b/lisa/analysis/rta.py
index 69d2aabc8..fc70e31a1 100644
--- a/lisa/analysis/rta.py
+++ b/lisa/analysis/rta.py
@@ -21,7 +21,7 @@ import holoviews as hv
 import numpy as np

 from lisa.analysis.base import AnalysisHelpers, TraceAnalysisBase
-from lisa.datautils import df_filter_task_ids, df_window, df_split_signals, NO_INDEX
+from lisa.datautils import df_filter_task_ids, df_window, df_split_signals
 from lisa.trace import requires_events, requires_one_event_of, may_use_events, MissingTraceEventError
 from lisa.utils import memoized, order_as
 from lisa.analysis.tasks import TasksAnalysis, TaskID
@@ -325,7 +325,7 @@ class RTAEventsAnalysis(TraceAnalysisBase):

         return df

-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @df_rtapp_loop.used_events
     def df_phases(self, task, wlgen_profile=None):
         """
@@ -392,9 +392,12 @@ class RTAEventsAnalysis(TraceAnalysisBase):

         if durations:
             index, columns = zip(*durations)
-            return pd.DataFrame(columns, index=index)
+            df = pd.DataFrame(columns, index=index)
+            df.index.name = 'Time'
         else:
-            return pd.DataFrame()
+            df = pd.DataFrame()
+
+        return df


     @df_phases.used_events
diff --git a/lisa/analysis/tasks.py b/lisa/analysis/tasks.py
index 5ecfa09f5..51c9d5d5f 100644
--- a/lisa/analysis/tasks.py
+++ b/lisa/analysis/tasks.py
@@ -33,7 +33,7 @@ import polars as pl

 from lisa.analysis.base import TraceAnalysisBase
 from lisa.utils import memoized, kwargs_forwarded_to, deprecate, order_as
-from lisa.datautils import df_filter_task_ids, series_rolling_apply, series_refit_index, df_refit_index, df_deduplicate, df_split_signals, df_add_delta, df_window, df_update_duplicates, df_combine_duplicates, SignalDesc, NO_INDEX
+from lisa.datautils import df_filter_task_ids, series_rolling_apply, series_refit_index, df_refit_index, df_deduplicate, df_split_signals, df_add_delta, df_window, df_update_duplicates, df_combine_duplicates, SignalDesc
 from lisa.trace import requires_events, will_use_events_from, may_use_events, CPU, MissingTraceEventError
 from lisa.notebook import _hv_neutral, plot_signal, _hv_twinx
 from lisa._typeclass import FromString
@@ -928,7 +928,7 @@ class TasksAnalysis(TraceAnalysisBase):
         )
         return df

-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @df_task_states.used_events
     def df_task_total_residency(self, task):
         """
@@ -957,7 +957,7 @@ class TasksAnalysis(TraceAnalysisBase):
         )
         return residency_df.fillna(0).sort_index()

-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method(index='task')
     @df_task_total_residency.used_events
     def df_tasks_total_residency(self, tasks=None, ascending=False, count=None):
         """
@@ -1004,6 +1004,7 @@ class TasksAnalysis(TraceAnalysisBase):
         if count is not None:
             res_df = res_df.head(count)

+        res_df.index.name = 'task'
         return res_df

     @TraceAnalysisBase.df_method
--
GitLab


From d2c63822fb8c4b481582e07f184041df41d3da53 Mon Sep 17 00:00:00 2001
From: Douglas Raillard
Date: Mon, 10 Jun 2024 12:11:26 +0100
Subject: [PATCH 3/7] lisa.datautils: Enforce string for column names in
 converted dataframes

BREAKING CHANGE

Ensure we only have string column names, as this is the only type that
can:

* survive serialization to parquet
* be converted between different dataframe libraries
---
 lisa/analysis/base.py  |  5 ++++-
 lisa/analysis/tasks.py |  4 ++++
 lisa/datautils.py      | 21 +++++++++++++++++++--
 lisa/trace.py          |  4 +++-
 4 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/lisa/analysis/base.py b/lisa/analysis/base.py
index e3c6a9e3f..f9c8eea6b 100644
--- a/lisa/analysis/base.py
+++ b/lisa/analysis/base.py
@@ -45,7 +45,7 @@ import pandas as pd
 from lisa.utils import Loggable, deprecate, get_doc_url, get_short_doc, get_subclasses, guess_format, is_running_ipython, measure_time, memoized, update_wrapper_doc, _import_all_submodules, optional_kwargs
 from lisa.trace import _CacheDataDesc
 from lisa.notebook import _hv_fig_to_pane, _hv_link_dataframes, axis_cursor_delta, axis_link_dataframes, make_figure
-from lisa.datautils import _df_to
+from lisa.datautils import _df_to, _pandas_cleanup_df

 # Ensure hv.extension() is called
 import lisa.notebook
@@ -1280,6 +1280,9 @@ class TraceAnalysisBase(AnalysisHelpers):
                 with measure_time() as measure:
                     data = f(*params.args, **params.kwargs)

+                if isinstance(data, pd.DataFrame):
+                    data = _pandas_cleanup_df(data)
+
                 if memory_cache:
                     compute_cost = measure.exclusive_delta
                 else:
diff --git a/lisa/analysis/tasks.py b/lisa/analysis/tasks.py
index 51c9d5d5f..3bbbbf9a4 100644
--- a/lisa/analysis/tasks.py
+++ b/lisa/analysis/tasks.py
@@ -1005,6 +1005,10 @@ class TasksAnalysis(TraceAnalysisBase):
             res_df = res_df.head(count)

         res_df.index.name = 'task'
+
+        # Ensure column names are all strings, so the dataframe can be
+        # serialized to parquet
+        res_df.columns = [str(col) for col in res_df.columns]
         return res_df

     @TraceAnalysisBase.df_method
diff --git a/lisa/datautils.py b/lisa/datautils.py
index f65e2ea9c..f73b26897 100644
--- a/lisa/datautils.py
+++ b/lisa/datautils.py
@@ -348,9 +348,9 @@ def _df_to_pandas(df, index):

 def _df_to(df, fmt, index=None):

-    # Ensure we only have string column names, as it is the only type that
-    # will survive library conversions and serialization to parquet
     if isinstance(df, pd.DataFrame):
+        df = _pandas_cleanup_df(df)
+
         if index is None:
             index = df.index.name
             # Default index in pandas, e.g. when using reset_index(). In that
@@ -374,6 +374,23 @@ def _df_to(df, fmt, index=None):
         raise ValueError(f'Unknown format {fmt}')


+def _pandas_cleanup_df(df):
+    assert isinstance(df, pd.DataFrame)
+
+    # Ensure we only have string column names, as it is the only type that
+    # will survive library conversions and serialization to parquet
+    assert all(isinstance(col, str) for col in df.columns)
+
+    # We need an index name if it's not just a default RangeIndex, otherwise
+    # we cannot convert the dataframe to polars.
+    assert isinstance(df.index, pd.RangeIndex) or df.index.name is not None
+
+    # This will not survive conversion between dataframe types
+    df.columns.name = None
+
+    return df
+
+
 class DataAccessor:
     """
     Proxy class that allows extending the :class:`pandas.DataFrame` API.
diff --git a/lisa/trace.py b/lisa/trace.py
index 7b227552a..aa2f2ac76 100644
--- a/lisa/trace.py
+++ b/lisa/trace.py
@@ -64,7 +64,7 @@ import devlib

 from lisa.utils import Loggable, HideExekallID, memoized, lru_memoized, deduplicate, take, deprecate, nullcontext, measure_time, checksum, newtype, groupby, PartialInit, kwargs_forwarded_to, kwargs_dispatcher, ComposedContextManager, get_nested_key, unzip_into, order_as, delegate_getattr
 from lisa.conf import SimpleMultiSrcConf, LevelKeyDesc, KeyDesc, TopLevelKeyDesc, Configurable
-from lisa.datautils import SignalDesc, df_add_delta, df_deduplicate, df_window, df_window_signals, series_convert, df_update_duplicates, _polars_duration_expr, _df_to, _polars_df_in_memory, Timestamp
+from lisa.datautils import SignalDesc, df_add_delta, df_deduplicate, df_window, df_window_signals, series_convert, df_update_duplicates, _polars_duration_expr, _df_to, _polars_df_in_memory, Timestamp, _pandas_cleanup_df
 from lisa.version import VERSION_TOKEN
 from lisa._typeclass import FromString
 from lisa._kmod import LISADynamicKmod
@@ -4326,6 +4326,8 @@ class _TraceCache(Loggable):
     def _data_to_parquet(data, path, compression='lz4', **kwargs):
         kwargs['compression'] = compression
         if isinstance(data, pd.DataFrame):
+            data = _pandas_cleanup_df(data)
+
             # Data must be convertible to bytes so we dump them as JSON
             attrs = json.dumps(data.attrs)
             table = pyarrow.Table.from_pandas(data)
--
GitLab


From 32c9c34a318c04035f42f58f3571fc2bf4fab855 Mon Sep 17 00:00:00 2001
From: Douglas Raillard
Date: Mon, 10 Jun 2024 15:15:53 +0100
Subject: [PATCH 4/7] lisa.trace: Ensure the dataframe cache preserves the
 index column

FIX

If the parquet file written to the cache was originally a pandas
dataframe, reload the pandas-specific metadata to ensure the index
column is the first one in the reloaded polars LazyFrame. This ensures
the rest of LISA understands that this column should be used as the
index.
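The pandas metadata embedded by pyarrow can be inspected like this (an
illustrative sketch; 'df.parquet' is a placeholder path):

    import json
    import pyarrow.parquet

    meta = pyarrow.parquet.read_metadata('df.parquet').metadata or {}
    pandas_meta = meta.get(b'pandas')
    if pandas_meta is not None:
        pandas_meta = json.loads(pandas_meta.decode('utf-8'))
        # e.g. ['Time'] for a dataframe saved with a 'Time' index
        print(pandas_meta['index_columns'])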
---
 lisa/trace.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/lisa/trace.py b/lisa/trace.py
index aa2f2ac76..295bb33ba 100644
--- a/lisa/trace.py
+++ b/lisa/trace.py
@@ -4422,6 +4422,21 @@ class _TraceCache(Loggable):
                     df.clear().collect()

                 df = _LazyFrameOnDelete.attach_file_cleanup(df, [hardlink_base])
+
+                parquet_meta = pyarrow.parquet.read_metadata(hardlink_path)
+                parquet_meta = parquet_meta.metadata
+                try:
+                    pandas_meta = parquet_meta[b'pandas']
+                except KeyError:
+                    pass
+                else:
+                    # Load the pandas metadata and put the index column
+                    # first, so that _polars_index_col() detects the index
+                    # correctly.
+                    pandas_meta = json.loads(pandas_meta.decode('utf-8'))
+                    index_cols = pandas_meta['index_columns']
+                    df = df.select(order_as(df.columns, index_cols))
+
                 return df

         if fmt == 'disk-only':
--
GitLab


From 6ca486bb81aa49d48b87072de6bc1196bac792b6 Mon Sep 17 00:00:00 2001
From: Douglas Raillard
Date: Mon, 10 Jun 2024 17:23:43 +0100
Subject: [PATCH 5/7] lisa.trace: Simplify the decision to write a dataframe
 to swap

FEATURE

Write a parquet file to the swap as soon as the data took long enough
to compute, regardless of the size of the data to write. This makes the
behavior more predictable.
---
 lisa/analysis/base.py |  6 +++-
 lisa/trace.py         | 80 ++++++-------------------------------------
 2 files changed, 15 insertions(+), 71 deletions(-)

diff --git a/lisa/analysis/base.py b/lisa/analysis/base.py
index f9c8eea6b..1615c97bc 100644
--- a/lisa/analysis/base.py
+++ b/lisa/analysis/base.py
@@ -1284,7 +1284,11 @@ class TraceAnalysisBase(AnalysisHelpers):
                     data = _pandas_cleanup_df(data)

                 if memory_cache:
-                    compute_cost = measure.exclusive_delta
+                    # Do not use measure.exclusive_delta, otherwise a simple
+                    # function making thousands of quick calls to a child
+                    # function may appear with a low cost, even though it
+                    # actually has a high total cost.
+                    compute_cost = measure.delta
                 else:
                     compute_cost = None

diff --git a/lisa/trace.py b/lisa/trace.py
index 295bb33ba..3a9e2bd23 100644
--- a/lisa/trace.py
+++ b/lisa/trace.py
@@ -3995,14 +3995,6 @@ class _TraceCache(Loggable):
         be stored in the cache by analysis method.
     """

-    INIT_SWAP_COST = 1e-8
-    """
-    Somewhat arbitrary number, must be small enough so that we write at
-    least one dataset to the cache, which will allow us getting a better
-    estimation. If the value is too high from the start, we will never
-    write anything, and the value will never have a chance to re-adjust.
-    """
-
     TRACE_META_FILENAME = 'trace.meta'
     """
     Name of the trace metadata file in the swap area.
@@ -4020,13 +4012,11 @@ class _TraceCache(Loggable):
         self._data_cost = {}
         self._swap_content = swap_content or {}
         self._cache_desc_swap_filename = {}
-        self.swap_cost = self.INIT_SWAP_COST
         self._swap_dir = swap_dir
         self.max_swap_size = max_swap_size if max_swap_size is not None else math.inf
         self._swap_size = self._get_swap_size()

         self.max_mem_size = max_mem_size if max_mem_size is not None else math.inf
-        self._data_mem_swap_ratio = 1
         self._metadata = metadata or {}
         self.trace_path = os.path.abspath(trace_path) if trace_path else trace_path

@@ -4233,26 +4223,8 @@ class _TraceCache(Loggable):

         return cls(swap_dir=swap_dir, **kwargs)

-    def _estimate_data_swap_cost(self, data):
-        return self._estimate_data_swap_size(data) * self.swap_cost
-
     def _estimate_data_swap_size(self, data):
-        return self._data_mem_usage(data) * self._data_mem_swap_ratio
-
-    def _update_ewma(self, attr, new, alpha=0.25, override=False):
-        with self._lock:
-            old = getattr(self, attr)
-            if override:
-                updated = new
-            else:
-                updated = (1 - alpha) * old + alpha * new
-
-            setattr(self, attr, updated)
-
-    def _update_data_swap_size_estimation(self, data, size):
-        mem_usage = self._data_mem_usage(data)
-        if mem_usage:
-            self._update_ewma('_data_mem_swap_ratio', size / mem_usage)
+        return self._data_mem_usage(data)

     @staticmethod
     def _data_mem_usage(data):
@@ -4268,11 +4240,14 @@ class _TraceCache(Loggable):
             return sys.getsizeof(data)

     def _should_evict_to_swap(self, cache_desc, data):
-        # If we don't have any cost info, assume it is expensive to compute
         with self._lock:
-            compute_cost = self._data_cost.get(cache_desc, math.inf)
-            swap_cost = self._estimate_data_swap_cost(data)
-            return swap_cost <= compute_cost
+            compute_cost = self._data_cost.get(
+                cache_desc,
+                # If we don't have any cost info, assume it is expensive to
+                # compute
+                math.inf,
+            )
+            return compute_cost >= 100e-6

     def _path_of_swap_entry(self, swap_entry):
         return os.path.join(self.swap_dir, swap_entry.meta_filename)
@@ -4300,19 +4275,6 @@ class _TraceCache(Loggable):
         filename = swap_entry.data_filename
         return os.path.join(self.swap_dir, filename)

-    def _update_swap_cost(self, data, swap_cost, mem_usage, swap_size):
-        # Take out from the swap cost the time it took to write the overhead
-        # that comes with the file format, assuming the cost is
-        # proportional to amount of data written in the swap.
-        if not swap_size:
-            swap_cost = 0
-
-        new_cost = swap_cost / mem_usage
-
-        override = self.swap_cost == self.INIT_SWAP_COST
-        # EWMA to keep a relatively stable cost
-        self._update_ewma('swap_cost', new_cost, override=override)
-
     def _is_written_to_swap(self, cache_desc):
         try:
             with self._lock:
@@ -4498,8 +4460,7 @@ class _TraceCache(Loggable):

         # Write the Parquet file and update the write speed
         try:
-            with measure_time() as measure:
-                self._write_data(cache_desc.fmt, data, data_path)
+            self._write_data(cache_desc.fmt, data, data_path)
         # PyArrow fails to save dataframes containing integers > 64bits
         except OverflowError as e:
             log_error(e)
@@ -4523,21 +4484,14 @@ class _TraceCache(Loggable):
             with self._lock:
                 self._swap_content[swap_entry.cache_desc_nf] = swap_entry

-            # Assume that reading from the swap will take as much time as
-            # writing to it. We cannot do better anyway, but that should
-            # mostly bias to keeping things in memory if possible.
-            swap_cost = measure.exclusive_delta
             try:
                 data_swapped_size = os.stat(data_path).st_size
             except FileNotFoundError:
                 data_swapped_size = 0

             mem_usage = self._data_mem_usage(data)
-            if mem_usage:
-                self._update_swap_cost(data, swap_cost, mem_usage, data_swapped_size)

             with self._lock:
                 self._swap_size += data_swapped_size
-                self._update_data_swap_size_estimation(data, data_swapped_size)
                 self.scrub_swap()

     def _get_swap_size(self):
@@ -4776,20 +4730,6 @@ class _TraceCache(Loggable):
             # If we don't know the computation cost, assume it can be evicted cheaply
             compute_cost = self._data_cost.get(cache_desc, 0)

-            if not compute_cost:
-                score = 0
-            else:
-                swap_cost = self._estimate_data_swap_cost(data)
-                # If it's already written back, make it cheaper to evict since
-                # the eviction itself is going to be cheap
-                if self._is_written_to_swap(cache_desc):
-                    swap_cost /= 2
-
-                if swap_cost:
-                    score = compute_cost / swap_cost
-                else:
-                    score = 0
-
             # Assume that more references to an object implies it will
             # stay around for longer. Therefore, it's less interesting to
             # remove it from this cache and pay the cost of reading/writing it to
@@ -4797,7 +4737,7 @@ class _TraceCache(Loggable):
             #
             # Normalize to the minimum refcount, so that the _cache and other
             # structures where references are stored are discounted for sure.
-            return (refcounts[cache_desc] - min_refcount + 1) * score
+            return (refcounts[cache_desc] - min_refcount + 1) * compute_cost

         new_mem_usage = 0
         for cache_desc, data in sorted(self._cache.items(), key=retention_score):
--
GitLab


From 8443345b74dfde6569d60fc3ad661efd5b45d8e0 Mon Sep 17 00:00:00 2001
From: Douglas Raillard
Date: Wed, 12 Jun 2024 16:16:32 +0100
Subject: [PATCH 6/7] lisa.trace: Allow best-effort swap writing

FIX

Allow trying to write data to the swap area, suppressing any exception
raised in the process.
---
 lisa/analysis/base.py |  2 +-
 lisa/trace.py         | 46 +++++++++++++++++++++++++++--------------
 2 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/lisa/analysis/base.py b/lisa/analysis/base.py
index 1615c97bc..c2bfc1777 100644
--- a/lisa/analysis/base.py
+++ b/lisa/analysis/base.py
@@ -1292,7 +1292,7 @@ class TraceAnalysisBase(AnalysisHelpers):
                 else:
                     compute_cost = None

-                cache.insert(cache_desc, data, compute_cost=compute_cost, write_swap=True)
+                cache.insert(cache_desc, data, compute_cost=compute_cost, write_swap='best-effort')
                 return data

             if memory_cache:
diff --git a/lisa/trace.py b/lisa/trace.py
index 3a9e2bd23..e5df10592 100644
--- a/lisa/trace.py
+++ b/lisa/trace.py
@@ -4429,7 +4429,7 @@ class _TraceCache(Loggable):

         return data

-    def _write_swap(self, cache_desc, data, write_meta=True):
+    def _write_swap(self, cache_desc, data, write_meta=True, best_effort=False):
         try:
             swap_dir = self.swap_dir
         except ValueError:
@@ -4461,9 +4461,11 @@ class _TraceCache(Loggable):
         # Write the Parquet file and update the write speed
         try:
             self._write_data(cache_desc.fmt, data, data_path)
-        # PyArrow fails to save dataframes containing integers > 64bits
-        except OverflowError as e:
-            log_error(e)
+        except Exception as e:
+            if best_effort:
+                log_error(e)
+            else:
+                raise e
         else:
             # Update the swap entry on disk
             if write_meta:
@@ -4649,7 +4651,7 @@ class _TraceCache(Loggable):

         return data

-    def insert(self, cache_desc, data, compute_cost=None, write_swap=False, ignore_cost=False, write_meta=True, swappable=None):
+    def insert(self, cache_desc, data, compute_cost=None, write_swap=True, ignore_cost=False, write_meta=True, swappable=None):
         """
         Insert an entry in the cache.
@@ -4665,8 +4667,9 @@ class _TraceCache(Loggable):
         :param write_swap: If ``True``, the data will be written to the swap
             as well so it can be quickly reloaded. Note that it will be
             subject to cost evaluation, so it might not result in anything actually
-            written.
-        :type write_swap: bool
+            written. If ``"best-effort"`` is passed, writing will be attempted
+            and any exception suppressed.
+        :type write_swap: bool or str

         :param ignore_cost: If ``True``, bypass the computation vs swap cost
             comparison.
@@ -4689,11 +4692,14 @@ class _TraceCache(Loggable):
            if compute_cost is not None:
                self._data_cost[cache_desc] = compute_cost

-           self.write_swap(
-               cache_desc,
-               ignore_cost=ignore_cost,
-               write_meta=write_meta
-           )
+           if write_swap:
+               best_effort = (write_swap == 'best-effort')
+               self.write_swap(
+                   cache_desc,
+                   ignore_cost=ignore_cost,
+                   write_meta=write_meta,
+                   best_effort=best_effort,
+               )

        self._scrub_mem()

@@ -4755,7 +4761,7 @@ class _TraceCache(Loggable):
        If it would be cheaper to reload the data than to recompute them,
        they will be written to the swap area.
        """
-       self.write_swap(cache_desc)
+       self.write_swap(cache_desc, best_effort=True)

        try:
            with self._lock:
@@ -4763,7 +4769,7 @@ class _TraceCache(Loggable):
        except KeyError:
            pass

-    def write_swap(self, cache_desc, ignore_cost=False, write_meta=True):
+    def write_swap(self, cache_desc, ignore_cost=False, write_meta=True, best_effort=False):
        """
        Write the given descriptor to the swap area if that would be faster to
        reload the data rather than recomputing it. If the descriptor is not in
@@ -4778,6 +4784,11 @@ class _TraceCache(Loggable):
        :param write_meta: If ``True``, the swap entry metadata will be written
            on disk if the data are. Otherwise, no swap entry is written to
            disk.
        :type write_meta: bool
+
+       :param best_effort: If ``True``, attempt to write to the swap and
+           simply log an error rather than raising an exception in case of
+           failure.
+       :type best_effort: bool
        """
        try:
            with self._lock:
@@ -4793,7 +4804,12 @@ class _TraceCache(Loggable):
                self._should_evict_to_swap(cache_desc, data)
            )
        ):
-           self._write_swap(cache_desc, data, write_meta)
+           self._write_swap(
+               cache_desc=cache_desc,
+               data=data,
+               write_meta=write_meta,
+               best_effort=best_effort,
+           )

    def write_swap_all(self, **kwargs):
        """
--
GitLab


From 04615f75ab27d0801e9497cda3fa31b0fa823ac7 Mon Sep 17 00:00:00 2001
From: Douglas Raillard
Date: Mon, 17 Jun 2024 14:07:05 +0100
Subject: [PATCH 7/7] lisa: Replace np.NaN with np.nan

FIX

np.NaN was removed in numpy 2.0 in favor of np.nan.
---
 lisa/analysis/functions.py | 2 +-
 lisa/analysis/tasks.py     | 4 ++--
 lisa/datautils.py          | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/lisa/analysis/functions.py b/lisa/analysis/functions.py
index 6e5c8f43d..cc560617d 100644
--- a/lisa/analysis/functions.py
+++ b/lisa/analysis/functions.py
@@ -711,7 +711,7 @@ class _CallGraphNode(Mapping):
     @memoized
     def __getitem__(self, key):
         if not self.valid_metrics:
-            return np.NaN
+            return np.nan

         delta = self.exit_time - self.entry_time

diff --git a/lisa/analysis/tasks.py b/lisa/analysis/tasks.py
index 3bbbbf9a4..ca4468063 100644
--- a/lisa/analysis/tasks.py
+++ b/lisa/analysis/tasks.py
@@ -1013,7 +1013,7 @@ class TasksAnalysis(TraceAnalysisBase):

     @TraceAnalysisBase.df_method
     @df_task_states.used_events
-    def df_task_activation(self, task, cpu=None, active_value=1, sleep_value=0, preempted_value=np.NaN):
+    def df_task_activation(self, task, cpu=None, active_value=1, sleep_value=0, preempted_value=np.nan):
         """
         DataFrame of a task's active time on a given CPU

@@ -1057,7 +1057,7 @@ class TasksAnalysis(TraceAnalysisBase):
             elif state == TaskState.TASK_RUNNING:
                 # Return NaN regardless of preempted_value, since some below
                 # code relies on that
-                return np.NaN
+                return np.nan
             else:
                 return sleep_value

diff --git a/lisa/datautils.py b/lisa/datautils.py
index f73b26897..b0bd1a3c8 100644
--- a/lisa/datautils.py
+++ b/lisa/datautils.py
@@ -2001,7 +2001,7 @@ def df_combine_duplicates(df, func, output_col, cols=None, all_col=True, prune=T
     try:
         init_df[output_col]
     except KeyError:
-        init_df[output_col] = np.NaN
+        init_df[output_col] = np.nan
     else:
         # Restore the index that we had to remove for apply()
         df.index = index
--
GitLab
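A quick illustration of the numpy change addressed by the last patch (a
hedged sketch, not part of the series):

    import numpy as np

    x = np.nan          # canonical spelling, works on all numpy versions
    assert x != x       # NaN is the only float value not equal to itself

    # On numpy >= 2.0, accessing the removed np.NaN alias raises an
    # AttributeError suggesting np.nan instead:
    # np.NaN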