diff --git a/lisa/analysis/load_tracking.py b/lisa/analysis/load_tracking.py index 4f0d6e8832e34695998a05ce533890ae7ca9deb3..6f4ad975438d3f0b3bb29addd034e1f132cdc0ae 100644 --- a/lisa/analysis/load_tracking.py +++ b/lisa/analysis/load_tracking.py @@ -54,7 +54,7 @@ class LoadTrackingAnalysis(AnalysisBase): The extra columns not shared between trace event versions """ if event in ['sched_load_cfs_rq', 'sched_load_se']: - return ['path', 'rbl_load', 'cpu'] + return ['path', 'rbl_load'] if event in ['sched_load_avg_task']: return ['load_sum', 'period_contrib', 'util_sum'] diff --git a/lisa/platforms/platinfo.py b/lisa/platforms/platinfo.py index f996a36eecb30487515f69d04f665fb5b22c5978..d079f6e9d35982dcedd5785e6226024d35bfecab 100644 --- a/lisa/platforms/platinfo.py +++ b/lisa/platforms/platinfo.py @@ -15,16 +15,27 @@ # limitations under the License. # -from collections.abc import Mapping - -from lisa.utils import HideExekallID, memoized -from lisa.conf import DeferredValue, IntIntDict, IntListList, IntIntListDict, StrIntListDict, MultiSrcConf, KeyDesc, LevelKeyDesc, TopLevelKeyDesc +from lisa.utils import HideExekallID, group_by_value +from lisa.conf import ( + DeferredValue, IntIntDict, IntListList, IntIntListDict, + MultiSrcConf, KeyDesc, LevelKeyDesc, TopLevelKeyDesc, DerivedKeyDesc +) from lisa.energy_model import EnergyModel from lisa.wlgen.rta import RTA from devlib.target import KernelVersion from devlib.exception import TargetStableError +def compute_capa_classes(conf): + """ + Derive the platform's capacity classes from the given conf + + This is intended for the creation of the ``capacity-classes`` key of + :class:`PlatformInfo`. + """ + return list(group_by_value(conf['cpu-capacities']).values()) + + class PlatformInfo(MultiSrcConf, HideExekallID): """ Platform-specific information made available to tests. @@ -40,7 +51,6 @@ class PlatformInfo(MultiSrcConf, HideExekallID): {generated_help} """ - # we could use mypy.subtypes.is_subtype and use the infrastructure provided # by typing module, but adding an external dependency is overkill for what # we need. @@ -55,8 +65,17 @@ class PlatformInfo(MultiSrcConf, HideExekallID): KeyDesc('os', 'OS being used, e.g. "linux"', [str]), KeyDesc('name', 'Free-form name of the board', [str]), KeyDesc('cpus-count', 'Number of CPUs', [int]), - KeyDesc('freq-domains', 'Frequency domains modeled by a list of CPU id for each domain', [IntListList]), + + KeyDesc('freq-domains', + 'Frequency domains modeled by a list of CPU IDs for each domain', + [IntListList]), KeyDesc('freqs', 'Dictionnary of CPU ID to list of frequencies', [IntIntListDict]), + + DerivedKeyDesc('capacity-classes', + 'Capacity classes modeled by a list of CPU IDs for each ' \ + 'capacity, sorted by capacity', + [IntListList], + [['cpu-capacities']], compute_capa_classes), )) """Some keys have a reserved meaning with an associated type.""" diff --git a/lisa/tests/kernel/scheduler/eas_behaviour.py b/lisa/tests/kernel/scheduler/eas_behaviour.py index e13304c58d4d97b4867ca614f6605379281431c1..ab0744d2a9283fcb75fe7e0f9c7352cfc494d9f3 100644 --- a/lisa/tests/kernel/scheduler/eas_behaviour.py +++ b/lisa/tests/kernel/scheduler/eas_behaviour.py @@ -98,28 +98,6 @@ class EASBehaviour(RTATestBundle, abc.ABC): """ return super().from_testenv(te, res_dir) - @classmethod - def min_cpu_capacity(cls, te): - """ - The smallest CPU capacity on the target - - :type te: lisa.env.TestEnv - - :returns: int - """ - return min(te.target.sched.get_capacities().values()) - - @classmethod - def max_cpu_capacity(cls, te): - """ - The highest CPU capacity on the target - - :type te: lisa.env.TestEnv - - :returns: int - """ - return max(te.target.sched.get_capacities().values()) - def _get_start_time(self): """ Get the time where the first task spawned @@ -409,22 +387,6 @@ class EASBehaviour(RTATestBundle, abc.ABC): return res - @classmethod - def unscaled_utilization(cls, capacity, utilization_pct): - """ - Convert a scaled utilization value to a 'raw', unscaled one. - - :param capacity: The capacity of the CPU ``utilization_pct``` is scaled - against - :type capacity: int - - :param utilization_pct: The scaled utilization in % - :type utilization_pct: int - """ - # TODO(?): use te.nrg_model.capacity_scale - return int((capacity / 1024) * utilization_pct) - -# TODO: factorize this crap out of these classes class OneSmallTask(EASBehaviour): """ A single 'small' task @@ -435,7 +397,7 @@ class OneSmallTask(EASBehaviour): @classmethod def get_rtapp_profile(cls, te): # 50% of the smallest CPU's capacity - duty = cls.unscaled_utilization(cls.min_cpu_capacity(te), 50) + duty = cls.unscaled_utilization(te, te.plat_info["capacity-classes"][0][0], 50) rtapp_profile = {} rtapp_profile[cls.task_name] = Periodic( @@ -470,7 +432,7 @@ class ThreeSmallTasks(EASBehaviour): @classmethod def get_rtapp_profile(cls, te): # 50% of the smallest CPU's capacity - duty = cls.unscaled_utilization(cls.min_cpu_capacity(te), 50) + duty = cls.unscaled_utilization(te, te.plat_info["capacity-classes"][0][0], 50) rtapp_profile = {} for i in range(3): @@ -492,7 +454,7 @@ class TwoBigTasks(EASBehaviour): @classmethod def get_rtapp_profile(cls, te): # 80% of the biggest CPU's capacity - duty = cls.unscaled_utilization(cls.max_cpu_capacity(te), 80) + duty = cls.unscaled_utilization(te, te.plat_info["capacity-classes"][-1][0], 80) rtapp_profile = {} for i in range(2): @@ -515,9 +477,11 @@ class TwoBigThreeSmall(EASBehaviour): @classmethod def get_rtapp_profile(cls, te): # 50% of the smallest CPU's capacity - small_duty = cls.unscaled_utilization(cls.min_cpu_capacity(te), 25) + small_duty = cls.unscaled_utilization( + te, te.plat_info["capacity-classes"][0][0], 50) # 80% of the biggest CPU's capacity - big_duty = cls.unscaled_utilization(cls.max_cpu_capacity(te), 80) + big_duty = cls.unscaled_utilization( + te, te.plat_info["capacity-classes"][-1][0], 80) rtapp_profile = {} @@ -549,12 +513,10 @@ class EnergyModelWakeMigration(EASBehaviour): @classmethod def get_rtapp_profile(cls, te): rtapp_profile = {} - capacities = te.target.sched.get_capacities() - bigs = [cpu for cpu, capacity in list(capacities.items()) - if capacity == cls.max_cpu_capacity(te)] + bigs = te.plat_info["capacity-classes"][-1] - start_pct = cls.unscaled_utilization(cls.min_cpu_capacity(te), 20) - end_pct = cls.unscaled_utilization(cls.max_cpu_capacity(te), 70) + start_pct = cls.unscaled_utilization(te, te.plat_info["capacity-classes"][0][0], 20) + end_pct = cls.unscaled_utilization(te, bigs[0], 70) for i in range(len(bigs)): rtapp_profile["{}_{}".format(cls.task_prefix, i)] = Step( @@ -589,8 +551,8 @@ class RampUp(EASBehaviour): @classmethod def get_rtapp_profile(cls, te): - start_pct = cls.unscaled_utilization(cls.min_cpu_capacity(te), 10) - end_pct = cls.unscaled_utilization(cls.max_cpu_capacity(te), 70) + start_pct = cls.unscaled_utilization(te, te.plat_info["capacity-classes"][0][0], 10) + end_pct = cls.unscaled_utilization(te, te.plat_info["capacity-classes"][-1][0], 70) rtapp_profile = { cls.task_name : Ramp( @@ -634,8 +596,8 @@ class RampDown(EASBehaviour): @classmethod def get_rtapp_profile(cls, te): - start_pct = cls.unscaled_utilization(cls.max_cpu_capacity(te), 70) - end_pct = cls.unscaled_utilization(cls.min_cpu_capacity(te), 10) + start_pct = cls.unscaled_utilization(te, te.plat_info["capacity-classes"][-1][0], 70) + end_pct = cls.unscaled_utilization(te, te.plat_info["capacity-classes"][0][0], 10) rtapp_profile = { cls.task_name : Ramp( diff --git a/lisa/tests/kernel/scheduler/load_tracking.py b/lisa/tests/kernel/scheduler/load_tracking.py index f3c48ad3245c021e03b493e83af2638469ade03b..6763ea47d40051c7b82687538e61c7a1d37719b9 100644 --- a/lisa/tests/kernel/scheduler/load_tracking.py +++ b/lisa/tests/kernel/scheduler/load_tracking.py @@ -15,13 +15,13 @@ # limitations under the License. # -import abc import os +from collections import OrderedDict + import matplotlib.pyplot as plt import pylab as pl -from collections import OrderedDict -from bart.common.Utils import select_window +from bart.common.Utils import select_window, area_under_curve from bart.sched import pelt from bart.sched.SchedAssert import SchedAssert @@ -33,7 +33,6 @@ from lisa.tests.kernel.test_bundle import ( from lisa.env import TestEnv from lisa.utils import ArtifactPath from lisa.wlgen.rta import Periodic -from lisa.trace import Trace UTIL_SCALE = 1024 """ @@ -203,7 +202,9 @@ class InvarianceBase(LoadTrackingBase): trace, cpu, task_name, capacity=None): exp_util = self.get_expected_util_avg(trace, cpu, task_name, capacity) signal_df = self.get_task_sched_signals(trace, cpu, task_name, [signal_name]) - signal_mean = signal_df[UTIL_AVG_CONVERGENCE_TIME_S:][signal_name].describe()['mean'] + signal = signal_df[UTIL_AVG_CONVERGENCE_TIME_S:][signal_name] + + signal_mean = area_under_curve(signal) / (signal.index[-1] - signal.index[0]) ok = self.is_almost_equal(exp_util, signal_mean, allowed_error_pct) diff --git a/lisa/tests/kernel/scheduler/misfit.py b/lisa/tests/kernel/scheduler/misfit.py index 6a05bfdbc175d6cd26ed9bf68492c343a6e35c10..8dd789890c9bff6804b88073719b4d9db706dce7 100644 --- a/lisa/tests/kernel/scheduler/misfit.py +++ b/lisa/tests/kernel/scheduler/misfit.py @@ -126,17 +126,7 @@ class StaggeredFinishes(MisfitMigrationBase): def __init__(self, res_dir, plat_info, rtapp_profile): super().__init__(res_dir, plat_info, rtapp_profile) - cpu_capacities = plat_info['cpu-capacities'] - - cpu_classes = {} - for cpu, capacity in cpu_capacities.items(): - if capacity not in cpu_classes.keys(): - cpu_classes[capacity] = [] - - cpu_classes[capacity].append(cpu) - - capacities = sorted(cpu_classes.keys()) - self.cpu_classes = [cpu_classes[capacity] for capacity in capacities] + cpu_classes = plat_info['capacity-classes'] sdf = self.trace.df_events('sched_switch') # Get the time where the first rt-app task spawns @@ -158,11 +148,11 @@ class StaggeredFinishes(MisfitMigrationBase): self.end_time = sdf[sdf.prev_comm.str.contains(self.task_prefix)].index[-1] self.duration = self.end_time - self.start_time - self.src_cpus = self.cpu_classes[0] + self.src_cpus = cpu_classes[0] # XXX: Might need to check the tasks can fit on all of those, rather # than just pick all but the smallest CPUs self.dst_cpus = [] - for group in self.cpu_classes[1:]: + for group in cpu_classes[1:]: self.dst_cpus += group @classmethod diff --git a/lisa/tests/kernel/staging/load_tracking.py b/lisa/tests/kernel/staging/load_tracking.py new file mode 100644 index 0000000000000000000000000000000000000000..d713abe1e93e4977d32861e8bd10741470848fb3 --- /dev/null +++ b/lisa/tests/kernel/staging/load_tracking.py @@ -0,0 +1,267 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# Copyright (C) 2019, Arm Limited and contributors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from bart.common.Utils import area_under_curve + +from lisa.wlgen.rta import RTATask, Periodic +from lisa.tests.kernel.test_bundle import TestMetric, ResultBundle, CannotCreateError +from lisa.tests.kernel.scheduler.load_tracking import ( + UTIL_AVG_CONVERGENCE_TIME_S, + UTIL_SCALE, + LoadTrackingBase +) + +class CPUMigrationBase(LoadTrackingBase): + """ + Base class for migration-related load tracking tests + + The idea here is to run several rt-app tasks and to have them pinned to + a single CPU for a single phase. They can change CPUs in a new phase, + and we can then inspect the CPU utilization - it should match the + sum of the utilization of all the tasks running on it. + + **Design notes:** + + Since we sum up the utilization of each task, make sure not to overload the + CPU - IOW, there should always be some idle cycles. + + The code assumes all tasks have the same number of phases, and that those + phases are all aligned. + """ + + PHASE_DURATION_S = 3 * UTIL_AVG_CONVERGENCE_TIME_S + """ + The duration of a single phase + """ + + @classmethod + def _run_rtapp(cls, te, res_dir, profile): + # Just do some validation on the profile + for name, task in profile.items(): + for phase in task.phases: + if len(phase.cpus) != 1: + raise RuntimeError("Each phase must be tied to a single CPU. " + "Task \"{}\" violates this".format(name)) + + super()._run_rtapp(te, res_dir, profile) + + def __init__(self, res_dir, plat_info, rtapp_profile): + super().__init__(res_dir, plat_info, rtapp_profile) + + self.cpus = set() + + self.reference_task = list(self.rtapp_profile.values())[0] + self.nr_phases = len(self.reference_task.phases) + + for task in self.rtapp_profile.values(): + for phase in task.phases: + self.cpus.update(phase.cpus) + + self.phases_durations = [phase.duration_s + for phase in self.reference_task.phases] + + @classmethod + def check_from_testenv(cls, te): + super().check_from_testenv(te) + + try: + te.plat_info["cpu-capacities"] + except KeyError as e: + raise CannotCreateError(str(e)) + + def get_expected_cpu_util(self): + """ + Get the per-phase average CPU utilization expected from the rtapp profile + + :returns: A dict of the shape {cpu : {phase_id : expected_util}} + """ + cpu_util = {cpu : {phase_id : 0 for phase_id in range(self.nr_phases)} + for cpu in self.cpus} + + for task in self.rtapp_profile.values(): + for phase_id, phase in enumerate(task.phases): + cpu_util[phase.cpus[0]][phase_id] += UTIL_SCALE * (phase.duty_cycle_pct / 100) + + return cpu_util + + def get_trace_cpu_util(self): + """ + Get the per-phase average CPU utilization read from the trace + + :returns: A dict of the shape {cpu : {phase_id : trace_util}} + """ + cpu_util = {cpu : {phase_id : 0 for phase_id in range(self.nr_phases)} + for cpu in self.cpus} + df = self.trace.analysis.load_tracking.df_cpus_signals() + sw_df = self.trace.df_events("sched_switch") + + phase_start = sw_df[sw_df.next_comm == list(self.rtapp_profile.keys())[0]].index[0] + + for phase in range(self.nr_phases): + # Start looking at signals once they should've converged + start = phase_start + UTIL_AVG_CONVERGENCE_TIME_S + # Trim the end a bit, otherwise we could have one or two events + # from the next phase + end = phase_start + self.phases_durations[phase] * .9 + + phase_df = df[start:end] + phase_duration = end - start + + for cpu in self.cpus: + util = phase_df[phase_df.cpu == cpu].util + cpu_util[cpu][phase] = area_under_curve(util) / (phase_duration) + + phase_start += self.phases_durations[phase] + + return cpu_util + + def test_util_task_migration(self, allowed_error_pct=5) -> ResultBundle: + """ + Test that a migrated task properly propagates its utilization at the CPU level + + :param allowed_error_pct: How much the trace averages can stray from the + expected values + :type allowed_error_pct: float + """ + expected_cpu_util = self.get_expected_cpu_util() + trace_cpu_util = self.get_trace_cpu_util() + + passed = True + + expected_metrics = {} + trace_metrics = {} + deltas = {} + + for cpu in self.cpus: + cpu_str = "cpu{}".format(cpu) + + expected_metrics[cpu_str] = TestMetric({}) + trace_metrics[cpu_str] = TestMetric({}) + deltas[cpu_str] = TestMetric({}) + + for phase in range(self.nr_phases): + if not self.is_almost_equal( + trace_cpu_util[cpu][phase], + expected_cpu_util[cpu][phase], + allowed_error_pct): + passed = False + + # Just some verbose metric collection... + phase_str = "phase{}".format(phase) + + expected = expected_cpu_util[cpu][phase] + trace = trace_cpu_util[cpu][phase] + delta = 100 * (trace - expected) / expected + + expected_metrics[cpu_str].data[phase_str] = TestMetric(expected) + trace_metrics[cpu_str].data[phase_str] = TestMetric(trace) + deltas[cpu_str].data[phase_str] = TestMetric(delta, "%") + + res = ResultBundle.from_bool(passed) + res.add_metric("Expected utilization", expected_metrics) + res.add_metric("Trace utilization", trace_metrics) + res.add_metric("Utilization deltas", deltas) + + return res + +class OneTaskCPUMigration(CPUMigrationBase): + """ + Some tasks on two big CPUs, one of them migrates in its second phase. + """ + + NR_REQUIRED_CPUS = 2 + """ + The number of CPUs of same capacity involved in the test + """ + + @classmethod + def get_migration_cpus(cls, te): + """ + :returns: :attr:`NR_REQUIRED_CPUS` CPUs of same capacity. + """ + # Iterate over descending CPU capacity groups + for cpus in reversed(te.plat_info["capacity-classes"]): + if len(cpus) >= cls.NR_REQUIRED_CPUS: + return cpus[:cls.NR_REQUIRED_CPUS] + + return [] + + @classmethod + def check_from_testenv(cls, te): + super().check_from_testenv(te) + + cpus = cls.get_migration_cpus(te) + if not len(cpus) == cls.NR_REQUIRED_CPUS: + raise CannotCreateError( + "This workload requires {} CPUs of identical capacity".format( + cls.NR_REQUIRED_CPUS)) + + @classmethod + def get_rtapp_profile(cls, te): + profile = {} + cpus = cls.get_migration_cpus(te) + + for task in ["migrating", "static0", "static1"]: + # An empty RTATask just to sum phases up + profile[task] = RTATask() + + for i in range(2): + # A task that will migrate to another CPU + profile["migrating"] += Periodic( + duty_cycle_pct=cls.unscaled_utilization(te, cpus[i], 20), + duration_s=cls.PHASE_DURATION_S, period_ms=cls.TASK_PERIOD_MS, + cpus=[cpus[i]]) + + # Just some tasks that won't move to get some background utilization + profile["static0"] += Periodic( + duty_cycle_pct=cls.unscaled_utilization(te, cpus[0], 30), + duration_s=cls.PHASE_DURATION_S, period_ms=cls.TASK_PERIOD_MS, + cpus=[cpus[0]]) + + profile["static1"] += Periodic( + duty_cycle_pct=cls.unscaled_utilization(te, cpus[1], 20), + duration_s=cls.PHASE_DURATION_S, period_ms=cls.TASK_PERIOD_MS, + cpus=[cpus[1]]) + + return profile + +class TwoTasksCPUMigration(OneTaskCPUMigration): + """ + Two tasks on two big CPUs, swap their CPU in the second phase + """ + + @classmethod + def get_rtapp_profile(cls, te): + profile = {} + cpus = cls.get_migration_cpus(te) + + for task in ["migrating0", "migrating1"]: + # An empty RTATask just to sum phases up + profile[task] = RTATask() + + for i in range(2): + # A task that will migrate from CPU A to CPU B + profile["migrating0"] += Periodic( + duty_cycle_pct=20, duration_s=cls.PHASE_DURATION_S, + period_ms=cls.TASK_PERIOD_MS, cpus=[cpus[i]]) + + # A task that will migrate from CPU B to CPU A + profile["migrating1"] += Periodic( + duty_cycle_pct=20, duration_s=cls.PHASE_DURATION_S, + period_ms=cls.TASK_PERIOD_MS, cpus=[cpus[1 - i]]) + + return profile diff --git a/lisa/tests/kernel/test_bundle.py b/lisa/tests/kernel/test_bundle.py index a1de00d6276d57aff16362d61f6478abcffbcc78..b8f5e5d693a1168f03cd8fe60eac54493ecf6d5f 100644 --- a/lisa/tests/kernel/test_bundle.py +++ b/lisa/tests/kernel/test_bundle.py @@ -382,6 +382,24 @@ class RTATestBundle(TestBundle, abc.ABC): super().__init__(res_dir, plat_info) self.rtapp_profile = rtapp_profile + @classmethod + def unscaled_utilization(cls, te, cpu, utilization_pct): + """ + Convert utilization scaled to a CPU to a 'raw', unscaled one. + + :param capacity: The CPU against which ``utilization_pct``` is scaled + :type capacity: int + + :param utilization_pct: The scaled utilization in % + :type utilization_pct: int + """ + if "nrg-model" in te.plat_info: + capacity_scale = te.plat_info["nrg-model"].capacity_scale + else: + capacity_scale = 1024 + + return int((te.plat_info["cpu-capacities"][cpu] / capacity_scale) * utilization_pct) + @classmethod @abc.abstractmethod def get_rtapp_profile(cls, te): diff --git a/lisa/utils.py b/lisa/utils.py index b9f061e0b77d2f12b4c07035b573636362ee8c5e..d097e02b2f241e41773010628a805a73eb6ce813 100644 --- a/lisa/utils.py +++ b/lisa/utils.py @@ -482,6 +482,37 @@ def groupby(iterable, key=None): iterable = sorted(iterable, key=key) return itertools.groupby(iterable, key=key) +def group_by_value(mapping, key_sort=lambda x: x): + """ + Group a mapping by its values + + :param mapping: Mapping to reverse + :type mapping: collections.abc.Mapping + + :param key_sort: The ``key`` parameter to a :func:`sorted` call on the + mapping keys + :type key_sort: collections.abc.Callable + + :rtype: collections.OrderedDict + + The idea behind this method is to "reverse" a mapping, IOW to create a new + mapping that has the passed mapping's values as keys. Since different keys + can point to the same value, the new values will be lists of old keys. + + **Example:** + + >>> group_by_value({0: 42, 1: 43, 2: 42}) + OrderedDict([(42, [0, 2]), (43, [1])]) + """ + if not key_sort: + # Just conserve the order + key_sort = lambda x: 0 + + return OrderedDict( + (val, sorted((k for k, v in key_group), key=key_sort)) + for val, key_group in groupby(mapping.items(), key=operator.itemgetter(1)) + ) + def deduplicate(seq, keep_last=True, key=lambda x: x): """ Deduplicate items in the given sequence and return a list.