From 543766b976ec101d371f69ce12fdf573c3aaa342 Mon Sep 17 00:00:00 2001
From: Douglas Raillard
Date: Mon, 1 Jul 2024 17:41:58 +0100
Subject: [PATCH 1/3] Revert "setup.py: Require polars < 1.0.0"

This reverts commit a1a3c38bb6e55c3dfaed39949cda86963b039eb7.
---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index d3aa27d77..a68c383cd 100755
--- a/setup.py
+++ b/setup.py
@@ -137,7 +137,7 @@ if __name__ == "__main__":
             "panel",
             "colorcet",
             # 0.20.28 has this issue: https://github.com/pola-rs/polars/issues/16442
-            "polars >= 0.20.16, != 0.20.28, < 1.0.0",
+            "polars >= 0.20.16, != 0.20.28",
             # Pandas >= 1.0.0 has support for new nullable dtypes
             # Pandas 1.2.0 has broken barplots:
             # https://github.com/pandas-dev/pandas/issues/38947
--
GitLab


From 869ddcbc3319443815be4311c0c063e929a6114d Mon Sep 17 00:00:00 2001
From: Douglas Raillard
Date: Mon, 1 Jul 2024 18:02:34 +0100
Subject: [PATCH 2/3] setup.py: Add 1.0.0 <= polars < 2.0.0 version bound

Make sure we do not get caught out by breaking changes when 2.0 is
released, and that the fixes applied for the 1.0.0 breaking changes do
not create issues with pre-1.0.0 releases.
---
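Note (illustration only, not part of the commit): the requirement
string follows standard PEP 440 specifier semantics, so the bound can
be sanity-checked with the packaging library:

    from packaging.specifiers import SpecifierSet

    spec = SpecifierSet(">= 1.0.0, < 2.0.0")
    assert "1.31.0" in spec        # any 1.x release is accepted
    assert "0.20.28" not in spec   # pre-1.0.0 releases are rejected
    assert "2.0.0" not in spec     # 2.0 breaking changes are excluded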
 setup.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/setup.py b/setup.py
index a68c383cd..eababcf7a 100755
--- a/setup.py
+++ b/setup.py
@@ -136,8 +136,7 @@ if __name__ == "__main__":
             "holoviews",
             "panel",
             "colorcet",
-            # 0.20.28 has this issue: https://github.com/pola-rs/polars/issues/16442
-            "polars >= 0.20.16, != 0.20.28",
+            "polars >= 1.0.0, < 2.0.0",
             # Pandas >= 1.0.0 has support for new nullable dtypes
             # Pandas 1.2.0 has broken barplots:
             # https://github.com/pandas-dev/pandas/issues/38947
--
GitLab


From d35831f78eeea18c5c7a661f51c1355326dd46e5 Mon Sep 17 00:00:00 2001
From: Douglas Raillard
Date: Mon, 1 Jul 2024 15:29:44 +0100
Subject: [PATCH 3/3] lisa: Upgrade code for polars 1.0.0

Adapt to the breaking API changes introduced in polars 1.0.0:

* Use LazyFrame.collect_schema() to get column names and dtypes,
  instead of the LazyFrame.columns and LazyFrame.schema properties
  which now trigger a performance warning.
* Use Expr.replace_strict() instead of Expr.replace() when mapping to
  a new set of values with a default.
* Pass format='json' explicitly to LazyFrame.serialize() and
  LazyFrame.deserialize(), as the default serialization format is not
  JSON anymore.
* Refer to exception types via the polars.exceptions module.
* Use polars.selectors to select columns by dtype.
---
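Note (illustration only, not part of the commit): a minimal sketch of
the polars >= 1.0 idioms this patch switches to, on made-up sample
data:

    import polars as pl
    import polars.exceptions
    import polars.selectors as cs

    lf = pl.LazyFrame({'Time': [1, 2], 'comm': ['a', 'b']})

    # collect_schema() resolves the schema once; the LazyFrame.columns
    # and LazyFrame.schema properties now emit a PerformanceWarning.
    schema = lf.collect_schema()
    assert schema.names() == ['Time', 'comm']
    assert schema['Time'] == pl.Int64

    # Mapping to a new set of values with a default is now spelled
    # replace_strict() rather than replace():
    expr = pl.col('Time').replace_strict({1: 'one'}, default=None)

    # Selectors replace manual iteration over schema items:
    lf = lf.with_columns(cs.string().cast(pl.Categorical))

    # Exception types live in the polars.exceptions module:
    try:
        lf.collect()
    except polars.exceptions.ComputeError:
        pass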
 lisa/analysis/base.py  |  2 +-
 lisa/analysis/tasks.py |  4 +--
 lisa/datautils.py      | 13 +++++----
 lisa/trace.py          | 63 ++++++++++++++++--------------------
 4 files changed, 35 insertions(+), 47 deletions(-)

diff --git a/lisa/analysis/base.py b/lisa/analysis/base.py
index 802ab228b..d4ea719ce 100644
--- a/lisa/analysis/base.py
+++ b/lisa/analysis/base.py
@@ -1195,7 +1195,7 @@ class TraceAnalysisBase(AnalysisHelpers):
             df,
             fmt=df_fmt,
             index=(
-                ('Time' if 'Time' in df.columns else None)
+                ('Time' if 'Time' in df.collect_schema().names() else None)
                 if index is None and isinstance(df, (pl.LazyFrame, pl.DataFrame))
                 else index
             ),
diff --git a/lisa/analysis/tasks.py b/lisa/analysis/tasks.py
index 44399533e..bef9d2501 100644
--- a/lisa/analysis/tasks.py
+++ b/lisa/analysis/tasks.py
@@ -773,7 +773,7 @@ class TasksAnalysis(TraceAnalysisBase):
     @staticmethod
     def _reorder_tasks_states_columns(df):
         order = ['Time', 'pid', 'comm', 'target_cpu', 'cpu', 'curr_state', 'next_state', 'delta']
-        return df.select(order_as(list(df.columns), order))
+        return df.select(order_as(list(df.collect_schema().names()), order))

     @_df_tasks_states.used_events
     @TraceAnalysisBase.df_method
@@ -889,7 +889,7 @@ class TasksAnalysis(TraceAnalysisBase):
         }

         def fixup(df, col):
-            str_col = (pl.col(col) & 0xff).replace(mapping, default=None)
+            str_col = (pl.col(col) & 0xff).replace_strict(mapping, default=None)
             str_col = (
                 pl.when(str_col.is_null() & (pl.col(col) > 0))
                 .then(pl.col(col).map_elements(TaskState.sched_switch_str))
diff --git a/lisa/datautils.py b/lisa/datautils.py
index b0bd1a3c8..676f118b8 100644
--- a/lisa/datautils.py
+++ b/lisa/datautils.py
@@ -236,7 +236,7 @@ NO_INDEX = _NoIndex()


 def _polars_index_col(df, index=None):
-    columns = df.columns
+    columns = df.collect_schema().names()

     if index is NO_INDEX:
         return None
@@ -252,7 +252,8 @@ def _df_to_polars(df, index):
     if isinstance(df, pl.LazyFrame):
         index = _polars_index_col(df, index)
         if index is not None:
-            dtype = df.schema[index]
+            schema = df.collect_schema()
+            dtype = schema[index]
             # This skips a useless cast, saving some time on the common path
             if index == 'Time':
                 if dtype != pl.Duration('ns'):
@@ -270,7 +271,7 @@ def _df_to_polars(df, index):
             )

         # Make the index column the first one
-        df = df.select(order_as(list(df.columns), [index]))
+        df = df.select(order_as(list(df.collect_schema().names()), [index]))
     # TODO: once this is solved, we can just inspect the plan and see if the
     # data is backed by a "DataFrameScan" instead of a "Scan" of a file:
     # https://github.com/pola-rs/polars/issues/9771
@@ -297,7 +298,8 @@ def _df_to_pandas(df, index):
     assert isinstance(df, pl.LazyFrame)

     index = _polars_index_col(df, index)
-    has_time_index = index == 'Time' and df.schema[index].is_temporal()
+    schema = df.collect_schema()
+    has_time_index = index == 'Time' and schema[index].is_temporal()
     if has_time_index:
         df = df.with_columns(
             pl.col(index).dt.total_nanoseconds() * 1e-9
@@ -1364,7 +1366,8 @@ def df_window_signals(df, window, signals, compress_init=False, clip_window=True

 def _polars_window_signals(df, window, signals, compress_init):
     index = _polars_index_col(df, index='Time')
-    assert df.schema[index].is_temporal()
+    schema = df.collect_schema()
+    assert schema[index].is_temporal()

     start, stop = window
     start = _polars_duration_expr(start, rounding='down')
diff --git a/lisa/trace.py b/lisa/trace.py
index 0c3d0a481..b0b5a3cec 100644
--- a/lisa/trace.py
+++ b/lisa/trace.py
@@ -59,6 +59,8 @@ import pandas as pd
 import pyarrow.lib
 import pyarrow.parquet
 import polars as pl
+import polars.exceptions
+import polars.selectors as cs

 import devlib

@@ -349,12 +351,12 @@ def _lazyframe_rewrite(df, update_plan):

     # TODO: once this is solved, we can just inspect the plan rather than
     # serialize()/deserialize() in JSON
     # https://github.com/pola-rs/polars/issues/9771
-    plan = df.serialize()
+    plan = df.serialize(format='json')
     plan = json.loads(plan)
     plan = update_plan(plan)
     plan = json.dumps(plan)
     plan = io.StringIO(plan)
-    df = pl.LazyFrame.deserialize(plan)
+    df = pl.LazyFrame.deserialize(plan, format='json')
     return df

@@ -1001,7 +1003,7 @@ class TraceDumpTraceParser(TraceParserBase):
         })
         df = df.with_columns([
             pl.col('Time').cast(pl.Duration("ns")),
-            pl.col('__pid').replace(pid_comms).alias('__comm')
+            pl.col('__pid').replace_strict(pid_comms, default=None).alias('__comm')
         ])
         df = df.drop('common_type', 'common_flags', 'common_preempt_count')

@@ -1021,17 +1023,7 @@ class TraceDumpTraceParser(TraceParserBase):

         # Turn all string columns into categorical columns, since strings are
         # typically extremely repetitive
-        categorical_cols = [
-            col
-            for col, dtype in df.schema.items()
-            if isinstance(dtype, (pl.String, pl.Binary))
-        ]
-
-        if categorical_cols:
-            df = df.with_columns([
-                pl.col(col).cast(pl.Categorical)
-                for col in categorical_cols
-            ])
+        df = df.with_columns((cs.string() | cs.binary()).cast(pl.Categorical))

         return df

@@ -1572,11 +1564,7 @@ class TxtTraceParserBase(TraceParserBase):
             infer_schema_length=None,
         ).lazy()

-        df = df.with_columns(
-            pl.col(col).cast(pl.String)
-            for col, dtype in df.schema.items()
-            if isinstance(dtype, pl.Binary)
-        )
+        df = df.with_columns(cs.binary().cast(pl.String))

         # Put the timestamp first so it's recognized as the index
         df = df.select(
@@ -1754,7 +1742,7 @@ class TxtTraceParserBase(TraceParserBase):

         # Drop unnecessary columns that might have been parsed by the regex
         to_keep = {'__timestamp', '__event', '__fields', 'line'}
-        skeleton_df = skeleton_df.select(sorted(to_keep & set(skeleton_df.columns)))
+        skeleton_df = skeleton_df.select(sorted(to_keep & set(skeleton_df.collect_schema().names())))
         # This is very fast on a category dtype
         available_events.update(
             skeleton_df.select(
@@ -1897,7 +1885,7 @@ class TxtTraceParserBase(TraceParserBase):
         # we can accurately infer the dtype.
         df = pl.DataFrame({
             col: [get_representative(df, col)]
-            for col in df.columns
+            for col in df.collect_schema().names()
         })

         # Ugly hack: dump the first row to CSV to infer the schema of the
@@ -1918,11 +1906,7 @@ class TxtTraceParserBase(TraceParserBase):

         # Polars will fail to convert strings with trailing spaces (e.g. "42 "
         # into 42), so ensure we clean that up before conversion
-        df = df.with_columns(
-            pl.col(col).str.strip_chars()
-            for col, dtype in df.schema.items()
-            if isinstance(dtype, pl.String)
-        )
+        df = df.with_columns(cs.string().str.strip_chars())

         df = df.with_columns(
             pl.col(name).cast(dtype)
@@ -1930,8 +1914,9 @@ class TxtTraceParserBase(TraceParserBase):
         )

         df = df.rename({'__timestamp': 'Time'})
+        schema = df.collect_schema()
         if event == 'sched_switch':
-            if isinstance(df.schema['prev_state'], (pl.String, pl.Categorical)):
+            if isinstance(schema['prev_state'], (pl.String, pl.Categorical)):
                 # Avoid circular dependency issue by importing at the last moment
                 # pylint: disable=import-outside-toplevel
                 from lisa.analysis.tasks import TaskState
@@ -1947,7 +1932,7 @@ class TxtTraceParserBase(TraceParserBase):
                 pl.col('overutilized').cast(pl.Boolean)
             )

-        if isinstance(df.schema.get('span'), (pl.String, pl.Binary, pl.Categorical)):
+        if isinstance(schema.get('span'), (pl.String, pl.Binary, pl.Categorical)):
             df = df.with_columns(
                 pl.col('span')
                 .cast(pl.String)
@@ -1964,7 +1949,7 @@ class TxtTraceParserBase(TraceParserBase):
             df = df.rename({'cpus': 'cpumask'})

         if event == 'thermal_power_cpu_get_power':
-            if isinstance(df.schema['load'], (pl.String, pl.Binary, pl.Categorical)):
+            if isinstance(schema['load'], (pl.String, pl.Binary, pl.Categorical)):
                 df = df.with_columns(
                     # Parse b'{2 3 2 8}'
                     pl.col('load')
@@ -4377,7 +4362,7 @@ class _TraceCache(Loggable):
                 to_parquet()
             else:
                 try:
-                    plan = data.serialize()
+                    plan = data.serialize(format='json')
                 # We failed to serialize the logical plan. This could happen
                 # because it contains references to UDF (e.g. a lambda passed
                 # to Expr.map_elements())
@@ -4434,7 +4419,7 @@ class _TraceCache(Loggable):
             # correctly.
             pandas_meta = json.loads(pandas_meta.decode('utf-8'))
             index_cols = pandas_meta['index_columns']
-            df = df.select(order_as(df.columns, index_cols))
+            df = df.select(order_as(df.collect_schema().names(), index_cols))

         return df

@@ -4448,7 +4433,7 @@ class _TraceCache(Loggable):
         elif fmt == 'polars-lazyframe':
             try:
                 data = load_parquet(path)
-            except pl.ComputeError:
+            except polars.exceptions.ComputeError:
                 with open(path, 'r') as f:
                     plan = json.load(f)

@@ -4459,7 +4444,7 @@ class _TraceCache(Loggable):
                 )
                 plan = json.dumps(plan)
                 plan = io.StringIO(plan)
-                data = pl.LazyFrame.deserialize(plan)
+                data = pl.LazyFrame.deserialize(plan, format='json')
                 data = _LazyFrameOnDelete.attach_file_cleanup(data, hardlinks)
         else:
             raise ValueError(f'File format not supported "{fmt}" at path: {path}')
@@ -4964,7 +4949,7 @@ class _Trace(Loggable, _InternalTraceBase):
         # writing to: /sys/kernel/debug/tracing/trace_marker
         # That said, it's not the end of the world if we don't filter on that
         # as the meta event name is supposed to be unique anyway
-        if isinstance(df.schema['ip'], (pl.String, pl.Binary, pl.Categorical)):
+        if isinstance(df.collect_schema()['ip'], (pl.String, pl.Binary, pl.Categorical)):
             df = df.filter(pl.col('ip').cast(pl.String).str.starts_with('tracing_mark_write'))

         return (df, 'buf')
@@ -5533,7 +5518,7 @@ class _Trace(Loggable, _InternalTraceBase):
                 self._internal_df_event(event)
                 for event in checked_events
             )
-            if '__cpu' in df.columns
+            if '__cpu' in df.collect_schema().names()
         )
         count = max_cpu + 1
         self.logger.debug(f"Estimated CPU count from trace: {count}")
@@ -5813,17 +5798,17 @@ class _Trace(Loggable, _InternalTraceBase):
         assert isinstance(df, pl.LazyFrame)

         # Add all the header fields from the source dataframes
-        extra_fields = [x for x in df.columns if x.startswith('__')]
-        extra_df = df.select(('Time', *extra_fields))
+        extra_df = df.select(cs.by_name('Time') | cs.starts_with('__'))

         with tempfile.TemporaryDirectory() as temp_dir:
             for (meta_event, event, _source_event, source_getter) in specs: # pylint: disable=unused-variable
                 source_df, line_field = source_getter(self, _source_event, event, df)
+                schema = source_df.collect_schema()

                 # If the lines are in a dtype we won't be able to
                 # handle, we won't add an entry to df_map, leading to a
                 # missing event
-                if not isinstance(source_df.schema[line_field], (pl.String, pl.Categorical, pl.Binary)):
+                if not isinstance(schema[line_field], (pl.String, pl.Categorical, pl.Binary)):
                     continue

                 # Ensure we have bytes and not str
@@ -5883,7 +5868,7 @@ class _Trace(Loggable, _InternalTraceBase):
                 _df = _df.select(
                     order_as(
                         sorted(
-                            _df.columns,
+                            _df.collect_schema().names(),
                             key=lambda col: 0 if col.startswith('__') else 1
                         ),
                         ['Time']
--
GitLab
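Note (illustration only, not part of the series): _lazyframe_rewrite()
in lisa/trace.py and the trace cache both round-trip a LazyFrame
logical plan through JSON, which polars >= 1.0 only produces when
asked explicitly. A standalone sketch of that pattern, assuming polars
1.x (the helper name and sample data here are illustrative):

    import io
    import json

    import polars as pl

    def lazyframe_rewrite(df, update_plan):
        # serialize() defaults to a binary format in polars >= 1.0, so
        # request JSON explicitly to edit the plan as a plain dict.
        plan = json.loads(df.serialize(format='json'))
        plan = update_plan(plan)
        plan = io.StringIO(json.dumps(plan))
        return pl.LazyFrame.deserialize(plan, format='json')

    # Identity rewrite: an unmodified plan deserializes back into an
    # equivalent LazyFrame.
    lf = pl.LazyFrame({'x': [1, 2, 3]}).select(pl.col('x') * 2)
    assert lazyframe_rewrite(lf, lambda plan: plan).collect().equals(lf.collect())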