diff --git a/lisa/analysis/base.py b/lisa/analysis/base.py
index f05b5238f714bf5d290d0f50a9b86f26df9188a3..c2bfc1777962876b558c418723b9e885abce76b2 100644
--- a/lisa/analysis/base.py
+++ b/lisa/analysis/base.py
@@ -45,7 +45,7 @@ import pandas as pd
 from lisa.utils import Loggable, deprecate, get_doc_url, get_short_doc, get_subclasses, guess_format, is_running_ipython, measure_time, memoized, update_wrapper_doc, _import_all_submodules, optional_kwargs
 from lisa.trace import _CacheDataDesc
 from lisa.notebook import _hv_fig_to_pane, _hv_link_dataframes, axis_cursor_delta, axis_link_dataframes, make_figure
-from lisa.datautils import _df_to
+from lisa.datautils import _df_to, _pandas_cleanup_df
 
 # Ensure hv.extension() is called
 import lisa.notebook
@@ -1183,6 +1183,10 @@ class TraceAnalysisBase(AnalysisHelpers):
             # they are collect()'ed in f(), they will be created using a common
             # StringCache so Categorical columns can be concatenated and such.
             with pl.StringCache():
+
+                # We might get different types based on whether the content
+                # comes from the function directly (could be a pandas object)
+                # or from the cache (polars LazyFrame).
                 df = cached_f(self, *args, **kwargs)
 
             assert isinstance(df, (pd.DataFrame, pl.DataFrame, pl.LazyFrame))
@@ -1276,12 +1280,19 @@ class TraceAnalysisBase(AnalysisHelpers):
                 with measure_time() as measure:
                     data = f(*params.args, **params.kwargs)
 
+                if isinstance(data, pd.DataFrame):
+                    data = _pandas_cleanup_df(data)
+
                 if memory_cache:
-                    compute_cost = measure.exclusive_delta
+                    # Do not use measure.exclusive_delta, otherwise a simple
+                    # function making thousands of quick calls to a child
+                    # function may appear with a low cost, even though it
+                    # actually has a high total cost.
+                    compute_cost = measure.delta
                 else:
                     compute_cost = None
 
-                cache.insert(cache_desc, data, compute_cost=compute_cost, write_swap=True)
+                cache.insert(cache_desc, data, compute_cost=compute_cost, write_swap='best-effort')
                 return data
 
         if memory_cache:
diff --git a/lisa/analysis/cpus.py b/lisa/analysis/cpus.py
index a6bbeecc4d3403033d01a25baf0f9dcf97a640b0..cac17e793a6d8d1a5d1b6992a690e8838b9ebf0b 100644
--- a/lisa/analysis/cpus.py
+++ b/lisa/analysis/cpus.py
@@ -22,7 +22,7 @@ import holoviews as hv
 
 from lisa.analysis.base import TraceAnalysisBase
 from lisa.trace import requires_events, CPU
-from lisa.datautils import df_window, NO_INDEX
+from lisa.datautils import df_window
 
 
 class CpusAnalysis(TraceAnalysisBase):
@@ -36,7 +36,7 @@ class CpusAnalysis(TraceAnalysisBase):
     # DataFrame Getter Methods
 ###############################################################################
 
-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @requires_events('sched_switch')
     def df_context_switches(self):
         """
diff --git a/lisa/analysis/frequency.py b/lisa/analysis/frequency.py
index 42fbd88e306ee7e09a2ad357ef9984f2758020e5..ff58b0adf8879177b1a0e1bbe6c4adbd2c4ff1cf 100644
--- a/lisa/analysis/frequency.py
+++ b/lisa/analysis/frequency.py
@@ -28,7 +28,7 @@ import holoviews as hv
 
 from lisa.analysis.base import TraceAnalysisBase
 from lisa.trace import requires_events, requires_one_event_of, CPU, MissingTraceEventError
-from lisa.datautils import series_integrate, df_refit_index, series_refit_index, series_deduplicate, df_add_delta, series_mean, df_window, df_merge, SignalDesc, NO_INDEX
+from lisa.datautils import series_integrate, df_refit_index, series_refit_index, series_deduplicate, df_add_delta, series_mean, df_window, df_merge, SignalDesc
 from lisa.notebook import plot_signal, _hv_neutral
 
 
@@ -238,7 +238,7 @@ class FrequencyAnalysis(TraceAnalysisBase):
 
         return time_df
 
-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @_get_frequency_residency.used_events
     def df_cpu_frequency_residency(self, cpu):
         """
