diff --git a/lisa/pelt.py b/lisa/pelt.py index 6420c8ef7674e6098e0b0df94a892a15ebe08000..281c25becf01b79051ed658b3706fa5cdb6f209e 100644 --- a/lisa/pelt.py +++ b/lisa/pelt.py @@ -35,7 +35,7 @@ PELT half-life in number of windows. PELT_SCALE = 1024 -def simulate_pelt(activations, init=0, index=None, clock=None, capacity=None, windowless=False, window=PELT_WINDOW, half_life=PELT_HALF_LIFE, scale=PELT_SCALE): +def simulate_pelt(activations=None, init=0, index=None, clock=None, capacity=None, windowless=False, window=PELT_WINDOW, half_life=PELT_HALF_LIFE, scale=PELT_SCALE, return_sim=False): """ Simulate a PELT signal out of a series of activations. @@ -75,6 +75,15 @@ def simulate_pelt(activations, init=0, index=None, clock=None, capacity=None, wi :param scale: Scale of the signal, i.e. maximum value it can take. :type scale: float + :param return_sim: If ``True``, return a stateful function that can be + called on successive samples and return PELT values. The function + expects to be passed a dictionary with the following keys: + + * ``clock``: Value of the PELT clock + * ``activations``: ``1`` if the entity was active since the last + sample, ``0`` otherwise. + :type return_sim: bool + .. note:: PELT windowing is not time-invariant, i.e. it depends on the absolute value of the timestamp. This means that the timestamp of the activations matters, and it is recommended to use the ``clock`` @@ -84,6 +93,125 @@ def simulate_pelt(activations, init=0, index=None, clock=None, capacity=None, wi of computing the signal. This means that the simulation cannot perfectly match the kernel's signal. 
""" + + def make_windowed_pelt_sim(init, scale, window, half_life): + decay = (1 / 2)**(1 / half_life) + # Alpha as defined in https://en.wikipedia.org/wiki/Moving_average + alpha = 1 - decay + + # Accumulator of running time within a PELT window + acc = 0 + # Output signal + signal = init / scale + output = signal + prev_clock = math.nan + + def pelt(row): + nonlocal acc, signal, output, prev_clock + + # 1=running 0=sleeping + running = row['activations'] + + clock = row['clock'] + try: + delta = row['delta'] + except KeyError: + delta = clock - prev_clock + prev_clock = clock + + try: + windows = row['crossed_windows'] + except KeyError: + # This is an approximation, as the real number of crossed + # windows depends on the absolute value of the clock, not just + # the delta. + windows = delta // window + else: + windows = int(windows) + + # We crossed one or more windows boundaries + if windows: + # Handle last piece of the window in which this activation started + first_window_fraction = window - ((clock - delta) % window) + first_window_fraction /= window + + acc += running * first_window_fraction + signal = alpha * acc + (1 - alpha) * signal + + # Handle the windows we fully crossed + for _ in range(windows - 1): + signal = alpha * running + (1 - alpha) * signal + + # Handle the current incomplete window + last_window_fraction = (clock % window) / window + + # Extrapolate the signal as it would look with the same + # `running` state at the end of the current window + extrapolated = running * alpha + (1 - alpha) * signal + # Take an value between signal and extrapolated based on the + # current completion of the window. 
This implements the same + # idea as introduced by kernel commit: + # sched/cfs: Make util/load_avg more stable 625ed2bf049d5a352c1bcca962d6e133454eaaff + output = signal + last_window_fraction * (extrapolated - signal) + + signal += alpha * running * last_window_fraction + acc = 0 + # If we are still in the same window, just accumulate the running + # time + else: + acc += running * delta / window + + return output * scale + + return pelt + + def make_windowless_pelt_sim(init, scale, window, half_life): + tau = _pelt_tau(half_life, window) + signal = init + prev_clock = math.nan + + def pelt_after(init, t, running): + # Compute the response of the 1st order filter at time "t", + # with the given initial condition + # http://fourier.eng.hmc.edu/e59/lectures/e59/node33.html + exp_ = math.exp(-t / tau) + non_zero = running * scale * (1 - exp_) + zero = init * exp_ + return non_zero + zero + + def pelt(row): + nonlocal signal, prev_clock + # 1=running 0=sleeping + running = row['activations'] + try: + delta = row['delta'] + except KeyError: + clock = row['clock'] + delta = clock - prev_clock + prev_clock = clock + + signal = pelt_after(signal, delta, running) + return signal + + return pelt + + if windowless: + make_sim = make_windowless_pelt_sim + else: + make_sim = make_windowed_pelt_sim + + sim = make_sim( + init=init, + window=window, + half_life=half_life, + scale=scale, + ) + + if return_sim: + return sim + elif activations is None: + raise ValueError('activations must be provided if return_sim=None') + if index is None: index = activations.index else: @@ -179,98 +307,6 @@ def simulate_pelt(activations, init=0, index=None, clock=None, capacity=None, wi # First row of "delta" is NaN, and activations reindex may have produced # some NaN at the beginning of the dataframe as well df.dropna(inplace=True) - - def make_windowed_pelt_sim(init, scale, window, half_life): - decay = (1 / 2)**(1 / half_life) - # Alpha as defined in 
https://en.wikipedia.org/wiki/Moving_average - alpha = 1 - decay - - # Accumulator of running time within a PELT window - acc = 0 - # Output signal - signal = init / scale - output = signal - - def pelt(row): - nonlocal acc, signal, output - - # 1=running 0=sleeping - running = row['activations'] - clock = row['clock'] - delta = row['delta'] - windows = row['crossed_windows'].astype('int') - - # We crossed one or more windows boundaries - if windows: - # Handle last piece of the window in which this activation started - first_window_fraction = window - ((clock - delta) % window) - first_window_fraction /= window - - acc += running * first_window_fraction - signal = alpha * acc + (1 - alpha) * signal - - # Handle the windows we fully crossed - for _ in range(windows - 1): - signal = alpha * running + (1 - alpha) * signal - - # Handle the current incomplete window - last_window_fraction = (clock % window) / window - - # Extrapolate the signal as it would look with the same - # `running` state at the end of the current window - extrapolated = running * alpha + (1 - alpha) * signal - # Take an value between signal and extrapolated based on the - # current completion of the window. 
This implements the same - # idea as introduced by kernel commit: - # sched/cfs: Make util/load_avg more stable 625ed2bf049d5a352c1bcca962d6e133454eaaff - output = signal + last_window_fraction * (extrapolated - signal) - - signal += alpha * running * last_window_fraction - acc = 0 - # If we are still in the same window, just accumulate the running - # time - else: - acc += running * delta / window - - return output * scale - - return pelt - - def make_windowless_pelt_sim(init, scale, window, half_life): - tau = _pelt_tau(half_life, window) - signal = init - - def pelt_after(init, t, running): - # Compute the the response of the 1st order filter at time "t", - # with the given initial condition - # http://fourier.eng.hmc.edu/e59/lectures/e59/node33.html - exp_ = math.exp(-t / tau) - non_zero = running * scale * (1 - exp_) - zero = init * exp_ - return non_zero + zero - - def pelt(row): - nonlocal signal - # 1=running 0=sleeping - running = row['activations'] - delta = row['delta'] - - signal = pelt_after(signal, delta, running) - return signal - - return pelt - - if windowless: - make_sim = make_windowless_pelt_sim - else: - make_sim = make_windowed_pelt_sim - - sim = make_sim( - init=init, - window=window, - half_life=half_life, - scale=scale, - ) df['pelt'] = df.apply(sim, axis=1) pelt = df['pelt'] if pelt.index is not index: