diff --git a/libs/devlib b/libs/devlib
index fe0d6eda2acb7345e0499943e8effac187aac91c..8aa9d672a1ec15e3f1a516a8dda489f42badba0e 160000
--- a/libs/devlib
+++ b/libs/devlib
@@ -1 +1 @@
-Subproject commit fe0d6eda2acb7345e0499943e8effac187aac91c
+Subproject commit 8aa9d672a1ec15e3f1a516a8dda489f42badba0e
diff --git a/libs/utils/env.py b/libs/utils/env.py
index 6cd8b6ce1f805b21749cd7c2e5d1d7dff759106a..d0d60b392326cb947874f90b763063dde092494b 100644
--- a/libs/utils/env.py
+++ b/libs/utils/env.py
@@ -458,7 +458,10 @@ class TestEnv(ShareState):
         # Setup board default if not specified by configuration
         self.nrg_model = None
         platform = None
+
+        default_modules = ['sched']
         self.__modules = ['cpufreq', 'cpuidle']
+
         if 'board' not in self.conf:
             self.conf['board'] = 'UNKNOWN'

@@ -522,7 +525,7 @@ class TestEnv(ShareState):
         # Modules configuration
         ########################################################################

-        modules = set(self.__modules)
+        modules = set(self.__modules + default_modules)

         # Refine modules list based on target.conf
         modules.update(self.conf.get('modules', []))
diff --git a/libs/utils/trace.py b/libs/utils/trace.py
index b099f958e9826f4fd1d60553b60bb605506fa95b..43e1b6e1548620ff1ee9b5802fbed1e4834f2391 100644
--- a/libs/utils/trace.py
+++ b/libs/utils/trace.py
@@ -649,17 +649,11 @@ class Trace(object):
         """ Add a column with overutilized status duration. """
         if not self.hasEvents('sched_overutilized'):
             return
-        df = self._dfg_trace_event('sched_overutilized')
-        df['start'] = df.index
-        df['len'] = (df.start - df.start.shift()).fillna(0).shift(-1)
-        df.drop('start', axis=1, inplace=True)
-        # Fix the last event, which will have a NaN duration
-        # Set duration to trace_end - last_event
-        df.loc[df.index[-1], 'len'] = self.start_time + self.time_range - df.index[-1]
+        df = self._dfg_trace_event('sched_overutilized')
+        self.addEventsDeltas(df, 'len')

         # Build a stat on trace overutilization
-        df = self._dfg_trace_event('sched_overutilized')
         self.overutilized_time = df[df.overutilized == 1].len.sum()
         self.overutilized_prc = 100. * self.overutilized_time / self.time_range
@@ -883,7 +877,6 @@ class Trace(object):
         # Fix sequences of wakeup/sleep events reported with the same index
         return handle_duplicate_index(cpu_active)

-    @memoized
     def getClusterActiveSignal(self, cluster):
         """
@@ -962,6 +955,127 @@ class Trace(object):
                              np.where(freq['state'] == 1, freq['rate'], float('nan')))
         return freq

+    def addEventsDeltas(self, df, col_name='delta'):
+        """
+        Compute the time between each event in a dataframe, and store it in a
+        new column. This only really makes sense for events tracking an
+        on/off state (e.g. overutilized, idle).
+        """
+        if df.empty:
+            return df
+
+        if col_name in df.columns:
+            raise RuntimeError("Column {} is already present in the dataframe".
+                               format(col_name))
+
+        df['start'] = df.index
+        df[col_name] = (df.start - df.start.shift()).fillna(0).shift(-1)
+        df.drop('start', axis=1, inplace=True)
+
+        # Fix the last event, which will have a NaN duration
+        # Set duration to trace_end - last_event
+        df.loc[df.index[-1], col_name] = self.start_time + self.time_range - df.index[-1]
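+
+    # Illustrative example for addEventsDeltas() (made-up timestamps, for a
+    # trace assumed to span [0, 4.0]):
+    #
+    #   input df:              after addEventsDeltas(df, 'len'):
+    #   Time  overutilized     Time  overutilized  len
+    #   1.0   1                1.0   1             1.5
+    #   2.5   0                2.5   0             1.5
+    #
+    # The last event has no successor, so its duration is capped at the
+    # trace end: len = (start_time + time_range) - 2.5 = 4.0 - 2.5 = 1.5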
+
+    @staticmethod
+    def squash_df(df, start, end, column='delta'):
+        """
+        Slice a dataframe of deltas in [start:end] and ensure we have
+        an event at exactly those boundaries.
+
+        The input dataframe is expected to have a "column" which reports
+        the time delta between consecutive rows, as for example dataframes
+        generated by addEventsDeltas().
+
+        The returned dataframe is guaranteed to have an initial and final
+        event at the specified "start" and "end" index values. Those
+        boundary events carry the same data as the last event before "start"
+        (respectively, the first event after "end").
+
+        Examples:
+
+        If df is:
+
+            Time    len  state
+            15       1     1
+            16       1     0
+            17       1     1
+            18       1     0
+            -------------------
+
+        then squash_df(df, 16.5, 17.5) returns:
+
+            Time    len  state
+            16.5    .5     0
+            17      .5     1
+
+        and squash_df(df, 16.2, 16.8) returns:
+
+            Time    len  state
+            16.2    .6     0
+
+        :returns: a new df that fits the above description
+        """
+        if df.empty:
+            return df
+
+        end = min(end, df.index[-1] + df[column].values[-1])
+        res_df = pd.DataFrame(data=[], columns=df.columns)
+
+        if start > end:
+            return res_df
+
+        # There are a few things to keep in mind here, and it gets confusing
+        # even for the people who wrote the code. Let's write it down.
+        #
+        # It's assumed that the data is continuous, i.e. for any row 'r' within
+        # the trace interval, we will find a new row at (r.index + r.len).
+        # For us this means we'll never end up with an empty dataframe
+        # (if we started with a non-empty one).
+        #
+        # What we're manipulating looks like this:
+        # (| = events; [ & ] = start,end slice)
+        #
+        # |   [   |   ]   |
+        # e0  s0  e1  s1  e2
+        #
+        # We need to push e0 within the interval, and then tweak its duration
+        # (len column). The mathemagical incantation for that is:
+        #   e0.len = min(e1.index - s0, s1 - s0)
+        #
+        # This takes care of the case where s1 isn't in the interval.
+        # If s1 is in the interval, we just need to cap its len to
+        #   s1 - e1.index
+
+        prev_df = df[:start]
+        middle_df = df[start:end]
+
+        # Tweak the closest previous event to include it in the slice
+        if not prev_df.empty and not (start in middle_df.index):
+            res_df = res_df.append(prev_df.tail(1))
+            res_df.index = [start]
+            e1 = end
+
+            if not middle_df.empty:
+                e1 = middle_df.index[0]
+
+            res_df[column] = min(e1 - start, end - start)
+
+        if not middle_df.empty:
+            res_df = res_df.append(middle_df)
+            if end in res_df.index:
+                # e_last and s1 collide, ditch e_last
+                res_df = res_df.drop([end])
+            else:
+                # Fix the delta for the last row
+                delta = min(end - res_df.index[-1], res_df[column].values[-1])
+                res_df.at[res_df.index[-1], column] = delta
+
+        return res_df
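+
+    # Typical usage sketch for the two helpers above (illustrative; assumes
+    # `trace` is a Trace instance and `start`/`end` are window bounds):
+    #
+    #   df = pd.DataFrame(trace.getCPUActiveSignal(0), columns=['state'])
+    #   trace.addEventsDeltas(df)
+    #   df = Trace.squash_df(df, start, end)
+    #   idle_s = df[df.state == 0].delta.sum()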
+
 class TraceData:
     """ A DataFrame collector exposed to Trace's clients """
     pass
diff --git a/tests/eas/misfit.py b/tests/eas/misfit.py
new file mode 100644
index 0000000000000000000000000000000000000000..90a674289aa2ddcacf51584463abe65d6e916621
--- /dev/null
+++ b/tests/eas/misfit.py
@@ -0,0 +1,432 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# Copyright (C) 2018, Arm Limited and contributors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import json
+import numpy as np
+import pandas as pd
+
+from collections import OrderedDict
+from copy import deepcopy
+from unittest import SkipTest
+
+from test import LisaTest, experiment_test
+from trace import Trace
+from executor import Executor
+from wlgen.rta import Periodic, RTA
+
+from devlib.utils.misc import memoized
+from devlib.module.sched import SchedDomain
+
+WORKLOAD_PERIOD_MS = 16
+
+SD_ASYM_CPUCAPACITY = 0x0040
+
+class _MisfitMigrationBase(LisaTest):
+    """
+    Base class for shared functionality of misfit migration tests
+    """
+
+    test_conf = {
+        "ftrace" : {
+            "events" : [
+                "sched_switch",
+                "sched_wakeup",
+                "sched_wakeup_new",
+                "cpu_idle",
+            ],
+            "buffsize" : 100 * 1024
+        },
+        "modules": ["cgroups", "cpufreq"],
+    }
+
+    @classmethod
+    def setUpClass(cls, *args, **kwargs):
+        super(_MisfitMigrationBase, cls).runExperiments(*args, **kwargs)
+
+    @memoized
+    @staticmethod
+    def _has_asym_cpucapacity(test_env):
+        # Just try to find at least one instance of that flag
+        sd_info = test_env.target.sched.get_sd_info()
+
+        for cpu, domain_node in sd_info.cpus.items():
+            for domain in domain_node.domains.values():
+                if domain.has_flags(SchedDomain.SD_ASYM_CPUCAPACITY):
+                    return True
+
+        return False
+
+    @memoized
+    @staticmethod
+    def _classify_cpus(test_env):
+        """
+        Classify CPUs by capacity.
+
+        :returns: A list of lists. CPUs of equal capacity are packed in the
+          same list, and those lists are ordered by capacity value.
+        """
+        cpus = {}
+        for cpu in xrange(test_env.target.number_of_cpus):
+            cap = test_env.target.sched.get_capacity(cpu)
+            if cap not in cpus:
+                cpus[cap] = []
+
+            cpus[cap].append(cpu)
+
+        capacities = sorted(cpus.keys())
+        return [cpus[capacity] for capacity in capacities]
+
+    @memoized
+    @staticmethod
+    def _get_max_lb_interval(test_env):
+        """
+        Get the value of max_load_balance_interval.
+
+        The kernel computes it as:
+            HZ * num_online_cpus() / 10
+        (https://elixir.bootlin.com/linux/v4.15/source/kernel/sched/fair.c#L9101)
+
+        Here we don't do any hotplugging, so we consider all CPUs to be online.
+
+        :returns: The absolute maximum load-balance interval in seconds
+        """
+        HZ = test_env.target.sched.get_hz()
+        return ((HZ * test_env.target.number_of_cpus) / 10) * (1. / HZ)
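+
+    # Illustrative numbers (assumed target): with HZ=250 and 8 online CPUs,
+    # this gives ((250 * 8) / 10) * (1. / 250) = 0.8s. The HZ factors cancel
+    # (up to integer truncation), so this is roughly num_cpus / 10 seconds.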
+
+    @classmethod
+    def _get_wload(cls, test_env):
+        raise NotImplementedError()
+
+    @classmethod
+    def _getExperimentsConf(cls, test_env):
+        if not cls._has_asym_cpucapacity(test_env):
+            raise SkipTest(
+                'This test requires a target with asymmetric CPU capacities. '
+                'SD_ASYM_CPUCAPACITY was not found.'
+            )
+
+        conf = {
+            'tag' : 'misfit',
+            'flags' : ['ftrace', 'freeze_userspace'],
+        }
+
+        if 'cpufreq' in test_env.target.modules:
+            available_govs = test_env.target.cpufreq.list_governors(0)
+            conf['cpufreq'] = {'governor' : 'performance'}
+
+        return {
+            'wloads' : cls._get_wload(test_env),
+            'confs' : [conf],
+        }
+
+class StaggeredFinishes(_MisfitMigrationBase):
+    """
+    Test that misfit task migration happens at idle balance (staggered test case)
+
+    This test spawns one 100% task per CPU. The tasks running on
+    higher-capacity CPUs will finish first, and they are expected to
+    immediately pull the tasks running on lower-capacity CPUs via idle
+    balance.
+    """
+
+    # How long the tasks will be pinned to their "starting" CPU. Doesn't have
+    # to be long (we just have to ensure they spawn there), so arbitrary value
+    pin_delay_s = 0.001
+
+    # Somewhat arbitrary delay - long enough to ensure
+    # rq->avg_idle > sysctl_sched_migration_cost
+    idling_delay_s = 1
+
+    # How long the big CPUs are allowed to be idle while there are tasks
+    # running on the LITTLEs
+    allowed_idle_time_s = 0.001
+
+    # Percentage of the total test duration for which the tasks are allowed
+    # to be preempted
+    allowed_preempt_pct = 1
+
+    @classmethod
+    def _get_wload(cls, test_env):
+        cpus = range(test_env.platform['cpus_count'])
+
+        # We're pinning stuff in the first phase, so give it ample time to
+        # clean the pinned logic out of balance_interval
+        free_time_s = 1.1 * cls._get_max_lb_interval(test_env)
+        stagger_s = free_time_s / (10 * len(cpus))
+
+        params = {}
+
+        for cpu in cpus:
+            params["misfit_{}".format(cpu)] = (
+                Periodic(
+                    duty_cycle_pct=100,
+                    duration_s=cls.pin_delay_s,
+                    delay_s=cls.idling_delay_s,
+                    period_ms=WORKLOAD_PERIOD_MS,
+                    cpus=[cpu]
+                ) + Periodic(
+                    duty_cycle_pct=100,
+                    # Introduce staggered task completions
+                    duration_s=free_time_s + cpu * stagger_s,
+                    period_ms=WORKLOAD_PERIOD_MS,
+                    cpus=cpus
+                )
+            ).get()
+
+        wload = RTA(test_env.target, 'tmp',
+                    calibration=test_env.calibration())
+        wload.conf(kind='profile', params=params,
+                   run_dir=Executor.get_run_dir(test_env.target))
+
+        return {
+            'staggered' : {
+                'type' : 'rt-app',
+                'conf' : {
+                    'class' : 'custom',
+                    'json' : wload.json
+                }
+            }
+        }
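+
+    # Shape of the resulting per-task workload (illustrative numbers for an
+    # assumed 8-CPU, HZ=250 target: free_time_s = 1.1 * 0.8 = 0.88s,
+    # stagger_s = 0.88 / 80 = 0.011s), after the initial 1s idling delay:
+    #
+    #   misfit_0: 100% pinned to CPU0 for 0.001s, then 100% on any CPU for 0.880s
+    #   misfit_1: 100% pinned to CPU1 for 0.001s, then 100% on any CPU for 0.891s
+    #   ...
+    #   misfit_7: 100% pinned to CPU7 for 0.001s, then 100% on any CPU for 0.957s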
+
+    @memoized
+    def get_active_df(self, trace, cpu):
+        """
+        :returns: A dataframe that describes the active (1) / idle (0) state
+          of 'cpu'
+        """
+        active_df = pd.DataFrame(trace.getCPUActiveSignal(cpu), columns=['state'])
+        trace.addEventsDeltas(active_df)
+
+        return active_df
+
+    def max_idle_time(self, trace, start, end, cpus):
+        """
+        :returns: The maximum idle time of 'cpus' in the [start, end] interval
+        """
+        idle_df = pd.DataFrame()
+        max_time = 0
+        max_cpu = 0
+
+        for cpu in cpus:
+            busy_df = self.get_active_df(trace, cpu)
+            busy_df = Trace.squash_df(busy_df, start, end)
+            busy_df = busy_df[busy_df.state == 0]
+
+            if busy_df.empty:
+                continue
+
+            local_max = busy_df.delta.max()
+            if local_max > max_time:
+                max_time = local_max
+                max_cpu = cpu
+
+        return max_time, max_cpu
+
+    @memoized
+    def start_time(self, experiment):
+        """
+        :returns: The start time of the test workload, i.e. the time at which
+          all tasks are up and running on their designated CPUs.
+        """
+        trace = self.get_trace(experiment)
+        sdf = trace.data_frame.trace_event('sched_switch')
+        # Get the time when the first rt-app task spawns
+        init_start = sdf[sdf.next_comm.str.contains('misfit')].index[0]
+
+        # The tasks don't wake up at the same exact time, find the task that
+        # is the last to wake up
+        last_start = 0
+
+        sdf = sdf[init_start + self.idling_delay_s * 0.9 :]
+
+        for cpu in range(self.te.target.number_of_cpus):
+            task_name = "misfit_{}".format(cpu)
+            task_start = sdf[(sdf.next_comm == task_name) & (sdf["__cpu"] == cpu)].index[0]
+            last_start = max(last_start, task_start)
+
+        return last_start
+
+    def trim_lat_df(self, start, lat_df):
+        if lat_df.empty:
+            return lat_df
+
+        lat_df = Trace.squash_df(lat_df, start, lat_df.index[-1], "t_delta")
+        # squash_df only updates t_delta, drop t_start to make sure it's not used
+        return lat_df.drop('t_start', 1)
+
+    @experiment_test
+    def test_preempt_time(self, experiment, tasks):
+        """
+        Test that tasks are not being preempted too much
+        """
+        trace = self.get_trace(experiment)
+
+        cpus = range(self.te.target.number_of_cpus)
+        sorted_cpus = self._classify_cpus(self.te)
+
+        sdf = trace.data_frame.trace_event('sched_switch')
+        latency_dfs = {
+            i : trace.data_frame.latency_df('misfit_{}'.format(i))
+            for i in cpus
+        }
+
+        start_time = self.start_time(experiment)
+        end_time = sdf[sdf.prev_comm.str.contains('misfit')].index[-1]
+        test_duration = end_time - start_time
+
+        for task_num in cpus:
+            task_name = "misfit_{}".format(task_num)
+            lat_df = latency_dfs[task_num]
+
+            # The sched_switch events where this misfit task is replaced by
+            # another misfit task
+            preempt_sdf = sdf[
+                (sdf.prev_comm == task_name) &
+                (sdf.next_comm.str.startswith("misfit_"))
+            ]
+
+            lat_df = self.trim_lat_df(
+                start_time,
+                lat_df[
+                    (lat_df.index.isin(preempt_sdf.index)) &
+                    # Ensure this is a preemption and not just the task ending
+                    (lat_df.curr_state == "S")
+                ]
+            )
+
+            preempt_time = lat_df.t_delta.sum()
+            preempt_pct = (preempt_time / test_duration) * 100
+
+            self._log.debug("{} was preempted {:.2f}% of the time".format(task_name, preempt_pct))
+
+            if preempt_time > test_duration * self.allowed_preempt_pct / 100.:
+                err = "{} was preempted for {:.2f}% ({:.2f}s) of the test duration, " \
+                      "expected < {}%".format(
+                          task_name,
+                          preempt_pct,
+                          preempt_time,
+                          self.allowed_preempt_pct
+                      )
+                raise AssertionError(err)
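+
+    # Concrete reading of the threshold above (illustrative): with
+    # allowed_preempt_pct = 1 and a 10s test window, each misfit task may
+    # spend at most 0.1s preempted by its siblings before the test fails.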
+
+    def _test_idle_time(self, trace, latency_dfs, busy_cpus):
+        """
+        Test that for every event in latency_dfs, busy_cpus are
+        not idle for more than self.allowed_idle_time_s
+
+        :param trace: The trace to process
+        :type trace: :class:`Trace`
+
+        :param latency_dfs: The latency dataframes (see :class:`analysis.LatencyAnalysis`),
+          arranged in a {task_name : latency_df} shape
+        :type latency_dfs: dict
+
+        :param busy_cpus: The CPUs we want to assert are kept busy
+        :type busy_cpus: list
+        """
+        cpus = range(self.te.target.number_of_cpus)
+        sdf = trace.data_frame.trace_event('sched_switch')
+
+        for task_name, lat_df in latency_dfs.iteritems():
+            # Have a look at every task activation
+            for index, row in lat_df.iterrows():
+                cpu = int(row["__cpu"])
+                end = index + row.t_delta
+                # Ensure 'busy_cpus' are not idle for too long
+                idle_time, other_cpu = self.max_idle_time(trace, index, end, busy_cpus)
+
+                if idle_time > self.allowed_idle_time_s:
+                    err = "{} was on CPU{} @{:.3f} but CPU{} was idle " \
+                          "for {:.3f}s, expected < {}s".format(
+                              task_name,
+                              cpu,
+                              index + trace.ftrace.basetime,
+                              other_cpu,
+                              idle_time,
+                              self.allowed_idle_time_s
+                          )
+                    raise AssertionError(err)
+
+    @experiment_test
+    def test_migration_delay(self, experiment, tasks):
+        """
+        Test that big CPUs pull tasks ASAP
+        """
+
+        trace = self.get_trace(experiment)
+        cpus = range(self.te.target.number_of_cpus)
+        sorted_cpus = self._classify_cpus(self.te)
+
+        littles = sorted_cpus[0]
+        bigs = []
+        for group in sorted_cpus[1:]:
+            bigs += group
+
+        start_time = self.start_time(experiment)
+
+        latency_dfs = {}
+        for i in cpus:
+            # This test is about the first migration delay.
+            # Trim the latency_df up to the first time the task runs on a
+            # big CPU. The test will fail if the task wasn't migrated ASAP
+            res = pd.DataFrame([])
+            task_name = 'misfit_{}'.format(i)
+
+            df = trace.data_frame.latency_df(task_name)
+            df = self.trim_lat_df(start_time, df[df.curr_state == "A"])
+
+            first_big = df[df["__cpu"].isin(bigs)]
+
+            if not first_big.empty:
+                res = df[df["__cpu"].isin(littles)][:first_big.index[0]]
+
+            latency_dfs[task_name] = res
+
+        self._test_idle_time(trace, latency_dfs, bigs)
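+
+    # Illustrative trimming (made-up rows; suppose littles=[0], bigs=[1]):
+    # if misfit_0's activations are
+    #
+    #   Time  __cpu  t_delta
+    #   1.00  0      0.50      <- running on a LITTLE
+    #   1.50  0      0.05      <- still on a LITTLE
+    #   1.55  1      0.40      <- first activation on a big
+    #
+    # then latency_dfs['misfit_0'] keeps only the first two rows, and
+    # _test_idle_time() asserts that no big CPU was idle for more than
+    # allowed_idle_time_s while those ran.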
+
+    @experiment_test
+    def test_throughput(self, experiment, tasks):
+        """
+        Test that big CPUs are kept as busy as possible
+        """
+        trace = self.get_trace(experiment)
+        cpus = range(self.te.target.number_of_cpus)
+        sorted_cpus = self._classify_cpus(self.te)
+
+        littles = sorted_cpus[0]
+        bigs = []
+        for group in sorted_cpus[1:]:
+            bigs += group
+
+        start_time = self.start_time(experiment)
+
+        latency_dfs = {}
+        for i in cpus:
+            # This test is all about throughput: check that every time a task
+            # runs on a LITTLE it's because the bigs are busy
+            task_name = 'misfit_{}'.format(i)
+
+            df = trace.data_frame.latency_df(task_name)
+            latency_dfs[task_name] = self.trim_lat_df(
+                start_time,
+                df[
+                    (df.curr_state == "A") &
+                    (df["__cpu"].isin(littles))
+                ])
+
+        self._test_idle_time(trace, latency_dfs, bigs)
diff --git a/tests/lisa/test_trace.py b/tests/lisa/test_trace.py
index 29c1eaad3ce67d39ffe3157b5d40781cc800e258..473e9be73abb312bb64d9e62a79a398cce374018 100644
--- a/tests/lisa/test_trace.py
+++ b/tests/lisa/test_trace.py
@@ -21,6 +21,7 @@ import numpy as np
 from unittest import TestCase

 from trace import Trace
+import pandas as pd

 class TestTrace(TestCase):
     """Smoke tests for LISA's Trace class"""
@@ -34,6 +35,8 @@ class TestTrace(TestCase):
         'sched_load_se'
     ]

+    FLOAT_PLACES = 6
+
    def __init__(self, *args, **kwargs):
        super(TestTrace, self).__init__(*args, **kwargs)
@@ -120,7 +123,8 @@ class TestTrace(TestCase):
             normalize_time=False
         )

-        self.assertAlmostEqual(trace.time_range, expected_duration, places=6)
+        self.assertAlmostEqual(trace.time_range, expected_duration,
+                               places=self.FLOAT_PLACES)

     def test_time_range_window(self):
         """
@@ -135,7 +139,72 @@ class TestTrace(TestCase):
             window=(76.402065, 80.402065)
         )

-        self.assertAlmostEqual(trace.time_range, expected_duration, places=6)
+        self.assertAlmostEqual(trace.time_range, expected_duration,
+                               places=self.FLOAT_PLACES)
+
+    def test_squash_df(self):
+        """
+        TestTrace: squash_df() behaves as expected
+        """
+        index = [float(i) for i in range(15, 20)]
+        data = [(1, i % 2) for i in range(15, 20)]
+        df = pd.DataFrame(index=index, data=data, columns=['delta', 'state'])
+
+        ## Test "standard" slice:
+
+        # The df here should be:
+        # Time  delta  state
+        # 16.5  .5     0
+        # 17    .5     1
+        df1 = Trace.squash_df(df, 16.5, 17.5)
+        head = df1.head(1)
+        tail = df1.tail(1)
+        self.assertEquals(len(df1.index), 2)
+        self.assertEquals(df1.index.tolist(), [16.5, 17])
+        self.assertAlmostEqual(head['delta'].values[0], 0.5, places=self.FLOAT_PLACES)
+        self.assertAlmostEqual(tail['delta'].values[0], 0.5, places=self.FLOAT_PLACES)
+        self.assertEquals(head['state'].values[0], 0)
+        self.assertEquals(tail['state'].values[0], 1)
+
+        ## Test slice where no event exists in the interval
+
+        # The df here should be:
+        # Time  delta  state
+        # 16.2  .6     0
+        df2 = Trace.squash_df(df, 16.2, 16.8)
+        self.assertEquals(len(df2.index), 1)
+        self.assertEquals(df2.index[0], 16.2)
+        self.assertAlmostEqual(df2['delta'].values[0], 0.6, places=self.FLOAT_PLACES)
+        self.assertEquals(df2['state'].values[0], 0)
+
+        ## Test slice that matches an event's index
+
+        # The df here should be:
+        # Time  delta  state
+        # 16    1      0
+        df3 = Trace.squash_df(df, 16, 17)
+        self.assertEquals(len(df3.index), 1)
+        self.assertEquals(df3.index[0], 16)
+        self.assertAlmostEqual(df3['delta'].values[0], 1, places=self.FLOAT_PLACES)
+        self.assertEquals(df3['state'].values[0], 0)
+
+        ## Test slice past the last event
+
+        # The df here should be:
+        # Time  delta  state
+        # 19.5  .5     1
+        df4 = Trace.squash_df(df, 19.5, 22)
+        self.assertEquals(len(df4.index), 1)
+        self.assertEquals(df4.index[0], 19.5)
+        self.assertAlmostEqual(df4['delta'].values[0], 0.5, places=self.FLOAT_PLACES)
+        self.assertEquals(df4['state'].values[0], 1)
+
+        ## Test slice where there's no past event
+        df5 = Trace.squash_df(df, 10, 30)
+        self.assertEquals(len(df5.index), 5)
+
+        ## Test slice that should contain nothing
+        df6 = Trace.squash_df(df, 8, 9)
+        self.assertEquals(len(df6.index), 0)

     def test_overutilized_time(self):
         """
@@ -151,7 +220,8 @@ class TestTrace(TestCase):

         # Last event should be extended to the trace's end
         expected_time = (events[1] - events[0]) + (trace_end - events[2])

-        self.assertAlmostEqual(self.trace.overutilized_time, expected_time, places=6)
+        self.assertAlmostEqual(self.trace.overutilized_time, expected_time,
+                               places=self.FLOAT_PLACES)

     def test_plotCPUIdleStateResidency(self):
         """