@@ -258,7 +258,7 @@ class FrequencyAnalysis(TraceAnalysisBase):
         else:
             raise TypeError('Input CPU parameter must be an integer')
 
-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @_get_frequency_residency.used_events
     def df_domain_frequency_residency(self, cpu):
         """
@@ -285,7 +285,7 @@ class FrequencyAnalysis(TraceAnalysisBase):
         domain, = domains
         return self._get_frequency_residency(tuple(domain))
 
-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @df_cpu_frequency.used_events
     def df_cpu_frequency_transitions(self, cpu):
         """
diff --git a/lisa/analysis/functions.py b/lisa/analysis/functions.py
index 6e5c8f43d64de7ad8522eba54a96d546d657f7b3..cc560617dae20d6be6504e79977bc7d9223f54fe 100644
--- a/lisa/analysis/functions.py
+++ b/lisa/analysis/functions.py
@@ -711,7 +711,7 @@ class _CallGraphNode(Mapping):
     @memoized
     def __getitem__(self, key):
         if not self.valid_metrics:
-            return np.NaN
+            return np.nan
 
         delta = self.exit_time - self.entry_time
 
diff --git a/lisa/analysis/idle.py b/lisa/analysis/idle.py
index 9e6981d345c216f7f625aa466a9a45f99c3fa20f..86fb3599955a3c9a8e744bdc9bddf249f0fde8dc 100644
--- a/lisa/analysis/idle.py
+++ b/lisa/analysis/idle.py
@@ -23,7 +23,7 @@ import typing
 import pandas as pd
 import holoviews as hv
 
-from lisa.datautils import df_add_delta, df_refit_index, df_split_signals, NO_INDEX
+from lisa.datautils import df_add_delta, df_refit_index, df_split_signals
 from lisa.analysis.base import TraceAnalysisBase
 from lisa.trace import requires_events, CPU
 from lisa.analysis.base import TraceAnalysisBase
@@ -158,7 +158,7 @@ class IdleAnalysis(TraceAnalysisBase):
             'cpu': pd.concat(map(make_series, cpus))
         }).sort_index()
 
-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @df_cpu_idle.used_events
     def df_cpu_idle_state_residency(self, cpu):
         """
@@ -189,7 +189,7 @@ class IdleAnalysis(TraceAnalysisBase):
         df.index.name = 'idle_state'
         return df
 
-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @df_cpu_idle.used_events
     def df_cluster_idle_state_residency(self, cluster):
         """
diff --git a/lisa/analysis/latency.py b/lisa/analysis/latency.py
index 845b931cc38db68f97c93ea488d191a272221a1f..0f7b52910b4015a16286659cdf7ae86c220e4476 100644
--- a/lisa/analysis/latency.py
+++ b/lisa/analysis/latency.py
@@ -22,7 +22,7 @@ import holoviews as hv
 from lisa.analysis.base import TraceAnalysisBase
 from lisa.notebook import COLOR_CYCLE, _hv_neutral
 from lisa.analysis.tasks import TaskState, TasksAnalysis, TaskID
-from lisa.datautils import df_refit_index, NO_INDEX
+from lisa.datautils import df_refit_index
 from lisa.trace import MissingTraceEventError
 
 
@@ -51,11 +51,11 @@ class LatencyAnalysis(TraceAnalysisBase):
             (pl.col('curr_state') == curr_state) &
             (pl.col('next_state') == next_state)
         )
-        df = df.select(["delta", "cpu", "target_cpu"])
+        df = df.select(['Time', 'delta', 'cpu', 'target_cpu'])
         df = df.rename({'delta': name})
         return df
 
-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @_df_latency.used_events
     def df_latency_wakeup(self, task):
         """
@@ -77,7 +77,7 @@ class LatencyAnalysis(TraceAnalysisBase):
             TaskState.TASK_ACTIVE,
         )
 
-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @_df_latency.used_events
     def df_latency_preemption(self, task):
         """
@@ -96,7 +96,7 @@ class LatencyAnalysis(TraceAnalysisBase):
             'preempt_latency',
             TaskState.TASK_RUNNING,
             TaskState.TASK_ACTIVE
