diff --git a/lisa/analysis/base.py b/lisa/analysis/base.py
index 802ab228b3a73e2f5cba5c7b4c7f99f34109f0a5..d4ea719ce3f654efa501f747f49366f8799ffdb8 100644
--- a/lisa/analysis/base.py
+++ b/lisa/analysis/base.py
@@ -1195,7 +1195,7 @@ class TraceAnalysisBase(AnalysisHelpers):
                 df,
                 fmt=df_fmt,
                 index=(
-                    ('Time' if 'Time' in df.columns else None)
+                    ('Time' if 'Time' in df.collect_schema().names() else None)
                     if index is None and isinstance(df, (pl.LazyFrame, pl.DataFrame)) else
                     index
                 ),
diff --git a/lisa/analysis/tasks.py b/lisa/analysis/tasks.py
index 44399533ee603652af284768bd0b4fc4c91a3f9a..bef9d2501dadcf4deb50434df0d6a694710105dc 100644
--- a/lisa/analysis/tasks.py
+++ b/lisa/analysis/tasks.py
@@ -773,7 +773,7 @@ class TasksAnalysis(TraceAnalysisBase):
     @staticmethod
     def _reorder_tasks_states_columns(df):
         order = ['Time', 'pid', 'comm', 'target_cpu', 'cpu', 'curr_state', 'next_state', 'delta']
-        return df.select(order_as(list(df.columns), order))
+        return df.select(order_as(list(df.collect_schema().names()), order))
 
     @_df_tasks_states.used_events
     @TraceAnalysisBase.df_method
@@ -889,7 +889,7 @@ class TasksAnalysis(TraceAnalysisBase):
         }
 
         def fixup(df, col):
-            str_col = (pl.col(col) & 0xff).replace(mapping, default=None)
+            str_col = (pl.col(col) & 0xff).replace_strict(mapping, default=None)
             str_col = (
                 pl.when(str_col.is_null() & (pl.col(col) > 0))
                 .then(pl.col(col).map_elements(TaskState.sched_switch_str))
diff --git a/lisa/datautils.py b/lisa/datautils.py
index b0bd1a3c837a413f2e5d92d13552b6397c698a40..676f118b863abffec5b451db5fdfe95f3bfbd2cb 100644
--- a/lisa/datautils.py
+++ b/lisa/datautils.py
@@ -236,7 +236,7 @@ NO_INDEX = _NoIndex()
 
 
 def _polars_index_col(df, index=None):
-    columns = df.columns
+    columns = df.collect_schema().names()
 
     if index is NO_INDEX:
         return None
@@ -252,7 +252,8 @@ def _df_to_polars(df, index):
     if isinstance(df, pl.LazyFrame):
         index = _polars_index_col(df, index)
         if index is not None:
-            dtype = df.schema[index]
+            schema = df.collect_schema()
+            dtype = schema[index]
             # This skips a useless cast, saving some time on the common path
             if index == 'Time':
                 if dtype != pl.Duration('ns'):
@@ -270,7 +271,7 @@ def _df_to_polars(df, index):
             )
 
         # Make the index column the first one
-        df = df.select(order_as(list(df.columns), [index]))
+        df = df.select(order_as(list(df.collect_schema().names()), [index]))
         # TODO: once this is solved, we can just inspect the plan and see if the
         # data is backed by a "DataFrameScan" instead of a "Scan" of a file:
         # https://github.com/pola-rs/polars/issues/9771
@@ -297,7 +298,8 @@ def _df_to_pandas(df, index):
     assert isinstance(df, pl.LazyFrame)
 
     index = _polars_index_col(df, index)
-    has_time_index = index == 'Time' and df.schema[index].is_temporal()
+    schema = df.collect_schema()
+    has_time_index = index == 'Time' and schema[index].is_temporal()
     if has_time_index:
         df = df.with_columns(
             pl.col(index).dt.total_nanoseconds() * 1e-9
@@ -1364,7 +1366,8 @@ def df_window_signals(df, window, signals, compress_init=False, clip_window=True
 
 def _polars_window_signals(df, window, signals, compress_init):
     index = _polars_index_col(df, index='Time')
-    assert df.schema[index].is_temporal()
+    schema = df.collect_schema()
+    assert schema[index].is_temporal()
 
     start, stop = window
     start = _polars_duration_expr(start, rounding='down')
diff --git a/lisa/trace.py b/lisa/trace.py
index 0c3d0a481bffda0d420b25c9a145f9fd545b37c3..b0b5a3cec87467a097ee6d14fbb9e0038c60fd99 100644
--- a/lisa/trace.py
+++ b/lisa/trace.py
@@ -59,6 +59,8 @@ import pandas as pd
 import pyarrow.lib
 import pyarrow.parquet
 import polars as pl
+import polars.exceptions
+import polars.selectors as cs
 
 import devlib
 
@@ -349,12 +351,12 @@ def _lazyframe_rewrite(df, update_plan):
     # TODO: once this is solved, we can just inspect the plan rather than
    # serialize()/deserialize() in JSON
    # https://github.com/pola-rs/polars/issues/9771
-    plan = df.serialize()
+    plan = df.serialize(format='json')
     plan = json.loads(plan)
     plan = update_plan(plan)
     plan = json.dumps(plan)
     plan = io.StringIO(plan)
-    df = pl.LazyFrame.deserialize(plan)
+    df = pl.LazyFrame.deserialize(plan, format='json')
     return df
 
 
@@ -1001,7 +1003,7 @@ class TraceDumpTraceParser(TraceParserBase):
         })
         df = df.with_columns([
             pl.col('Time').cast(pl.Duration("ns")),
-            pl.col('__pid').replace(pid_comms).alias('__comm')
+            pl.col('__pid').replace_strict(pid_comms, default=None).alias('__comm')
         ])
 
         df = df.drop('common_type', 'common_flags', 'common_preempt_count')
@@ -1021,17 +1023,7 @@ class TraceDumpTraceParser(TraceParserBase):
 
         # Turn all string columns into categorical columns, since strings are
         # typically extremely repetitive
-        categorical_cols = [
-            col
-            for col, dtype in df.schema.items()
-            if isinstance(dtype, (pl.String, pl.Binary))
-        ]
-
-        if categorical_cols:
-            df = df.with_columns([
-                pl.col(col).cast(pl.Categorical)
-                for col in categorical_cols
-            ])
+        df = df.with_columns((cs.string() | cs.binary()).cast(pl.Categorical))
 
         return df
 
@@ -1572,11 +1564,7 @@ class TxtTraceParserBase(TraceParserBase):
             infer_schema_length=None,
         ).lazy()
 
-        df = df.with_columns(
-            pl.col(col).cast(pl.String)
-            for col, dtype in df.schema.items()
-            if isinstance(dtype, pl.Binary)
-        )
+        df = df.with_columns(cs.binary().cast(pl.String))
 
         # Put the timestamp first so it's recognized as the index
         df = df.select(
@@ -1754,7 +1742,7 @@ class TxtTraceParserBase(TraceParserBase):
 
         # Drop unnecessary columns that might have been parsed by the regex
         to_keep = {'__timestamp', '__event', '__fields', 'line'}
-        skeleton_df = skeleton_df.select(sorted(to_keep & set(skeleton_df.columns)))
+        skeleton_df = skeleton_df.select(sorted(to_keep & set(skeleton_df.collect_schema().names())))
         # This is very fast on a category dtype
         available_events.update(
             skeleton_df.select(
@@ -1897,7 +1885,7 @@ class TxtTraceParserBase(TraceParserBase):
             # we can accurately infer the dtype.
             df = pl.DataFrame({
                 col: [get_representative(df, col)]
-                for col in df.columns
+                for col in df.collect_schema().names()
             })
 
             # Ugly hack: dump the first row to CSV to infer the schema of the
@@ -1918,11 +1906,7 @@ class TxtTraceParserBase(TraceParserBase):
 
             # Polars will fail to convert strings with trailing spaces (e.g. "42 "
             # into 42), so ensure we clean that up before conversion
-            df = df.with_columns(
-                pl.col(col).str.strip_chars()
-                for col, dtype in df.schema.items()
-                if isinstance(dtype, pl.String)
-            )
+            df = df.with_columns(cs.string().str.strip_chars())
 
             df = df.with_columns(
                 pl.col(name).cast(dtype)
@@ -1930,8 +1914,9 @@ class TxtTraceParserBase(TraceParserBase):
             )
 
             df = df.rename({'__timestamp': 'Time'})
+            schema = df.collect_schema()
             if event == 'sched_switch':
-                if isinstance(df.schema['prev_state'], (pl.String, pl.Categorical)):
+                if isinstance(schema['prev_state'], (pl.String, pl.Categorical)):
                     # Avoid circular dependency issue by importing at the last moment
                     # pylint: disable=import-outside-toplevel
                     from lisa.analysis.tasks import TaskState
@@ -1947,7 +1932,7 @@ class TxtTraceParserBase(TraceParserBase):
                     pl.col('overutilized').cast(pl.Boolean)
                 )
 
-            if isinstance(df.schema.get('span'), (pl.String, pl.Binary, pl.Categorical)):
+            if isinstance(schema.get('span'), (pl.String, pl.Binary, pl.Categorical)):
                 df = df.with_columns(
                     pl.col('span')
                     .cast(pl.String)
@@ -1964,7 +1949,7 @@ class TxtTraceParserBase(TraceParserBase):
                 df = df.rename({'cpus': 'cpumask'})
 
             if event == 'thermal_power_cpu_get_power':
-                if isinstance(df.schema['load'], (pl.String, pl.Binary, pl.Categorical)):
+                if isinstance(schema['load'], (pl.String, pl.Binary, pl.Categorical)):
                     df = df.with_columns(
                         # Parse b'{2 3 2 8}'
                         pl.col('load')
@@ -4377,7 +4362,7 @@ class _TraceCache(Loggable):
                 to_parquet()
             else:
                 try:
-                    plan = data.serialize()
+                    plan = data.serialize(format='json')
                 # We failed to serialize the logical plan. This could happen
                 # because it contains references to UDF (e.g. a lambda passed
                 # to Expr.map_elements())
@@ -4434,7 +4419,7 @@ class _TraceCache(Loggable):
                 # correctly.
                 pandas_meta = json.loads(pandas_meta.decode('utf-8'))
                 index_cols = pandas_meta['index_columns']
-                df = df.select(order_as(df.columns, index_cols))
+                df = df.select(order_as(df.collect_schema().names(), index_cols))
 
             return df
 
@@ -4448,7 +4433,7 @@ class _TraceCache(Loggable):
         elif fmt == 'polars-lazyframe':
             try:
                 data = load_parquet(path)
-            except pl.ComputeError:
+            except polars.exceptions.ComputeError:
                 with open(path, 'r') as f:
                     plan = json.load(f)
 
@@ -4459,7 +4444,7 @@ class _TraceCache(Loggable):
                 )
                 plan = json.dumps(plan)
                 plan = io.StringIO(plan)
-                data = pl.LazyFrame.deserialize(plan)
+                data = pl.LazyFrame.deserialize(plan, format='json')
                 data = _LazyFrameOnDelete.attach_file_cleanup(data, hardlinks)
         else:
             raise ValueError(f'File format not supported "{fmt}" at path: {path}')
@@ -4964,7 +4949,7 @@ class _Trace(Loggable, _InternalTraceBase):
         # writing to: /sys/kernel/debug/tracing/trace_marker
         # That said, it's not the end of the world if we don't filter on that
         # as the meta event name is supposed to be unique anyway
-        if isinstance(df.schema['ip'], (pl.String, pl.Binary, pl.Categorical)):
+        if isinstance(df.collect_schema()['ip'], (pl.String, pl.Binary, pl.Categorical)):
             df = df.filter(pl.col('ip').cast(pl.String).str.starts_with('tracing_mark_write'))
 
         return (df, 'buf')
@@ -5533,7 +5518,7 @@ class _Trace(Loggable, _InternalTraceBase):
                 self._internal_df_event(event)
                 for event in checked_events
             )
-            if '__cpu' in df.columns
+            if '__cpu' in df.collect_schema().names()
         )
         count = max_cpu + 1
         self.logger.debug(f"Estimated CPU count from trace: {count}")
@@ -5813,17 +5798,17 @@ class _Trace(Loggable, _InternalTraceBase):
         assert isinstance(df, pl.LazyFrame)
 
         # Add all the header fields from the source dataframes
-        extra_fields = [x for x in df.columns if x.startswith('__')]
-        extra_df = df.select(('Time', *extra_fields))
+        extra_df = df.select(cs.by_name('Time') | cs.starts_with('__'))
 
         with tempfile.TemporaryDirectory() as temp_dir:
             for (meta_event, event, _source_event, source_getter) in specs: # pylint: disable=unused-variable
                 source_df, line_field = source_getter(self, _source_event, event, df)
+                schema = source_df.collect_schema()
 
                 # If the lines are in a dtype we won't be able to
                 # handle, we won't add an entry to df_map, leading to a
                 # missing event
-                if not isinstance(source_df.schema[line_field], (pl.String, pl.Categorical, pl.Binary)):
+                if not isinstance(schema[line_field], (pl.String, pl.Categorical, pl.Binary)):
                     continue
 
                 # Ensure we have bytes and not str
@@ -5883,7 +5868,7 @@ class _Trace(Loggable, _InternalTraceBase):
                 _df = _df.select(
                     order_as(
                         sorted(
-                            _df.columns,
+                            _df.collect_schema().names(),
                             key=lambda col: 0 if col.startswith('__') else 1
                         ),
                         ['Time']
diff --git a/setup.py b/setup.py
index d3aa27d77b7b18e0553b5688525ae780c47393e7..eababcf7a91a4ed1df0d54bcae9610f6984dbade 100755
--- a/setup.py
+++ b/setup.py
@@ -136,8 +136,7 @@ if __name__ == "__main__":
         "holoviews",
         "panel",
         "colorcet",
-        # 0.20.28 has this issue: https://github.com/pola-rs/polars/issues/16442
-        "polars >= 0.20.16, != 0.20.28, < 1.0.0",
+        "polars >= 1.0.0, < 2.0.0",
         # Pandas >= 1.0.0 has support for new nullable dtypes
         # Pandas 1.2.0 has broken barplots:
         # https://github.com/pandas-dev/pandas/issues/38947
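For reference, below is a minimal standalone sketch (separate from the patch itself) of the polars >= 1.0 idioms this changeset migrates to: explicit LazyFrame.collect_schema() instead of implicit .columns/.schema access, Expr.replace_strict() taking over mapping-with-default from Expr.replace(), dtype selectors from polars.selectors replacing manual schema loops, and format='json' for plan (de)serialization. The frame and column names in the sketch are invented for illustration and do not come from the LISA codebase.

# Sketch only, assuming polars >= 1.0; not part of the patch. Frame
# contents and column names ('Time', 'comm') are made up for illustration.
import io

import polars as pl
import polars.selectors as cs

lf = pl.LazyFrame({'Time': [1, 2, 3], 'comm': ['a', 'b', 'c']})

# Resolving the schema of a LazyFrame is now an explicit, potentially
# costly operation: resolve it once with collect_schema() and reuse the
# result, rather than touching lf.columns / lf.schema repeatedly.
schema = lf.collect_schema()
columns = schema.names()
time_dtype = schema['Time']

# Expr.replace() no longer takes a `default`; mapping with a fallback
# value now lives in Expr.replace_strict().
lf = lf.with_columns(
    pl.col('comm').replace_strict({'a': 'task-a'}, default=None).alias('mapped')
)

# Selectors replace the old "iterate over schema, filter by dtype" loops.
lf = lf.with_columns(cs.string().cast(pl.Categorical))

# serialize()/deserialize() default to a binary format in 1.x, so the
# JSON round-trip used for logical-plan rewriting must ask for it.
plan = lf.serialize(format='json')
lf = pl.LazyFrame.deserialize(io.StringIO(plan), format='json')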