From 78bb5dd9ae0af8cccf75cbbc1ca594681c6df5c9 Mon Sep 17 00:00:00 2001 From: Dietmar Eggemann Date: Fri, 14 Aug 2020 13:01:35 +0200 Subject: [PATCH 1/8] lisa.datautils: Provide series_extend_index() and df_extend_index() Signed-off-by: Dietmar Eggemann --- lisa/datautils.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/lisa/datautils.py b/lisa/datautils.py index 511e1ddb7..16d419726 100644 --- a/lisa/datautils.py +++ b/lisa/datautils.py @@ -1851,4 +1851,29 @@ SignalDesc._SIGNALS_MAP = { for event, signal_descs in groupby(_SIGNALS, key=attrgetter('event')) } +def _data_extend_index(data, extension): + """ + ``data`` can either be a :class:`pandas.DataFrame` or :class:`pandas.Series` + """ + return data.join(extension, how='outer') + +@SeriesAccessor.register_accessor +def series_extend_index(series, extension): + """ + Extending the index of a :class:`pandas.Series + + :param: series: series to extend + :type series: :class:`pandas.Series` + + :param extension: series holding the extension + :type extension: class:`pandas.Series` + """ + return _data_extend_index(series, extension) + +@DataFrameAccessor.register_accessor +def df_extend_index(df, extension): + """ + Same as :func:`series_extend_index` but acting on a :class:`pandas.DataFrame` + """ + return _data_extend_index(df, extension) # vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab -- GitLab From ae4180432865ae386592f4802ff9dd277f84fdca Mon Sep 17 00:00:00 2001 From: douglas-raillard-arm Date: Tue, 29 Sep 2020 13:20:43 +0100 Subject: [PATCH 2/8] WIP --- lisa/datautils.py | 58 +++++++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/lisa/datautils.py b/lisa/datautils.py index 16d419726..e12f45c9f 100644 --- a/lisa/datautils.py +++ b/lisa/datautils.py @@ -1468,6 +1468,39 @@ def _data_combine(datas, func, fill_value=None): return state +def _data_extend_index(data, extension): + # DO NOT USE THE sort PARAMETER + # read the documentation and you will understand why it should not be + # touched even with a stick + new_index = data.index.union(extension).sort_values() + return data.reindex(new_index) + + +@SeriesAccessor.register_accessor +def series_extend_index(series, extension): + """ + Extends the index of a :class:`pandas.Series` + + :param series: Series to extend the index of + :type series: pandas.Series + + :param extension: Extra index values to add to the series. + :type extension: pandas.Series or pandas.Index + + .. note:: The index will be sorted. + """ + return _data_extend_index(series, extension) + + +@DataFrameAccessor.register_accessor +def df_extend_index(df, extension): + """ + Same as :func:`series_extend_index` but acting on a :class:`pandas.DataFrame` + """ + return _data_extend_index(df, extension) + + + class SignalDesc: """ Define a signal to be used by various signal-oriented APIs. @@ -1851,29 +1884,4 @@ SignalDesc._SIGNALS_MAP = { for event, signal_descs in groupby(_SIGNALS, key=attrgetter('event')) } -def _data_extend_index(data, extension): - """ - ``data`` can either be a :class:`pandas.DataFrame` or :class:`pandas.Series` - """ - return data.join(extension, how='outer') - -@SeriesAccessor.register_accessor -def series_extend_index(series, extension): - """ - Extending the index of a :class:`pandas.Series - - :param: series: series to extend - :type series: :class:`pandas.Series` - - :param extension: series holding the extension - :type extension: class:`pandas.Series` - """ - return _data_extend_index(series, extension) - -@DataFrameAccessor.register_accessor -def df_extend_index(df, extension): - """ - Same as :func:`series_extend_index` but acting on a :class:`pandas.DataFrame` - """ - return _data_extend_index(df, extension) # vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab -- GitLab From 2255367bdcba01cc3f90a44c906ed0c72ef3e3cb Mon Sep 17 00:00:00 2001 From: Dietmar Eggemann Date: Fri, 14 Aug 2020 09:30:31 +0200 Subject: [PATCH 3/8] lisa.pelt: Add pelt_interpolate() and simulate_pelt_clock() Since the delta between the last trace_pelt_cfs_tp(root cfs_rq) and the sched_switch event is often < 1024us, simulate_pelt() is called with window=PELT_WINDOW/1e3 and half_life=PELT_HALF_LIFE*1e3. The PELT simulator would normally only respect 1024us windows, following the Linux kernel implementation. Signed-off-by: Dietmar Eggemann --- lisa/pelt.py | 100 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 99 insertions(+), 1 deletion(-) diff --git a/lisa/pelt.py b/lisa/pelt.py index 8a30146c0..837b7d3ac 100644 --- a/lisa/pelt.py +++ b/lisa/pelt.py @@ -20,7 +20,7 @@ import math import pandas as pd import numpy as np -from lisa.datautils import series_envelope_mean +from lisa.datautils import series_envelope_mean, df_add_delta, series_extend_index PELT_WINDOW = 1024 * 1024 * 1e-9 """ @@ -266,4 +266,102 @@ def kernel_util_mean(util, plat_info): """ return series_envelope_mean(util) +def pelt_interpolate(util, clock, interpolate_at=None): + """ + Interpolate the utilization with an interpolate_at signal and + re-indexing on clock. + + :param util: CPU utilization over time. + :type util: pandas.Series + + :param clock: A series of timestamps providing the simulated PELT clock. + :type clock: pandas.Series + + :param interpolate_at: A series of additional timestamps for which the + CPU utilization has to be calculated. It can be omitted in case + util already contains those extra timestamps. + :type interpolate_at: pandas.Series + """ + if interpolate_at is not None: + util = series_extend_index(util, interpolate_at) + + df_util = util.to_frame() + df_util = df_util.assign(new_index=clock.values) + df_util = df_util.set_index('new_index') + + df_util = df_add_delta(df_util) + df_util['delta'] = df_util['delta'].shift() + df_util['prev_util'] = df_util['util'].shift() + + df_util = df_util.dropna(subset=['delta']) + + def compute_switch_phase_df(row): + # Applying the function on an empty dataframe will lead to being called + # with a Series, so the return value will not matter. + if row.empty: + return + + timestamp = row.name + last_update = timestamp - row.delta + prev_util = row.prev_util + window_shrink = 1e3 + activations = pd.Series([1, 1], index=[last_update, timestamp]) + simulated_phase_df = simulate_pelt(activations, init=prev_util, + window=PELT_WINDOW/window_shrink, + half_life=PELT_HALF_LIFE*window_shrink) + return simulated_phase_df.iloc[-1] + + switch_loc = df_util['util'].isnull() + df_util.loc[switch_loc, 'util'] = df_util.loc[switch_loc].apply(compute_switch_phase_df, axis='columns') + + return df_util['util'] + +def simulate_pelt_clock(capacity, clock, scale=PELT_SCALE): + """ + Simulate a PELT clock of an entity from the capacities of the CPU it's + residing on. + :param capacity: CPU capacity over time. + :type capacity: pandas.Series + :param clock: A series of timestamps at which the clock is to be observed. + The returned :class:`pandas.Series` will provide the simulated clock + values at these instants. + :type clock: pandas.Series + :param scale: Maximum value allowed for CPU capacity. + :type scale: float + """ + # Ensures the clock's index is the same as the clock + clock = clock.copy(deep=False) + clock.index = clock + df = pd.DataFrame( + dict( + clock=clock, + capacity=capacity, + ), + ) + # Remember which row is part of the user-provided clock + df['orig_clock'] = ~df['clock'].isna() + # Needed for "time" interpolation + df.index = pd.TimedeltaIndex(df.index, unit='s') + # Shift so that the capacity is aligned with the corresponding delta + df['capacity'] = df['capacity'].fillna(method='ffill').shift() + # Time flows linearly between 2 samples of the clock + df['clock'].interpolate(method='time', inplace=True) + # If there is an initial NaN in the clock or capacity, remove it since + # interpolate() cannot cope with that correctly even with + # limit_direction='both' + df.dropna(inplace=True) + df['delta'] = df['clock'].diff() + # Scale each delta independantly + df['delta'] *= df['capacity'] / scale + # Fill the NaN with the initial value for the cumsum() fold + df['delta'].iat[0] = df['clock'].iat[0] + # Reverse df_add_delta() now that we scaled each delta + df['new_clock'] = df['delta'].cumsum() + # Back to Float64Index + df.index = df.index.total_seconds() + # Filter-out all the rows that were introduced by the capacity changes but + # are not part of the clock requested by the user + df = df[df['orig_clock'] == True] + return df['new_clock'] + # vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab -- GitLab From b42cc5f1fa88fb4994314bd4bb6340a4ddb15f7b Mon Sep 17 00:00:00 2001 From: douglas-raillard-arm Date: Tue, 29 Sep 2020 15:57:26 +0100 Subject: [PATCH 4/8] WIP --- lisa/pelt.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/lisa/pelt.py b/lisa/pelt.py index 837b7d3ac..f8c45d7c5 100644 --- a/lisa/pelt.py +++ b/lisa/pelt.py @@ -277,15 +277,15 @@ def pelt_interpolate(util, clock, interpolate_at=None): :param clock: A series of timestamps providing the simulated PELT clock. :type clock: pandas.Series - :param interpolate_at: A series of additional timestamps for which the - CPU utilization has to be calculated. It can be omitted in case - util already contains those extra timestamps. + :param interpolate_at: A series of additional timestamps for which the CPU + utilization has to be calculated. It can be omitted in case util + already contains those extra timestamps. :type interpolate_at: pandas.Series """ if interpolate_at is not None: util = series_extend_index(util, interpolate_at) - df_util = util.to_frame() + df_util = pd.DataFrame(dict(util=util)) df_util = df_util.assign(new_index=clock.values) df_util = df_util.set_index('new_index') @@ -306,9 +306,12 @@ def pelt_interpolate(util, clock, interpolate_at=None): prev_util = row.prev_util window_shrink = 1e3 activations = pd.Series([1, 1], index=[last_update, timestamp]) - simulated_phase_df = simulate_pelt(activations, init=prev_util, - window=PELT_WINDOW/window_shrink, - half_life=PELT_HALF_LIFE*window_shrink) + simulated_phase_df = simulate_pelt( + activations, + init=prev_util, + window=PELT_WINDOW / window_shrink, + half_life=PELT_HALF_LIFE * window_shrink + ) return simulated_phase_df.iloc[-1] switch_loc = df_util['util'].isnull() @@ -320,12 +323,15 @@ def simulate_pelt_clock(capacity, clock, scale=PELT_SCALE): """ Simulate a PELT clock of an entity from the capacities of the CPU it's residing on. + :param capacity: CPU capacity over time. :type capacity: pandas.Series + :param clock: A series of timestamps at which the clock is to be observed. The returned :class:`pandas.Series` will provide the simulated clock values at these instants. :type clock: pandas.Series + :param scale: Maximum value allowed for CPU capacity. :type scale: float """ -- GitLab From bf6ed587615c8eeee2edc50a5750c74966e3f8a9 Mon Sep 17 00:00:00 2001 From: Dietmar Eggemann Date: Fri, 12 Jun 2020 16:06:31 +0200 Subject: [PATCH 5/8] lisa.tests.scheduler.load_tracking: Rename local variable task to migr_task Preparatory change to allow the use of local variable task to iterate over test rt-app tasks. migr_task is lisa.trace.TaskID (.pid, .comm) for rtapp profile task name 'migr'. Signed-off-by: Dietmar Eggemann --- lisa/tests/scheduler/load_tracking.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lisa/tests/scheduler/load_tracking.py b/lisa/tests/scheduler/load_tracking.py index 6f84e65ae..97d26e5a3 100644 --- a/lisa/tests/scheduler/load_tracking.py +++ b/lisa/tests/scheduler/load_tracking.py @@ -957,11 +957,11 @@ class CPUMigrationBase(LoadTrackingBase): """ df = self.trace.analysis.load_tracking.df_cpus_signal('util') tasks = self.rtapp_task_ids_map.keys() - task = sorted(task for task in tasks if task.startswith('migr'))[0] - task = self.rtapp_task_ids_map[task][0] + migr_task = sorted(task for task in tasks if task.startswith('migr'))[0] + migr_task = self.rtapp_task_ids_map[migr_task][0] cpu_util = {} - for row in self.trace.analysis.rta.df_phases(task).itertuples(): + for row in self.trace.analysis.rta.df_phases(migr_task).itertuples(): phase = row.phase duration = row.duration start = row.Index -- GitLab From 37255abb929172c5975431f480039f90a1e4bf79 Mon Sep 17 00:00:00 2001 From: Dietmar Eggemann Date: Fri, 12 Jun 2020 16:09:43 +0200 Subject: [PATCH 6/8] lisa.tests.scheduler.load_tracking: Fix OneTaskCPUMigration:test_util_task_migration Running tests based on trace_pelt_cfs_tp(root cfs_rq) has the problem that these trace events might not be issued when a test task gets scheduled out for the idle task. PELT only issues a trace event every 1024us ('rq_clock_task time'). So the trace_pelt_cfs_tp() during the schedule out can be suppressed when the delta is < 1024us. This effect is dramatically made worse on a little CPU due to PELT time scaling. The 'rq_clock_pelt time' is used here which scales 'rq_clock_task time' by CPU capacity and frequency. With a little CPU's capacity being ~ 1/3 of that of the big CPU (1024) this delta can be up to 3ms! Missing lots of maximum trace_pelt_cfs_tp(root cfs_rq) values in the trace leads to the fact that kernel_util_mean() [lisa/pelt.py], which is used in CPUMigrationBase::get_trace_cpu_util(), computes a too small CPU utilization value. OneTaskCPUMigration.test_util_task_migration() checks that the difference between the CPU utilization values for each CPU and phase of CPUMigrationBase.get_expected_cpu_util() and CPUMigrationBase.get_trace_cpu_util() is <= 3%. CPUMigrationBase.get_expected_cpu_util() is based on task duty cycle. So it is important that CPUMigrationBase.get_trace_cpu_util() has the trace_pelt_cfs_tp(root cfs_rq) with the maximum CPU utilization values available and those happen when a test task gets scheduled out for the idle task. To fix the issue inject fake CPU utilization events when sched_switch events occur in which a rtapp task switches out for the idle task (df_state[task]). The CPU utilization for those events is calculated in pelt_interpolate() using simulate_pelt_clock() to simulate the time scaling PELT time 'rq_clock_pelt time'. Signed-off-by: Dietmar Eggemann --- lisa/tests/scheduler/load_tracking.py | 46 ++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/lisa/tests/scheduler/load_tracking.py b/lisa/tests/scheduler/load_tracking.py index 97d26e5a3..6f7053e3b 100644 --- a/lisa/tests/scheduler/load_tracking.py +++ b/lisa/tests/scheduler/load_tracking.py @@ -29,14 +29,14 @@ from lisa.tests.base import ( ) from lisa.target import Target from lisa.utils import ArtifactPath, groupby, ExekallTaggable -from lisa.datautils import series_mean, df_window, df_filter_task_ids, series_refit_index, df_split_signals, df_refit_index +from lisa.datautils import series_mean, df_window, df_filter_task_ids, series_refit_index, df_split_signals, df_refit_index, df_extend_index from lisa.wlgen.rta import RTA, Periodic, RTATask from lisa.trace import FtraceCollector, requires_events, may_use_events, MissingTraceEventError from lisa.analysis.load_tracking import LoadTrackingAnalysis from lisa.analysis.tasks import TasksAnalysis from lisa.analysis.rta import RTAEventsAnalysis from lisa.analysis.frequency import FrequencyAnalysis -from lisa.pelt import PELT_SCALE, simulate_pelt, pelt_settling_time, kernel_util_mean +from lisa.pelt import PELT_SCALE, simulate_pelt, pelt_settling_time, kernel_util_mean, pelt_interpolate, simulate_pelt_clock UTIL_SCALE = PELT_SCALE @@ -906,7 +906,6 @@ class CPUMigrationBase(LoadTrackingBase): phase_df = df_window(df, window, clip_window=True) for cpu in self.cpus: - if cpus_rel_freq is None: rel_freq_mean = 1 else: @@ -960,9 +959,35 @@ class CPUMigrationBase(LoadTrackingBase): migr_task = sorted(task for task in tasks if task.startswith('migr'))[0] migr_task = self.rtapp_task_ids_map[migr_task][0] + df_state = {} + + for task in self.rtapp_task_ids_map.keys(): + comm = self.rtapp_task_ids_map[task][0].comm + df_state[task] = self.trace.analysis.tasks.df_task_activation(comm) + df_state[task] = df_state[task][df_state[task]['active'] == 0] + + cpu_capacities = self.plat_info['cpu-capacities']['rtapp'] cpu_util = {} + cpu_freqs = self.plat_info['freqs'] + + try: + freq_df = self.trace.analysis.frequency.df_cpus_frequency() + except MissingTraceEventError: + raise RuntimeError('cpus_rel_freq = None') + else: + cpus_rel_freq = { + # Frequency, normalized according to max frequency on that CPU + cols['cpu']: df['frequency'] / max(cpu_freqs[cols['cpu']]) + for cols, df in df_split_signals(freq_df, ['cpu']) + } + for row in self.trace.analysis.rta.df_phases(migr_task).itertuples(): phase = row.phase + + # TODO: remove that once we have named phases to skip the buffer phase. + if phase == 0: + continue + duration = row.duration start = row.Index end = start + duration @@ -973,7 +998,20 @@ class CPUMigrationBase(LoadTrackingBase): phase_df = df_window(df, (start, end), method='pre', clip_window=True) for cpu in self.cpus: - util = phase_df[phase_df['cpu'] == cpu]['util'] + + cpu_phase_df = phase_df[phase_df['cpu'] == cpu].copy() + + df_state_clipped = pd.concat(df_window(df_state[task], (start, end), method='pre', clip_window=True) + for task, wlgen_task in self.rtapp_profile.items() + if cpu in wlgen_task.phases[phase-1].cpus) + + df_state_clipped = df_state_clipped.sort_index() + df_util = df_extend_index(cpu_phase_df, df_state_clipped[[]]) + + cpu_capacity = cpu_capacities[cpu] * cpus_rel_freq[cpu] + clock = simulate_pelt_clock(cpu_capacity, df_util.index.to_series()) + + util = pelt_interpolate(df_util['util'], clock) cpu_util.setdefault(cpu, {})[phase] = kernel_util_mean(util, plat_info=self.plat_info) return cpu_util -- GitLab From 3b4bb994eea80156d9dd4714286c5e1805b06442 Mon Sep 17 00:00:00 2001 From: douglas-raillard-arm Date: Tue, 29 Sep 2020 17:23:21 +0100 Subject: [PATCH 7/8] WIP --- lisa/tests/scheduler/load_tracking.py | 51 +++++++++++++-------------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/lisa/tests/scheduler/load_tracking.py b/lisa/tests/scheduler/load_tracking.py index 6f7053e3b..1ce20d4e9 100644 --- a/lisa/tests/scheduler/load_tracking.py +++ b/lisa/tests/scheduler/load_tracking.py @@ -959,27 +959,26 @@ class CPUMigrationBase(LoadTrackingBase): migr_task = sorted(task for task in tasks if task.startswith('migr'))[0] migr_task = self.rtapp_task_ids_map[migr_task][0] - df_state = {} - - for task in self.rtapp_task_ids_map.keys(): - comm = self.rtapp_task_ids_map[task][0].comm - df_state[task] = self.trace.analysis.tasks.df_task_activation(comm) - df_state[task] = df_state[task][df_state[task]['active'] == 0] + def get_inactive(tasks): + task, = tasks + activations = self.trace.analysis.tasks.df_task_activation(task) + return activations[activations['active'] == 0] + + df_state = { + name: get_inactive(tasks) + for name, tasks in self.rtapp_task_ids_map.items() + } cpu_capacities = self.plat_info['cpu-capacities']['rtapp'] cpu_util = {} cpu_freqs = self.plat_info['freqs'] - try: - freq_df = self.trace.analysis.frequency.df_cpus_frequency() - except MissingTraceEventError: - raise RuntimeError('cpus_rel_freq = None') - else: - cpus_rel_freq = { - # Frequency, normalized according to max frequency on that CPU - cols['cpu']: df['frequency'] / max(cpu_freqs[cols['cpu']]) - for cols, df in df_split_signals(freq_df, ['cpu']) - } + freq_df = self.trace.analysis.frequency.df_cpus_frequency() + cpus_rel_freq = { + # Frequency, normalized according to max frequency on that CPU + cols['cpu']: df['frequency'] / max(cpu_freqs[cols['cpu']]) + for cols, df in df_split_signals(freq_df, ['cpu']) + } for row in self.trace.analysis.rta.df_phases(migr_task).itertuples(): phase = row.phase @@ -998,20 +997,18 @@ class CPUMigrationBase(LoadTrackingBase): phase_df = df_window(df, (start, end), method='pre', clip_window=True) for cpu in self.cpus: - - cpu_phase_df = phase_df[phase_df['cpu'] == cpu].copy() - - df_state_clipped = pd.concat(df_window(df_state[task], (start, end), method='pre', clip_window=True) - for task, wlgen_task in self.rtapp_profile.items() - if cpu in wlgen_task.phases[phase-1].cpus) - - df_state_clipped = df_state_clipped.sort_index() - df_util = df_extend_index(cpu_phase_df, df_state_clipped[[]]) + cpu_phase_df = phase_df[phase_df['cpu'] == cpu] + df_state_clipped = pd.concat( + df_window(df_state[task], (start, end), method='pre', clip_window=True) + for task, wlgen_task in self.rtapp_profile.items() + if cpu in wlgen_task.phases[phase].cpus + ).sort_index() + df_util = df_extend_index(cpu_phase_df, df_state_clipped.index) cpu_capacity = cpu_capacities[cpu] * cpus_rel_freq[cpu] - clock = simulate_pelt_clock(cpu_capacity, df_util.index.to_series()) + clock = simulate_pelt_clock(cpu_capacity, clock=df_util.index.to_series()) - util = pelt_interpolate(df_util['util'], clock) + util = pelt_interpolate(df_util['util'], clock=clock) cpu_util.setdefault(cpu, {})[phase] = kernel_util_mean(util, plat_info=self.plat_info) return cpu_util -- GitLab From fd3039caf83940ea93ec1c8df7d67ef48ea1b819 Mon Sep 17 00:00:00 2001 From: Dietmar Eggemann Date: Mon, 29 Jun 2020 10:44:38 +0200 Subject: [PATCH 8/8] lisa.tests.scheduler.load_tracking: Fix remaining issue for OneTaskCPUMigration:test_util_task_migration Running commit "lisa.tests.scheduler.load_tracking: Fix OneTaskCPUMigration:test_util_task_migration" still showed some tiny underestimation of 0% - ~-0.25% above the allowed -3% (e.g. in phase 1 of CPU3 on juno-r0 and juno-r2). Turns out that CPUMigrationBase.get_trace_cpu_util() advances 'start' of the window by 25% and CPUMigrationBase.get_expected_cpu_util() does not. Align both functions by removing this 25% advancing of start in CPUMigrationBase.get_trace_cpu_util(). The number of the error cases went down by factor ~4. Signed-off-by: Dietmar Eggemann --- lisa/tests/scheduler/load_tracking.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lisa/tests/scheduler/load_tracking.py b/lisa/tests/scheduler/load_tracking.py index 1ce20d4e9..70b01d374 100644 --- a/lisa/tests/scheduler/load_tracking.py +++ b/lisa/tests/scheduler/load_tracking.py @@ -993,7 +993,7 @@ class CPUMigrationBase(LoadTrackingBase): # Ignore the first quarter of the util signal of each phase, since # it's impacted by the phase change, and util can be affected # (rtapp does some bookkeeping at the beginning of phases) - start += duration / 4 + # start += duration / 4 phase_df = df_window(df, (start, end), method='pre', clip_window=True) for cpu in self.cpus: -- GitLab