-        ).select(['preempt_latency', 'cpu'])
+        ).select(['Time', 'preempt_latency', 'cpu'])
 
     @TraceAnalysisBase.df_method
     @TasksAnalysis.df_task_states.used_events
@@ -274,7 +274,7 @@ class LatencyAnalysis(TraceAnalysisBase):
 
         return series, above, below
 
-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method()
     @df_latency_wakeup.used_events
     @df_latency_preemption.used_events
     def _get_latencies_df(self, task, wakeup, preempt):
diff --git a/lisa/analysis/rta.py b/lisa/analysis/rta.py
index 69d2aabc8d38359befbe4d8b1cfc460f18ecdd48..fc70e31a1fd45d3edb534350ee6b1f2270c13436 100644
--- a/lisa/analysis/rta.py
+++ b/lisa/analysis/rta.py
@@ -21,7 +21,7 @@ import holoviews as hv
 import numpy as np
 
 from lisa.analysis.base import AnalysisHelpers, TraceAnalysisBase
-from lisa.datautils import df_filter_task_ids, df_window, df_split_signals, NO_INDEX
+from lisa.datautils import df_filter_task_ids, df_window, df_split_signals
 from lisa.trace import requires_events, requires_one_event_of, may_use_events, MissingTraceEventError
 from lisa.utils import memoized, order_as
 from lisa.analysis.tasks import TasksAnalysis, TaskID
@@ -325,7 +325,7 @@ class RTAEventsAnalysis(TraceAnalysisBase):
 
         return df
 
-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @df_rtapp_loop.used_events
     def df_phases(self, task, wlgen_profile=None):
         """
@@ -392,9 +392,12 @@ class RTAEventsAnalysis(TraceAnalysisBase):
 
         if durations:
             index, columns = zip(*durations)
-            return pd.DataFrame(columns, index=index)
+            df = pd.DataFrame(columns, index=index)
+            df.index.name = 'Time'
         else:
-            return pd.DataFrame()
+            df = pd.DataFrame()
+
+        return df
 
 
     @df_phases.used_events
diff --git a/lisa/analysis/tasks.py b/lisa/analysis/tasks.py
index 5ecfa09f5e20afb077ed6386b5fe02232e23aef9..ca4468063ac49f58ec391ad3f5e6ba6628e24b29 100644
--- a/lisa/analysis/tasks.py
+++ b/lisa/analysis/tasks.py
@@ -33,7 +33,7 @@ import polars as pl
 
 from lisa.analysis.base import TraceAnalysisBase
 from lisa.utils import memoized, kwargs_forwarded_to, deprecate, order_as
-from lisa.datautils import df_filter_task_ids, series_rolling_apply, series_refit_index, df_refit_index, df_deduplicate, df_split_signals, df_add_delta, df_window, df_update_duplicates, df_combine_duplicates, SignalDesc, NO_INDEX
+from lisa.datautils import df_filter_task_ids, series_rolling_apply, series_refit_index, df_refit_index, df_deduplicate, df_split_signals, df_add_delta, df_window, df_update_duplicates, df_combine_duplicates, SignalDesc
 from lisa.trace import requires_events, will_use_events_from, may_use_events, CPU, MissingTraceEventError
 from lisa.notebook import _hv_neutral, plot_signal, _hv_twinx
 from lisa._typeclass import FromString
@@ -928,7 +928,7 @@ class TasksAnalysis(TraceAnalysisBase):
         )
         return df
 
-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method
     @df_task_states.used_events
     def df_task_total_residency(self, task):
         """
@@ -957,7 +957,7 @@ class TasksAnalysis(TraceAnalysisBase):
         )
         return residency_df.fillna(0).sort_index()
 
-    @TraceAnalysisBase.df_method(index=NO_INDEX)
+    @TraceAnalysisBase.df_method(index='task')
     @df_task_total_residency.used_events
     def df_tasks_total_residency(self, tasks=None, ascending=False, count=None):
         """
@@ -1004,11 +1004,16 @@ class TasksAnalysis(TraceAnalysisBase):
         if count is not None:
             res_df = res_df.head(count)
 
+        res_df.index.name = 'task'
+
+        # Ensure column names are all strings, so it can be serialized to
+        # parquet
+        res_df.columns = [str(col) for col in res_df.columns]
         return res_df
 
     @TraceAnalysisBase.df_method
     @df_task_states.used_events
-    def df_task_activation(self, task, cpu=None, active_value=1, sleep_value=0, preempted_value=np.NaN):
+    def df_task_activation(self, task, cpu=None, active_value=1, sleep_value=0, preempted_value=np.nan):
         """
         DataFrame of a task's active time on a given CPU
 
