diff --git a/lisa/datautils.py b/lisa/datautils.py index 511e1ddb77cfd15b86340c1db3a2e4a7d657d573..e12f45c9fcd11b76333a50dab433c60f5d6b4848 100644 --- a/lisa/datautils.py +++ b/lisa/datautils.py @@ -1468,6 +1468,39 @@ def _data_combine(datas, func, fill_value=None): return state +def _data_extend_index(data, extension): + # DO NOT USE THE sort PARAMETER + # read the documentation and you will understand why it should not be + # touched even with a stick + new_index = data.index.union(extension).sort_values() + return data.reindex(new_index) + + +@SeriesAccessor.register_accessor +def series_extend_index(series, extension): + """ + Extends the index of a :class:`pandas.Series` + + :param series: Series to extend the index of + :type series: pandas.Series + + :param extension: Extra index values to add to the series. + :type extension: pandas.Series or pandas.Index + + .. note:: The index will be sorted. + """ + return _data_extend_index(series, extension) + + +@DataFrameAccessor.register_accessor +def df_extend_index(df, extension): + """ + Same as :func:`series_extend_index` but acting on a :class:`pandas.DataFrame` + """ + return _data_extend_index(df, extension) + + + class SignalDesc: """ Define a signal to be used by various signal-oriented APIs. diff --git a/lisa/pelt.py b/lisa/pelt.py index 8a30146c077e3bc190db8eeefc0baf709f8aa171..f8c45d7c5c765fee8fff462636dbfb3b410910e4 100644 --- a/lisa/pelt.py +++ b/lisa/pelt.py @@ -20,7 +20,7 @@ import math import pandas as pd import numpy as np -from lisa.datautils import series_envelope_mean +from lisa.datautils import series_envelope_mean, df_add_delta, series_extend_index PELT_WINDOW = 1024 * 1024 * 1e-9 """ @@ -266,4 +266,108 @@ def kernel_util_mean(util, plat_info): """ return series_envelope_mean(util) +def pelt_interpolate(util, clock, interpolate_at=None): + """ + Interpolate the utilization with an interpolate_at signal and + re-indexing on clock. + + :param util: CPU utilization over time. 
+ :type util: pandas.Series + + :param clock: A series of timestamps providing the simulated PELT clock. + :type clock: pandas.Series + + :param interpolate_at: A series of additional timestamps for which the CPU + utilization has to be calculated. It can be omitted in case util + already contains those extra timestamps. + :type interpolate_at: pandas.Series + """ + if interpolate_at is not None: + util = series_extend_index(util, interpolate_at) + + df_util = pd.DataFrame(dict(util=util)) + df_util = df_util.assign(new_index=clock.values) + df_util = df_util.set_index('new_index') + + df_util = df_add_delta(df_util) + df_util['delta'] = df_util['delta'].shift() + df_util['prev_util'] = df_util['util'].shift() + + df_util = df_util.dropna(subset=['delta']) + + def compute_switch_phase_df(row): + # Applying the function on an empty dataframe will lead to being called + # with a Series, so the return value will not matter. + if row.empty: + return + + timestamp = row.name + last_update = timestamp - row.delta + prev_util = row.prev_util + window_shrink = 1e3 + activations = pd.Series([1, 1], index=[last_update, timestamp]) + simulated_phase_df = simulate_pelt( + activations, + init=prev_util, + window=PELT_WINDOW / window_shrink, + half_life=PELT_HALF_LIFE * window_shrink + ) + return simulated_phase_df.iloc[-1] + + switch_loc = df_util['util'].isnull() + df_util.loc[switch_loc, 'util'] = df_util.loc[switch_loc].apply(compute_switch_phase_df, axis='columns') + + return df_util['util'] + +def simulate_pelt_clock(capacity, clock, scale=PELT_SCALE): + """ + Simulate a PELT clock of an entity from the capacities of the CPU it's + residing on. + + :param capacity: CPU capacity over time. + :type capacity: pandas.Series + + :param clock: A series of timestamps at which the clock is to be observed. + The returned :class:`pandas.Series` will provide the simulated clock + values at these instants. 
+    :type clock: pandas.Series
+
+    :param scale: Maximum value allowed for CPU capacity.
+    :type scale: float
+    """
+    # Ensure the clock's index is the clock itself
+    clock = clock.copy(deep=False)
+    clock.index = clock
+    df = pd.DataFrame(
+        dict(
+            clock=clock,
+            capacity=capacity,
+        ),
+    )
+    # Remember which row is part of the user-provided clock
+    df['orig_clock'] = ~df['clock'].isna()
+    # Needed for "time" interpolation
+    df.index = pd.TimedeltaIndex(df.index, unit='s')
+    # Shift so that the capacity is aligned with the corresponding delta
+    df['capacity'] = df['capacity'].fillna(method='ffill').shift()
+    # Time flows linearly between 2 samples of the clock
+    df['clock'].interpolate(method='time', inplace=True)
+    # If there is an initial NaN in the clock or capacity, remove it since
+    # interpolate() cannot cope with that correctly even with
+    # limit_direction='both'
+    df.dropna(inplace=True)
+    df['delta'] = df['clock'].diff()
+    # Scale each delta independently
+    df['delta'] *= df['capacity'] / scale
+    # Fill the NaN with the initial value for the cumsum() fold
+    df['delta'].iat[0] = df['clock'].iat[0]
+    # Reverse df_add_delta() now that we scaled each delta
+    df['new_clock'] = df['delta'].cumsum()
+    # Back to Float64Index
+    df.index = df.index.total_seconds()
+    # Filter out all the rows that were introduced by the capacity changes but
+    # are not part of the clock requested by the user
+    df = df[df['orig_clock'] == True]
+    return df['new_clock']
+
 # vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab
diff --git a/lisa/tests/scheduler/load_tracking.py b/lisa/tests/scheduler/load_tracking.py
index 6f84e65ae09d3841ebade17101dc30399c6a44c6..70b01d3749bd9b16074864e125e640d2cf22cb96 100644
--- a/lisa/tests/scheduler/load_tracking.py
+++ b/lisa/tests/scheduler/load_tracking.py
@@ -29,14 +29,14 @@ from lisa.tests.base import (
 )
 from lisa.target import Target
 from lisa.utils import ArtifactPath, groupby, ExekallTaggable
-from lisa.datautils import 
series_mean, df_window, df_filter_task_ids, series_refit_index, df_split_signals, df_refit_index +from lisa.datautils import series_mean, df_window, df_filter_task_ids, series_refit_index, df_split_signals, df_refit_index, df_extend_index from lisa.wlgen.rta import RTA, Periodic, RTATask from lisa.trace import FtraceCollector, requires_events, may_use_events, MissingTraceEventError from lisa.analysis.load_tracking import LoadTrackingAnalysis from lisa.analysis.tasks import TasksAnalysis from lisa.analysis.rta import RTAEventsAnalysis from lisa.analysis.frequency import FrequencyAnalysis -from lisa.pelt import PELT_SCALE, simulate_pelt, pelt_settling_time, kernel_util_mean +from lisa.pelt import PELT_SCALE, simulate_pelt, pelt_settling_time, kernel_util_mean, pelt_interpolate, simulate_pelt_clock UTIL_SCALE = PELT_SCALE @@ -906,7 +906,6 @@ class CPUMigrationBase(LoadTrackingBase): phase_df = df_window(df, window, clip_window=True) for cpu in self.cpus: - if cpus_rel_freq is None: rel_freq_mean = 1 else: @@ -957,23 +956,59 @@ class CPUMigrationBase(LoadTrackingBase): """ df = self.trace.analysis.load_tracking.df_cpus_signal('util') tasks = self.rtapp_task_ids_map.keys() - task = sorted(task for task in tasks if task.startswith('migr'))[0] - task = self.rtapp_task_ids_map[task][0] + migr_task = sorted(task for task in tasks if task.startswith('migr'))[0] + migr_task = self.rtapp_task_ids_map[migr_task][0] + + def get_inactive(tasks): + task, = tasks + activations = self.trace.analysis.tasks.df_task_activation(task) + return activations[activations['active'] == 0] + + df_state = { + name: get_inactive(tasks) + for name, tasks in self.rtapp_task_ids_map.items() + } + cpu_capacities = self.plat_info['cpu-capacities']['rtapp'] cpu_util = {} - for row in self.trace.analysis.rta.df_phases(task).itertuples(): + cpu_freqs = self.plat_info['freqs'] + + freq_df = self.trace.analysis.frequency.df_cpus_frequency() + cpus_rel_freq = { + # Frequency, normalized according to max 
frequency on that CPU + cols['cpu']: df['frequency'] / max(cpu_freqs[cols['cpu']]) + for cols, df in df_split_signals(freq_df, ['cpu']) + } + + for row in self.trace.analysis.rta.df_phases(migr_task).itertuples(): phase = row.phase + + # TODO: remove that once we have named phases to skip the buffer phase. + if phase == 0: + continue + duration = row.duration start = row.Index end = start + duration # Ignore the first quarter of the util signal of each phase, since # it's impacted by the phase change, and util can be affected # (rtapp does some bookkeeping at the beginning of phases) - start += duration / 4 + # start += duration / 4 phase_df = df_window(df, (start, end), method='pre', clip_window=True) for cpu in self.cpus: - util = phase_df[phase_df['cpu'] == cpu]['util'] + cpu_phase_df = phase_df[phase_df['cpu'] == cpu] + df_state_clipped = pd.concat( + df_window(df_state[task], (start, end), method='pre', clip_window=True) + for task, wlgen_task in self.rtapp_profile.items() + if cpu in wlgen_task.phases[phase].cpus + ).sort_index() + df_util = df_extend_index(cpu_phase_df, df_state_clipped.index) + + cpu_capacity = cpu_capacities[cpu] * cpus_rel_freq[cpu] + clock = simulate_pelt_clock(cpu_capacity, clock=df_util.index.to_series()) + + util = pelt_interpolate(df_util['util'], clock=clock) cpu_util.setdefault(cpu, {})[phase] = kernel_util_mean(util, plat_info=self.plat_info) return cpu_util