@@ -1052,7 +1057,7 @@ class TasksAnalysis(TraceAnalysisBase):
             elif state == TaskState.TASK_RUNNING:
                 # Return NaN regardless of preempted_value, since some below
                 # code relies on that
                return np.NaN
                return np.nan
            else:
                return sleep_value
 
diff --git a/lisa/datautils.py b/lisa/datautils.py
index b541e41f984daf0624d3fbb215098dce82c04eb0..b0bd1a3c837a413f2e5d92d13552b6397c698a40 100644
--- a/lisa/datautils.py
+++ b/lisa/datautils.py
@@ -279,7 +279,7 @@ def _df_to_polars(df, index):
         df = df.lazy()
         df = _df_to_polars(df, index=index)
     elif isinstance(df, pd.DataFrame):
-        df = pl.from_pandas(df, include_index=True)
+        df = pl.from_pandas(df, include_index=index is not NO_INDEX)
         df = _df_to_polars(df, index=index)
     else:
         raise ValueError(f'{df.__class__} not supported')
@@ -295,9 +295,10 @@ def _df_to_pandas(df, index):
         return df
     else:
         assert isinstance(df, pl.LazyFrame)
-        index = _polars_index_col(df, index)
-        if index == 'Time' and df.schema[index].is_temporal():
+
+        has_time_index = index == 'Time' and df.schema[index].is_temporal()
+        if has_time_index:
             df = df.with_columns(
                 pl.col(index).dt.total_nanoseconds() * 1e-9
             )
 
@@ -318,9 +319,7 @@ def _df_to_pandas(df, index):
             pyarrow.string(): pd.StringDtype(),
         }
         df = df.to_pandas(types_mapper=dtype_mapping.get)
-        if index is None:
-            df.reset_index(inplace=True)
-        else:
+        if index is not None:
             df.set_index(index, inplace=True)
 
         # Nullable dtypes are still not supported everywhere, so cast back to a
@@ -337,16 +336,34 @@ def _df_to_pandas(df, index):
         }
         df = df.astype(dtypes, copy=False)
 
-        # Round trip polars -> pandas -> polars can be destructive as polars will
-        # store timestamps at nanosecond precision in an integer. This will wipe
-        # any sub-nanosecond difference in values, possibly leading to duplicate
-        # timestamps.
-        df.index = series_update_duplicates(df.index.to_series())
+        if has_time_index:
+            # Round trip polars -> pandas -> polars can be destructive as polars
+            # will store timestamps at nanosecond precision in an integer. This
+            # will wipe any sub-nanosecond difference in values, possibly leading
+            # to duplicate timestamps.
+            df.index = series_update_duplicates(df.index.to_series())
 
         return df
 
 
 def _df_to(df, fmt, index=None):
+
+    if isinstance(df, pd.DataFrame):
+        df = _pandas_cleanup_df(df)
+
+        if index is None:
+            index = df.index.name
+            # Default index in pandas, e.g. when using reset_index(). In that
+            # case, there is no reason to include that index in anything we we
+            # convert it to.
+            if index is None and isinstance(df.index, pd.RangeIndex):
+                index = NO_INDEX
+        elif index is NO_INDEX:
+            assert df.index.name is None
+            assert isinstance(df.index, pd.RangeIndex)
+        else:
+            assert index == df.index.name
+
     if fmt == 'pandas':
         return _df_to_pandas(df, index=index)
     elif fmt == 'polars-lazyframe':
@@ -357,6 +374,23 @@ def _df_to(df, fmt, index=None):
         raise ValueError(f'Unknown format {fmt}')
 
 
+def _pandas_cleanup_df(df):
+    assert isinstance(df, pd.DataFrame)
+
+    # Ensure we only have string column names, as it is the only type that will
+    # survive library conversions and serialization to parquet
+    assert all(isinstance(col, str) for col in df.columns)
+
+    # We need an index name if it's not just a default RangeIndex, otherwise we
+    # cannot convert the dataframe to polars.
+    assert isinstance(df.index, pd.RangeIndex) or df.index.name is not None
+
+    # This will not survive conversion between dataframe types
+    df.columns.name = None
+
+    return df
+
+
 class DataAccessor:
     """
     Proxy class that allows extending the :class:`pandas.DataFrame` API.
@@ -1967,7 +2001,7 @@ def df_combine_duplicates(df, func, output_col, cols=None, all_col=True, prune=T
     try:
         init_df[output_col]
     except KeyError:
-        init_df[output_col] = np.NaN
+        init_df[output_col] = np.nan
     else:
         # Restore the index that we had to remove for apply()
         df.index = index
diff --git a/lisa/trace.py b/lisa/trace.py
index 7b227552a0c785d384a50024640969ae7fd781c8..e5df10592544547a82331723ee513abc59031b44 100644
--- a/lisa/trace.py
+++ b/lisa/trace.py
@@ -64,7 +64,7 @@ import devlib
 
 from lisa.utils import Loggable, HideExekallID, memoized, lru_memoized, deduplicate, take, deprecate, nullcontext, measure_time, checksum, newtype, groupby, PartialInit, kwargs_forwarded_to, kwargs_dispatcher, ComposedContextManager, get_nested_key, unzip_into, order_as, delegate_getattr
 from lisa.conf import SimpleMultiSrcConf, LevelKeyDesc, KeyDesc, TopLevelKeyDesc, Configurable
-from lisa.datautils import SignalDesc, df_add_delta, df_deduplicate, df_window, df_window_signals, series_convert, df_update_duplicates, _polars_duration_expr, _df_to, _polars_df_in_memory, Timestamp
+from lisa.datautils import SignalDesc, df_add_delta, df_deduplicate, df_window, df_window_signals, series_convert, df_update_duplicates, _polars_duration_expr, _df_to, _polars_df_in_memory, Timestamp, _pandas_cleanup_df
 from lisa.version import VERSION_TOKEN
 from lisa._typeclass import FromString
 from lisa._kmod import LISADynamicKmod
@@ -3995,14 +3995,6 @@ class _TraceCache(Loggable):
     be stored in the cache by analysis method.
     """
 
-    INIT_SWAP_COST = 1e-8
-    """
-    Somewhat arbitrary number, must be small enough so that we write at
-    least one dataset to the cache, which will allow us getting a better
-    estimation. If the value is too high from the start, we will never
-    write anything, and the value will never have a chance to re-adjust.
-    """
-
     TRACE_META_FILENAME = 'trace.meta'
     """
     Name of the trace metadata file in the swap area.
@@ -4020,13 +4012,11 @@ class _TraceCache(Loggable):
         self._data_cost = {}
         self._swap_content = swap_content or {}
         self._cache_desc_swap_filename = {}
-        self.swap_cost = self.INIT_SWAP_COST
         self._swap_dir = swap_dir
         self.max_swap_size = max_swap_size if max_swap_size is not None else math.inf
         self._swap_size = self._get_swap_size()
 
         self.max_mem_size = max_mem_size if max_mem_size is not None else math.inf
-        self._data_mem_swap_ratio = 1
         self._metadata = metadata or {}
         self.trace_path = os.path.abspath(trace_path) if trace_path else trace_path
 
@@ -4233,26 +4223,8 @@ class _TraceCache(Loggable):
 
         return cls(swap_dir=swap_dir, **kwargs)
 
-    def _estimate_data_swap_cost(self, data):
-        return self._estimate_data_swap_size(data) * self.swap_cost
-
     def _estimate_data_swap_size(self, data):
-        return self._data_mem_usage(data) * self._data_mem_swap_ratio
-
-    def _update_ewma(self, attr, new, alpha=0.25, override=False):
-        with self._lock:
-            old = getattr(self, attr)
-            if override:
-                updated = new
-            else:
-                updated = (1 - alpha) * old + alpha * new
-
-            setattr(self, attr, updated)
-
-    def _update_data_swap_size_estimation(self, data, size):
-        mem_usage = self._data_mem_usage(data)
-        if mem_usage:
-            self._update_ewma('_data_mem_swap_ratio', size / mem_usage)
+        return self._data_mem_usage(data)
 
     @staticmethod
     def _data_mem_usage(data):
@@ -4268,11 +4240,14 @@ class _TraceCache(Loggable):
         return sys.getsizeof(data)
 
     def _should_evict_to_swap(self, cache_desc, data):
-        # If we don't have any cost info, assume it is expensive to compute
         with self._lock:
-            compute_cost = self._data_cost.get(cache_desc, math.inf)
-            swap_cost = self._estimate_data_swap_cost(data)
-            return swap_cost <= compute_cost
+            compute_cost = self._data_cost.get(
+                cache_desc,
+                # If we don't have any cost info, assume it is expensive to
+                # compute
+                math.inf,
+            )
+            return compute_cost >= 100e-6
 
     def _path_of_swap_entry(self, swap_entry):
         return os.path.join(self.swap_dir, swap_entry.meta_filename)
@@ -4300,19 +4275,6 @@ class _TraceCache(Loggable):
             filename = swap_entry.data_filename
         return os.path.join(self.swap_dir, filename)
 
-    def _update_swap_cost(self, data, swap_cost, mem_usage, swap_size):
-        # Take out from the swap cost the time it took to write the overhead
-        # that comes with the file format, assuming the cost is
-        # proportional to amount of data written in the swap.
-        if not swap_size:
-            swap_cost = 0
-
-        new_cost = swap_cost / mem_usage
-
-        override = self.swap_cost == self.INIT_SWAP_COST
-        # EWMA to keep a relatively stable cost
-        self._update_ewma('swap_cost', new_cost, override=override)
-
     def _is_written_to_swap(self, cache_desc):
         try:
             with self._lock:
@@ -4326,6 +4288,8 @@ class _TraceCache(Loggable):
     def _data_to_parquet(data, path, compression='lz4', **kwargs):
         kwargs['compression'] = compression
         if isinstance(data, pd.DataFrame):
+            data = _pandas_cleanup_df(data)
+
            # Data must be convertible to bytes so we dump them as JSON
            attrs = json.dumps(data.attrs)
            table = pyarrow.Table.from_pandas(data)
@@ -4420,6 +4384,21 @@ class _TraceCache(Loggable):
                 df.clear().collect()
 
                 df = _LazyFrameOnDelete.attach_file_cleanup(df, [hardlink_base])
+
+                parquet_meta = pyarrow.parquet.read_metadata(hardlink_path)
+                parquet_meta = parquet_meta.metadata
+                try:
+                    pandas_meta = parquet_meta[b'pandas']
+                except KeyError:
+                    pass
+                else:
+                    # Load the pandas metadata and put the index column
+                    # first, so that _polars_index_col() detects the index
+                    # correctly.
+                    pandas_meta = json.loads(pandas_meta.decode('utf-8'))
+                    index_cols = pandas_meta['index_columns']
+                    df = df.select(order_as(df.columns, index_cols))
+
                 return df
 
             if fmt == 'disk-only':
@@ -4450,7 +4429,7 @@ class _TraceCache(Loggable):
 
             return data
 
-    def _write_swap(self, cache_desc, data, write_meta=True):
+    def _write_swap(self, cache_desc, data, write_meta=True, best_effort=False):
         try:
             swap_dir = self.swap_dir
         except ValueError:
@@ -4481,11 +4460,12 @@ class _TraceCache(Loggable):
 
             # Write the Parquet file and update the write speed
             try:
-                with measure_time() as measure:
-                    self._write_data(cache_desc.fmt, data, data_path)
-            # PyArrow fails to save dataframes containing integers > 64bits
-            except OverflowError as e:
-                log_error(e)
+                self._write_data(cache_desc.fmt, data, data_path)
+            except Exception as e:
+                if best_effort:
+                    log_error(e)
+                else:
+                    raise e
             else:
                 # Update the swap entry on disk
                 if write_meta:
@@ -4506,21 +4486,14 @@ class _TraceCache(Loggable):
             with self._lock:
                 self._swap_content[swap_entry.cache_desc_nf] = swap_entry
 
-            # Assume that reading from the swap will take as much time as
-            # writing to it. We cannot do better anyway, but that should
-            # mostly bias to keeping things in memory if possible.
-            swap_cost = measure.exclusive_delta
             try:
                 data_swapped_size = os.stat(data_path).st_size
             except FileNotFoundError:
                 data_swapped_size = 0
 
             mem_usage = self._data_mem_usage(data)
-            if mem_usage:
-                self._update_swap_cost(data, swap_cost, mem_usage, data_swapped_size)
             with self._lock:
                 self._swap_size += data_swapped_size
-                self._update_data_swap_size_estimation(data, data_swapped_size)
             self.scrub_swap()
 
     def _get_swap_size(self):
@@ -4678,7 +4651,7 @@ class _TraceCache(Loggable):
 
         return data
 
-    def insert(self, cache_desc, data, compute_cost=None, write_swap=False, ignore_cost=False, write_meta=True, swappable=None):
+    def insert(self, cache_desc, data, compute_cost=None, write_swap=True, ignore_cost=False, write_meta=True, swappable=None):
         """
         Insert an entry in the cache.
 
@@ -4694,8 +4667,9 @@ class _TraceCache(Loggable):
         :param write_swap: If ``True``, the data will be written to the swap as
             well so it can be quickly reloaded. Note that it will be subject to
             cost evaluation, so it might not result in anything actually
-            written.
-        :type write_swap: bool
+            written. If ``"best-effort"`` is passed, writing will be attempted
+            and any exception suppressed.
+        :type write_swap: bool or str
 
         :param ignore_cost: If ``True``, bypass the computation vs swap cost
             comparison.
@@ -4718,11 +4692,14 @@ class _TraceCache(Loggable):
 
             if compute_cost is not None:
                 self._data_cost[cache_desc] = compute_cost
 
-        self.write_swap(
-            cache_desc,
-            ignore_cost=ignore_cost,
-            write_meta=write_meta
-        )
+        if write_swap:
+            best_effort = (write_swap == 'best-effort')
+            self.write_swap(
+                cache_desc,
+                ignore_cost=ignore_cost,
+                write_meta=write_meta,
+                best_effort=best_effort,
+            )
 
         self._scrub_mem()
 
@@ -4759,20 +4736,6 @@ class _TraceCache(Loggable):
             # If we don't know the computation cost, assume it can be evicted cheaply
             compute_cost = self._data_cost.get(cache_desc, 0)
 
-            if not compute_cost:
-                score = 0
-            else:
-                swap_cost = self._estimate_data_swap_cost(data)
-                # If it's already written back, make it cheaper to evict since
-                # the eviction itself is going to be cheap
-                if self._is_written_to_swap(cache_desc):
-                    swap_cost /= 2
-
-                if swap_cost:
-                    score = compute_cost / swap_cost
-                else:
-                    score = 0
-
             # Assume that more references to an object implies it will
             # stay around for longer. Therefore, it's less interesting to
             # remove it from this cache and pay the cost of reading/writing it to
@@ -4779,7 +4742,7 @@ class _TraceCache(Loggable):
             #
             # Normalize to the minimum refcount, so that the _cache and other
             # structures where references are stored are discounted for sure.
-            return (refcounts[cache_desc] - min_refcount + 1) * score
+            return (refcounts[cache_desc] - min_refcount + 1) * compute_cost
 
         new_mem_usage = 0
         for cache_desc, data in sorted(self._cache.items(), key=retention_score):
@@ -4798,7 +4761,7 @@ class _TraceCache(Loggable):
         If it would be cheaper to reload the data than to recompute them, they
         will be written to the swap area.
         """
-        self.write_swap(cache_desc)
+        self.write_swap(cache_desc, best_effort=True)
 
         try:
             with self._lock:
@@ -4806,7 +4769,7 @@ class _TraceCache(Loggable):
         except KeyError:
             pass
 
-    def write_swap(self, cache_desc, ignore_cost=False, write_meta=True):
+    def write_swap(self, cache_desc, ignore_cost=False, write_meta=True, best_effort=False):
         """
         Write the given descriptor to the swap area if that would be faster to
         reload the data rather than recomputing it. If the descriptor is not in
@@ -4821,6 +4784,11 @@ class _TraceCache(Loggable):
         :param write_meta: If ``True``, the swap entry metadata will be written
             on disk if the data are. Otherwise, no swap entry is written to disk.
         :type write_meta: bool
+
+        :param best_effort: If ``True``, attempt to write to the swap and
+            simply log an error rather than raising an exception in case of
+            failure.
+        :type best_effort: bool
         """
         try:
             with self._lock:
@@ -4836,7 +4804,12 @@ class _TraceCache(Loggable):
                     self._should_evict_to_swap(cache_desc, data)
                 )
             ):
-                self._write_swap(cache_desc, data, write_meta)
+                self._write_swap(
+                    cache_desc=cache_desc,
+                    data=data,
+                    write_meta=write_meta,
+                    best_effort=best_effort,
+                )
 
     def write_swap_all(self, **kwargs):
         """