diff --git a/benchmarks/selftest/echo.yaml b/benchmarks/selftest/echo.yaml new file mode 100644 index 0000000000000000000000000000000000000000..381ee0ed34c6c2cd8cd273e53efdebe248a64156 --- /dev/null +++ b/benchmarks/selftest/echo.yaml @@ -0,0 +1,7 @@ +suite: selftest +name: echo +type: selftest +image: registry.gitlab.arm.com/tooling/fastpath/containers/selftest:v1.0 +roles: + - client + - server diff --git a/containers/selftest/Dockerfile b/containers/selftest/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..22b0847401deaccfb126dfbde64a6c17f0fe1b22 --- /dev/null +++ b/containers/selftest/Dockerfile @@ -0,0 +1,30 @@ +# Copyright (c) 2025, Arm Limited. +# SPDX-License-Identifier: MIT + +FROM registry.gitlab.arm.com/tooling/fastpath/containers/base:latest + +# Ensure apt won't ask for user input. +ENV DEBIAN_FRONTEND=noninteractive + +# Explicitly install Python and create a venv for pip packages, since Debian +# does not allow us to install pip packages system-wide. These packages are +# needed for all benchmark containers. +RUN apt-get update && \ + apt-get install --assume-yes --no-install-recommends --option=debug::pkgProblemResolver=yes \ + python3 \ + python3-pip \ + python3-venv \ + python3-dev \ + build-essential \ + pkg-config +RUN python3 -m venv /pyvenv +ENV PATH="/pyvenv/bin:${PATH}" +COPY fastpath/requirements.txt /tmp/requirements.txt +RUN pip3 install -r /tmp/requirements.txt +RUN rm -rf /tmp/requirements.txt + +# Setup the entrypoint. +RUN mkdir /fastpath +ARG NAME +COPY containers/${NAME}/exec.py /fastpath/. +CMD /fastpath/exec.py diff --git a/containers/selftest/exec.py b/containers/selftest/exec.py new file mode 100755 index 0000000000000000000000000000000000000000..c8de77d4f788019f935bcd2867c8474e971f9e60 --- /dev/null +++ b/containers/selftest/exec.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 +# Copyright (c) 2024, Arm Limited. +# SPDX-License-Identifier: MIT + + +import argparse +import csv +import os +import socket +import time +import yaml + + +ERR_NONE = 0 +ERR_INVAL_BENCHMARK_FORMAT = 2 +PORT = 12345 +MESSAGE = "Hello, server!" + + +def execute_server(client_ip): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("0.0.0.0", PORT)) + s.listen() + print(f"Server listening on port {PORT}") + + while True: + conn, addr = s.accept() + remote_ip, remote_port = addr + print(f"Connection attempt from {remote_ip}:{remote_port}") + + if remote_ip != client_ip: + print("Unauthorized client. Connection closed.") + conn.close() + else: + break + + with conn: + while True: + data = conn.recv(1024) + if not data: + break + conn.sendall(data) + + +def execute_client(server_ip): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + while True: + try: + s.connect((server_ip, PORT)) + break + except ConnectionRefusedError: + # Wait for server to start listening. 
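+                # Retry forever here; a server that never comes up is
+                # caught by the benchmark-level timeout that the runner
+                # applies to the docker exec.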
+                time.sleep(1)
+
+        start = time.perf_counter()
+        s.sendall(MESSAGE.encode())
+        data = s.recv(1024)
+        end = time.perf_counter()
+
+        round_trip_us = (end - start) * 1000000
+        response = data.decode()
+
+        print(f"Sent: {MESSAGE}")
+        print(f"Received: {response}")
+        print(f"Latency: {round_trip_us:.2f} us")
+
+        return round_trip_us
+
+
+def validate_benchmark(benchmark):
+    if benchmark["suite"] != "selftest":
+        return ERR_INVAL_BENCHMARK_FORMAT
+    if benchmark["name"] != "echo":
+        return ERR_INVAL_BENCHMARK_FORMAT
+    if benchmark["role"] not in ["client", "server"]:
+        return ERR_INVAL_BENCHMARK_FORMAT
+    return ERR_NONE
+
+
+def main():
+    output_results = True
+    results = {
+        "name": "round trip time",
+        "unit": "us",
+        "improvement": "smaller",
+        "value": 0.0,
+    }
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--fastpath-share", required=False, default="/fastpath-share"
+    )
+    args = parser.parse_args()
+
+    with open(os.path.join(args.fastpath_share, "benchmark.yaml")) as f:
+        benchmark = yaml.safe_load(f)
+
+    results["error"] = validate_benchmark(benchmark)
+    if results["error"] == 0:
+        if benchmark["role"] == "client":
+            results["value"] = execute_client(benchmark["ipmap"]["server"])
+        elif benchmark["role"] == "server":
+            execute_server(benchmark["ipmap"]["client"])
+            output_results = False
+
+    if output_results:
+        with open(os.path.join(args.fastpath_share, "results.csv"), "w") as f:
+            writer = csv.DictWriter(f, fieldnames=results.keys())
+            writer.writeheader()
+            writer.writerow(results)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/documentation/images/resultstoreschema.png b/documentation/images/resultstoreschema.png
index cf110a88586447d6fe9422664260749fa7230003..3d0303c3aaf7f01c7108cf1a623a1a7607097201 100644
Binary files a/documentation/images/resultstoreschema.png and b/documentation/images/resultstoreschema.png differ
diff --git a/documentation/user-guide/planschema.rst b/documentation/user-guide/planschema.rst
index e288651be79d37e0a0cd37cba7983d6566f4b40d..a6df13b2673bbfd6f9e9823c1416eadd32c9ab9b 100644
--- a/documentation/user-guide/planschema.rst
+++ b/documentation/user-guide/planschema.rst
@@ -34,28 +34,43 @@ The sut dictionary describes the system under test (SUT):
 key        type   required default                     description
 ========== ====== ======== =========================== ===========
 name       string false    None                        User-supplied friendly name to identify the system under test.
-connection dict   true     N/A                         Describes how to connect to SUT. See "connection object".
+connection dict   1 from 2 N/A                         For a single-node SUT, describes how to connect to the node. See "node.connection object". Exactly one of "connection" and "nodes" is required.
+nodes      list   1 from 2 N/A                         A list of node objects, each of which describes a node (machine/device) from which the SUT is composed. Exactly one of "connection" and "nodes" is required.
 ========== ====== ======== =========================== ===========

-*********************
-sut.connection object
-*********************
+***********
+node object
+***********
+
+A node is a machine/device that runs a single instance of Linux. A SUT is
+composed of one or more nodes. It contains the following keys:
+
+========== ====== ======== =========================== ===========
+key        type   required default                     description
+========== ====== ======== =========================== ===========
+name       string false    None                        User-supplied friendly name to identify the node. Must be unique amongst nodes.
+connection dict   true     N/A                         Describes how to connect to the node. See "node.connection object".
+========== ====== ======== =========================== ===========
+
+**********************
+node.connection object
+**********************

-The connection dictionary describes how to connect to the SUT that the
+The connection dictionary describes how to connect to the node that the
 benchmarks will run on. It contains the following keys:

 ========== ====== ======== =========================== ===========
 key        type   required default                     description
 ========== ====== ======== =========================== ===========
-method     enum   true     N/A                         Method used to connect to the SUT. For now, only "SSH" is supported. In future, "LAVA" will be added.
+method     enum   true     N/A                         Method used to connect to the node. For now, only "SSH" is supported. In future, "LAVA" will be added.
 params     dict   true     N/A                         Method-specific dictionary of parameters. See "SSH-params" below.
 ========== ====== ======== =========================== ===========

-********************************
-sut.connection.SSH-params object
-********************************
+*********************************
+node.connection.SSH-params object
+*********************************

-The SSH-params dictionary describes how to connect to a SUT using the "SSH"
+The SSH-params dictionary describes how to connect to a node using the "SSH"
 method. It contains the following keys:

 ========== ====== ======== =========================== ===========
@@ -109,6 +124,8 @@ repeats    int    false    defaults.benchmark.repeats  Number of times to repeat
 sessions   int    false    defaults.benchmark.sessions Number of times to reboot the SUT to repeat the benchmark.
 warmups    int    false    defaults.benchmark.warmups  Number of times to run the benchmark at the start of a boot session to warm up the system before the real repeats are executed.
 timeout    string false    defaults.benchmark.timeout  Timeout after which to assume the benchmark has hung. Provided as a string with format "<value><suffix>" where the suffix is 's' (seconds), 'm' (minutes), 'h' (hours) or 'd' (days).
+roles      list   false    [executer]                  List of roles that the benchmark implements. When multiple roles are defined, each role is executed in parallel, possibly on different nodes. If not specified, the benchmark is assumed to have a single role.
+rolemap    dict   false    {r: 0 for r in roles}       Dictionary mapping roles to nodes, where the key is the role name and the value identifies the node, either as an integer index into sut.nodes or as a node name string. Multiple roles may be mapped to the same node. Any unmapped roles default to sut.nodes[0].
 ========== ====== ======== =========================== ===========

 ***************
diff --git a/documentation/user-guide/resultstoreschema.rst b/documentation/user-guide/resultstoreschema.rst
index e62f93f0ee8a0f456cde4af6557a68c420367b27..d1c2da96a9b87a7f28aa234cde6acc87f14ce395 100644
--- a/documentation/user-guide/resultstoreschema.rst
+++ b/documentation/user-guide/resultstoreschema.rst
@@ -29,14 +29,15 @@ name                     text           Benchmark name as it appears in the plan
 type                     text           Benchmark type as it appears in the plan
 image                    text           Benchmark container image identifier as it appears in the plan
 params_hash              char(64)       SHA256 hash of the params dictionary as they appear in the plan
+roles_hash               char(64)       SHA256 hash of the roles list as it appears in the plan
 ======================== ============== ===========

 *********
 CPU Table
 *********

-One entity per CPU contained within a SUT. Describes the set of CPUs that a SUT
-contains.
+One entity per CPU contained within a node. Describes the set of CPUs that a
+node contains.
======================== ============== =========== key type description @@ -44,7 +45,7 @@ key type description id bigserial (PK) Primary key. Auto-incrementing integer desc text arch-specific descriptor, encoded in JSON cpu_index smallint Logical CPU index -sut_id bigint (FK) SUT ID relates CPUs to SUT +node_id bigint (FK) Node ID relates CPUs to node ======================== ============== =========== *********** @@ -64,10 +65,38 @@ timestamp timestamp Approximate time at which error was emit sut_id bigint (FK) SUT ID relates error to SUT swprofile_id bigint (FK) swprofile ID relates error to swprofile benchmark_id bigint (FK) Benchmark ID relates error to Benchmark +rolemap_id bigint (FK) Rolemap ID relates error to rolemap +role_id bigint (FK) Role ID relates error to role that generated the error session_uuid uuid Universally Unique ID representing the boot session on the SUT that the result was generated during. Conceptually, a new UUID is generated each time the SUT is rebooted. Used to group results and errors from the same session error int Error code ======================== ============== =========== +********** +NODE Table +********** + +One entity per node. Describes a unique node within a SUT. A node is a machine +or device that runs a single instance of Linux. A SUT may be composed of 1 or +more nodes. + +======================== ============== =========== +key type description +======================== ============== =========== +id bigserial (PK) Primary key. Auto-incrementing integer +sut_id bigint (FK) SUT that the node belongs to +name text Node friendly name as it appears in the plan +host_name text result of `uname -n` +architecture text result of `uname -m` +cpu_count smallint Number of CPUs in the system +cpu_info_hash char(64) SHA256 hash of all the CPU info structures +numa_count smallint Number of NUMA nodes in the system +ram_sz bigint Total RAM in the system in bytes +hypervisor text The name of the hypervisor if running in a virtual machine. (e.g. 'kvm', 'hyper-v' etc.) else empty +product_name text The DMI product_name, if available, else empty +product_serial text The DMI product_serial, if available, else empty +mac_addrs_hash char(64) SHA256 hash of the sorted list of permanent MAC addresses from physical devices attached to the node +======================== ============== =========== + *********** PARAM Table *********** @@ -98,6 +127,8 @@ timestamp timestamp Approximate time at which result was col resultclass_id bigint (FK) ResultClass ID relates Result to ResultClass sut_id bigint (FK) SUT ID relates Result to SUT swprofile_id bigint (FK) swprofile ID relates Result to swprofile +rolemap_id bigint (FK) Rolemap ID relates result to rolemap +role_id bigint (FK) Role ID relates result to role that generated the result session_uuid uuid Universally Unique ID representing the boot session on the SUT that the result was generated during. Conceptually, a new UUID is generated each time the SUT is rebooted. Used to group results and errors from the same session value double Numerical result value ======================== ============== =========== @@ -118,27 +149,62 @@ unit text Unit of the result value (e.g. seconds, improvement enum Either 'bigger' or 'smaller': Determines whether a bigger or smaller value represents an improvement in performance ======================== ============== =========== +********** +ROLE Table +********** + +Describes a role that a benchmark performs. 
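+A benchmark that does not declare any roles in its plan is given the single
+default "executer" role.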
+
+======================== ============== ===========
+key                      type           description
+======================== ============== ===========
+id                       bigserial (PK) Primary key. Auto-incrementing integer
+name                     text           Role name as it appears in the plan
+benchmark_id             bigint (FK)    Benchmark ID relates role to benchmark
+======================== ============== ===========
+
+*************
+ROLEMAP Table
+*************
+
+Describes the mapping of a benchmark's roles to the nodes they execute on.
+Composed of 1 or more rmdescs.
+
+======================== ============== ===========
+key                      type           description
+======================== ============== ===========
+id                       bigserial (PK) Primary key. Auto-incrementing integer
+rmdescs_hash             char(64)       SHA256 hash of all the rmdesc structures
+======================== ============== ===========
+
+************
+RMDESC Table
+************
+
+Describes the mapping of a single benchmark role to the node it executes on.
+
+======================== ============== ===========
+key                      type           description
+======================== ============== ===========
+id                       bigserial (PK) Primary key. Auto-incrementing integer
+rolemap_id               bigint (FK)    Rolemap that the rmdesc belongs to
+role_id                  bigint (FK)    Role that is being mapped
+node_id                  bigint (FK)    Node that the role is mapped to
+======================== ============== ===========
+
 *********
 SUT Table
 *********

-One entity per SUT. Describes a unique SUT independent of its SW profile.
+One entity per SUT. Describes a unique SUT independent of its SW profile. A SUT
+is composed of 1 or more nodes.

 ======================== ============== ===========
 key                      type           description
 ======================== ============== ===========
 id                       bigserial (PK) Primary key. Auto-incrementing integer
 name                     text           SUT friendly name as it appears in the plan
-host_name                text           result of `uname -n`
-architecture             text           result of `uname -m`
-cpu_count                smallint       Number of CPUs in the system
-cpu_info_hash            char(64)       SHA256 hash of all the CPU info structures
-numa_count               smallint       Number of NUMA nodes in the system
-ram_sz                   bigint         Total RAM in the system in bytes
-hypervisor               text           The name of the hypervisor if running in a virtual machine. (e.g. 'kvm', 'hyper-v' etc.) else empty
-product_name             text           The DMI product_name, if available, else empty
-product_serial           text           The DMI product_serial, if available, else empty
-mac_addrs_hash           char(64)       SHA256 hash of the sorted list of permanent MAC addresses from physical devices attached to the SUT
+nodes_hash               char(64)       SHA256 hash of all the node structures
 ======================== ============== ===========

 ***************
diff --git a/fastpath/commands/nouns/sut.py b/fastpath/commands/nouns/sut.py
index 673eace216304767a6a093716d874df16e2061e4..5189b4e922b28ca33cbe1a8040ed84e28992bbba 100644
--- a/fastpath/commands/nouns/sut.py
+++ b/fastpath/commands/nouns/sut.py
@@ -12,6 +12,9 @@ from fastpath.commands.verbs.sut import reboot
 from fastpath.commands.verbs.sut import uninstall

+from fastpath.utils import logger
+
+
 noun_name = os.path.splitext(os.path.basename(__file__))[0]
 noun_help = """system under test where benchmarks execute"""
 noun_desc = """Operate on a system under test (SUT) where benchmarks execute."""
@@ -116,18 +119,9 @@ def dispatch(args):
         "keyfile": args.keyfile,
     }

+    args.log = logger.Logger()
     if args.verbose:
-        args.log = sys.stdout
-    else:
-
-        class DevNull:
-            def flush(self):
-                pass
-
-            def write(self, str):
-                pass
-
-        args.log = DevNull()
+        args.log.set_logfile(sys.stdout)

     # Dispatch to the correct verb handler.
     verbs[args.verb].dispatch(args)
diff --git a/fastpath/commands/verbs/bisect/run.py b/fastpath/commands/verbs/bisect/run.py
index ac062ef98706618aa918bdd08accf814f970b21e..25467e52cd94792347ebd7c5a7780f9fe22f56bc 100644
--- a/fastpath/commands/verbs/bisect/run.py
+++ b/fastpath/commands/verbs/bisect/run.py
@@ -13,15 +13,9 @@ import yaml

 from fastpath.commands import cliutils
 from fastpath.commands.verbs.plan import exec as plan_exec
-from fastpath.commands.verbs.result import merge
 from fastpath.commands.verbs.result import show
 from fastpath.utils import plan as plan_utils
-from fastpath.utils.table import (
-    Table,
-    load_tables,
-    join_results,
-    filter_results,
-)
+from fastpath.utils.table import load_tables, join_results, filter_results


 verb_name = os.path.splitext(os.path.basename(__file__))[0]
diff --git a/fastpath/commands/verbs/bisect/start.py b/fastpath/commands/verbs/bisect/start.py
index 6a1b06f2e17573bf2c3567ca22b3633ea29c61cd..cd9c853034ad8c0ae5acec80c939916cd8791d9c 100644
--- a/fastpath/commands/verbs/bisect/start.py
+++ b/fastpath/commands/verbs/bisect/start.py
@@ -8,7 +8,6 @@ import tempfile
 import yaml

 from fastpath.commands import cliutils
-from fastpath.utils import resultstore as rs
 from fastpath.utils import workspace
 from fastpath.utils.table import (
     Table,
diff --git a/fastpath/commands/verbs/plan/exec.py b/fastpath/commands/verbs/plan/exec.py
index 6d6496834be3f7a2f952f172cf273fbc19780daf..94af953209a0a6e159d0c187d16a4b7350849d8a 100644
--- a/fastpath/commands/verbs/plan/exec.py
+++ b/fastpath/commands/verbs/plan/exec.py
@@ -4,6 +4,7 @@ import datetime

 import cerberus
+from concurrent.futures import ThreadPoolExecutor
 import invoke
 import io
 import json
@@ -19,11 +20,12 @@ from fastpath.commands import cliutils
 from fastpath.utils import configure
 from fastpath.utils import fingerprint
 from fastpath.utils import kmsg
+from fastpath.utils import logger
 from fastpath.utils import machine
 from fastpath.utils import plan
 from fastpath.utils import resultstore as rs
+from fastpath.utils import schema
 from fastpath.utils.bmutils import BenchmarkError, BenchmarkException
-from fastpath.utils.table import Table


 verb_name = os.path.splitext(os.path.basename(__file__))[0]
@@ -55,13 +57,25 @@ def add_parser(parser, formatter, add_noun_args):
     cliutils.add_generic_args(verbp)
     add_noun_args(verbp)

+    verbp.add_argument(
+        "--resultstore",
+        metavar="<url>",
+        required=False,
+        help="""Store where merged results are saved. Fastpath will exit with
+        error if store already exists, unless --append is specified. URL
+        encoded to describe a resultstore in either csv, sqlite or mysql
+        format. csv: csv:///<dirpath> (although "csv:///" is optional).
+        sqlite: sqlite:///<filepath>. mysql:
+        mysql://<user>:<password>@<host>:<port>/<dbname>.""",
+    )
+
     verbp.add_argument(
         "--output",
-        metavar="<csvdir>",
+        metavar="<dir>",
         required=True,
-        help="""Location where results and logs will be stored. Fastpath will
-        exit with error if directory already exists, unless --append is
-        specified. Only csv stores are supported.""",
+        help="""Location where logs will be stored. Fastpath will exit with
+        error if directory already exists, unless --append is specified. If
+        --resultstore is not provided, a csv resultstore is created here.""",
     )

     verbp.add_argument(
@@ -76,52 +90,64 @@
     return verb_name


+rstore = None
+log = logger.Logger()
+pbar = None
+
+
 def dispatch(args):
     """
     Part of the command interface expected by fastpath.py. Called to execute
     the subcommand, with the arguments the user passed on the command line.
The arguments comply with those requested in add_parser(). """ + global rstore, pbar + normplan = plan.load(args.plan) - sut = normplan["sut"] - connection = normplan["sut"]["connection"] - swprofiles = normplan["swprofiles"] - benchmarks = normplan["benchmarks"] + sut = Sut(normplan["sut"]) + swprofiles = [SwProfile(swprofile) for swprofile in normplan["swprofiles"]] + benchmarks = [Benchmark(benchmark) for benchmark in normplan["benchmarks"]] + + # ROLEMAP is really a top-level object, but since it is described + # per-benchmark, it is convenient to attach it to the benchmark object. This + # attachment is not reflected in the resultstore schema, and is only for the + # benefit of code in this file. + for bm in benchmarks: + bm.rolemap = RoleMap(bm.planobj["rolemap"], bm, sut) - if not rs.is_csv(args.output): - raise Exception("output resultstore must be csv") + url = rs.normalize(args.resultstore if args.resultstore else args.output) + rstore = rs.create_open_or_import(url, args.append) basedir = rs.normalize(args.output) os.makedirs(basedir, exist_ok=args.append) logsdir = logs_dir(basedir, "logs") - csvs = {} try: - # Create a csv file for each table. - for table in Table: - csv = new_csvfile(table, basedir) - csvs[table] = csv - # Execute! print(f"Executing {os.path.basename(args.plan)}...", file=sys.stderr) - with open(os.path.join(basedir, "fastpath.log"), "a") as log: - with machine.open( - log, connection["method"], connection["params"] - ) as ctx: - with ProgressBar(normplan) as pbar: - do_one_sut( - pbar, - ctx, - logsdir, - csvs, - {}, - sut, - swprofiles, - benchmarks, - ) + with open(os.path.join(basedir, "fastpath.log"), "a") as logfile: + log.set_logfile(logfile) + try: + for node in sut.nodes: + method = node.planobj["connection"]["method"] + params = node.planobj["connection"]["params"] + node.ctx = machine.open(log, method, params) + with ProgressBar(normplan) as pbar_local: + pbar = pbar_local + do_one_sut(logsdir, sut, swprofiles, benchmarks) + finally: + for node in sut.nodes: + if hasattr(node, "ctx"): + node.ctx.close() + del node.ctx + log.set_logfile(None) + pbar = None finally: - for csv in csvs.values(): - csv.close() + # Export to csv if that's what the requested output is. + if rs.is_csv(url): + rstore.to_csv(url) + rstore.close() + rstore = None class ProgressBar: @@ -155,213 +181,6 @@ class ProgressBar: self.close() -class CSVFile: - """ - Encapsulates a CSV file, allowing data to be output to the file row by row - so that all data can be guarranteed safe even if the program crashes. It - also conveniently allows a consumer to read the data while its still being - produced. If the file already exists, will append new rows starting with the - next available id. - """ - - def __init__(self, filename, columns): - """ - Create a CSVFile to wrap and append to the CSV file at filename. columns - is a list of the column names. - """ - self.csv = None - self.next_id = 1 - self.columns = columns - - # If the file exists, ensure its columns are compatible and figure out - # the id of the next row. - try: - df = pd.read_csv(filename) - except Exception: - pass - else: - if list(df.columns) != ["id"] + columns: - raise Exception(f"{filename} exists with conflicting columns.") - self.next_id = len(df) + 1 - - # If no rows have been output, start fresh. - append = self.next_id > 1 - self.csv = open(filename, mode="a" if append else "w") - - # Write the header if the file is fresh. 
- if not append: - df = pd.DataFrame(None, index=None, columns=self.columns) - df = df.reset_index(names="id") - line = df.to_csv(index=False, header=True) - self.csv.write(line) - self.csv.flush() - - def append(self, data): - """ - Append a row or list of rows to the csv file. Return id of appended row - if single row was provided or None if list of rows was provided. - """ - if isinstance(data, list): - for row in data: - self.append(row) - return - - if not all(col in data.keys() for col in self.columns): - raise Exception("Row data does not match csv columns.") - - id = self.next_id - - # Write the row. - df = pd.DataFrame(data, index=[id], columns=self.columns) - df = df.reset_index(names="id") - line = df.to_csv(index=False, header=False) - self.csv.write(line) - self.csv.flush() - - self.next_id += 1 - return id - - def close(self): - if self.csv: - self.csv.close() - - -class UniqueCSVFile(CSVFile): - """ - Similar to CSVFile, except it only permits unique entries. If an entry is - appended that already exists, the id of the existing entry is returned. - """ - - def __init__(self, filename, columns): - super().__init__(filename, columns) - self.idlut = {} - try: - df = pd.read_csv(filename) - df = df.replace({float("nan"): None, "": None}) - except Exception: - pass - else: - for _, row in df.iterrows(): - id = row["id"] - data = row[columns].to_dict() - self.idlut[fingerprint.hash(data)] = id - - def append(self, data): - if isinstance(data, list): - for row in data: - self.append(row) - return - - # Normalize and hash. - df = pd.DataFrame(data, index=[0], columns=self.columns) - df = df.replace({float("nan"): None, "": None}) - data = df.to_dict("records")[0] - - hash = fingerprint.hash(data) - if hash in self.idlut: - return self.idlut[hash] - id = super().append(data) - self.idlut[hash] = id - return id - - -tables = { - Table.BENCHMARK: { - "type": UniqueCSVFile, - "cols": [ - "suite", - "name", - "type", - "image", - "params_hash", - ], - }, - Table.SWPROFILE: { - "type": UniqueCSVFile, - "cols": [ - "name", - "kernel_name", - "kernel_git_sha", - "kernel_kconfig_full_hash", - "kernel_cmdline_full_hash", - "userspace_name", - "cmdline", - "sysctl", - "bootscript", - ], - }, - Table.CPU: { - "type": UniqueCSVFile, - "cols": [ - "desc", - "cpu_index", - "sut_id", - ], - }, - Table.ERROR: { - "type": CSVFile, - "cols": [ - "timestamp", - "sut_id", - "swprofile_id", - "benchmark_id", - "session_uuid", - "error", - ], - }, - Table.PARAM: { - "type": UniqueCSVFile, - "cols": [ - "name", - "value", - "benchmark_id", - ], - }, - Table.RESULT: { - "type": CSVFile, - "cols": [ - "timestamp", - "resultclass_id", - "sut_id", - "swprofile_id", - "session_uuid", - "value", - ], - }, - Table.RESULTCLASS: { - "type": UniqueCSVFile, - "cols": [ - "benchmark_id", - "name", - "unit", - "improvement", - ], - }, - Table.SUT: { - "type": UniqueCSVFile, - "cols": [ - "name", - "host_name", - "architecture", - "cpu_count", - "cpu_info_hash", - "numa_count", - "ram_sz", - "hypervisor", - "product_name", - "product_serial", - "mac_addrs_hash", - ], - }, -} - - -def new_csvfile(table, directory): - filename = os.path.join(directory, f"{table.name}.csv") - csv = tables[table]["type"](filename, tables[table]["cols"]) - return csv - - def slugify(value): value = unicodedata.normalize("NFKD", value).encode("ascii").decode("ascii") value = re.sub(r"[^a-zA-Z0-9]+", "-", value) @@ -492,20 +311,21 @@ def cleanup_containers(ctx): ctx.log(f"Failed to cleanup containers: {e}\n") -def start_container(ctx, image): 
+def start_container(ctx, image, name):
     # If "docker run" fails, the subsequent "docker exec" will also fail,
     # and do_one_repeat() will log the error. Some benchmarks
     # (microvm/vmalloc) need to install a kernel module (test_vmalloc) so
     # give the container access to the modules.
     ctx.run(
         f"""
-        mkdir -p /tmp/fastpath-share
+        mkdir -p /tmp/fastpath-share-{name}
         docker run \\
             --detach \\
             --privileged \\
-            --volume /tmp/fastpath-share:/fastpath-share \\
+            --network host \\
+            --volume /tmp/fastpath-share-{name}:/fastpath-share \\
             --volume /lib/modules:/lib/modules \\
-            --name fastpath-benchmark \\
+            --name fastpath-benchmark-{name} \\
             {image} \\
             sleep 48h
         """,
     )
@@ -513,288 +333,531 @@ def start_container(ctx, image):
-def stop_container(ctx):
+def stop_container(ctx, name):
     ctx.run(
         f"""
-        docker container stop fastpath-benchmark
-        docker container rm --force fastpath-benchmark
+        docker container stop fastpath-benchmark-{name}
+        docker container rm --force fastpath-benchmark-{name}
         """,
         warn=True,
     )


-def restart_container(ctx, image):
-    stop_container(ctx)
-    start_container(ctx, image)
+def restart_container(ctx, image, name):
+    stop_container(ctx, name)
+    start_container(ctx, image, name)


-def do_one_repeat(
-    pbar, ctx, basedir, csvs, ids, benchmark, session_uuid, repeat
-):
+def for_each_parallel(items, fn):
+    """
+    Calls fn() for each item in items, all in parallel on background threads.
+    The return values from all fn() calls are returned as a list with indexes
+    corresponding to items. If any instance of fn() raises an exception,
+    for_each_parallel() raises that exception too.
+    """
+    with ThreadPoolExecutor(max_workers=len(items)) as executor:
+        results = []
+        futures = []
+        for item in items:
+            if isinstance(item, dict):
+                future = executor.submit(fn, **item)
+            else:
+                future = executor.submit(fn, item)
+            futures.append(future)
+        for future in futures:
+            results.append(future.result())
+    return results
+
+
+def do_one_repeat(basedir, sut, swprofile, benchmark, suuid, repeat):
     warmup = basedir is None
-    ctx.log(f"BEGIN: {'warmup' if warmup else 'repeat'}: {repeat}\n")
+    log.log(f"BEGIN: {'warmup' if warmup else 'repeat'}: {repeat}\n")

-    # Prep the shared directory on the sut and populate the benchmark meta data
-    # that the container will consume. The directory may have files owned by
-    # root due to being generated by the container which runs with --privileged.
-    ctx.run(
-        """
-        rm -rf /tmp/fastpath-share.tar.gz
-        sudo rm -rf /tmp/fastpath-share/*
-        mkdir -p /tmp/fastpath-share/output
-        """
-    )
-    bmdata = {
-        "suite": benchmark["suite"],
-        "name": benchmark["name"],
-        "params": benchmark["params"],
+    # Since we run the containers with host networking, the container shares
+    # its IP address with the host, so this works.
+    ipmap = {
+        role.name: benchmark.rolemap.map(role).ctx.ip_addr()
+        for role in benchmark.roles
     }
-    f = io.BytesIO(plan.dump(bmdata).encode())
-    ctx.put(f, "/tmp/fastpath-share/benchmark.yaml")

-    # Invoke docker container on SUT and wait for timeout.
- error = BenchmarkError.NONE - timeout = timeout_to_secs(benchmark["timeout"]) - try: - ctx.run( - f""" - docker exec \\ - --privileged \\ - fastpath-benchmark \\ - /fastpath/exec.py - """, - timeout=timeout, - ) - except invoke.exceptions.CommandTimedOut: - error = BenchmarkError.BENCHMARK_TIMEOUT - except invoke.exceptions.UnexpectedExit: - error = BenchmarkError.BENCHMARK_FAIL + def exec_role(role): + node = benchmark.rolemap.map(role) + name = role.name + + # The benchmark could have multiple roles that map to the same node. + # Therefore we can't use node.ctx directly since it's possible that 2 or + # more threads would be using it concurrently. So create our own + # subconnection. + with node.ctx.subconnection() as ctx: + # Prep the shared directory on the sut and populate the benchmark + # meta data that the container will consume. The directory may have + # files owned by root due to being generated by the container which + # runs with --privileged. + ctx.run( + f""" + rm -rf /tmp/fastpath-share-{name}.tar.gz + sudo rm -rf /tmp/fastpath-share-{name}/* + mkdir -p /tmp/fastpath-share-{name}/output + """ + ) + bmdata = { + "suite": benchmark.suite, + "name": benchmark.name, + "params": benchmark.planobj["params"], + "role": name, + "ipmap": ipmap, + } + f = io.BytesIO(plan.dump(bmdata).encode()) + ctx.put(f, f"/tmp/fastpath-share-{name}/benchmark.yaml") + + # Invoke docker container on SUT and wait for timeout. + error = BenchmarkError.NONE + timeout = timeout_to_secs(benchmark.planobj["timeout"]) + try: + ctx.run( + f""" + docker exec \\ + --privileged \\ + fastpath-benchmark-{name} \\ + /fastpath/exec.py + """, + timeout=timeout, + ) + except invoke.exceptions.CommandTimedOut: + error = BenchmarkError.BENCHMARK_TIMEOUT + except invoke.exceptions.UnexpectedExit: + error = BenchmarkError.BENCHMARK_FAIL + + return error + + # Execute all roles and reduce to a single error (first not NONE). + errors = for_each_parallel(benchmark.roles, exec_role) + errors = [e for e in errors if e != BenchmarkError.NONE] + error = BenchmarkError.NONE if len(errors) == 0 else errors[0] - pbar.progress(benchmark, session_uuid, repeat) + pbar.progress(benchmark, suuid, repeat) # If there was an error, restart the container to clear up any dirty state. # e.g. if the benchmark timed out, the threads will still be running. if error != BenchmarkError.NONE: - restart_container(ctx, benchmark["image"]) + for role in benchmark.roles: + node = benchmark.rolemap.map(role) + restart_container(node.ctx, benchmark.image, role.name) # If it's a warmup iteration, don't bother to parse the results. if warmup: - ctx.log(f"END: warmup: {repeat} ({error})\n") + log.log(f"END: warmup: {repeat} ({error})\n") return - # Compress /tmp/fastpath-share and copy back, then uncompress into repeat's - # log directory. - ctx.run( - """ - cd /tmp - tar -czf fastpath-share.tar.gz fastpath-share - """ - ) - ctx.get("/tmp/fastpath-share.tar.gz", f"{basedir}/fastpath-share.tar.gz") - subprocess.run( - f""" - cd {basedir} && - tar -xf fastpath-share.tar.gz && - rm -rf fastpath-share.tar.gz && - mv fastpath-share repeat-{repeat} - """, - shell=True, - check=True, - capture_output=True, - ) + dfs = [] + + for role in benchmark.roles: + node = benchmark.rolemap.map(role) + name = role.name - # Parse results.csv into pandas table. Generate a dummy dataframe with a - # single error entry if we are unable to retrieve or parse the csv. + # Compress /tmp/fastpath-share and copy back, then uncompress into + # repeat's log directory. 
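+        # One share directory and archive per role keeps roles that are
+        # mapped to the same node from clobbering each other's files.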
+        node.ctx.run(
+            f"""
+            cd /tmp
+            tar -czf fastpath-share-{name}.tar.gz fastpath-share-{name}
+            """
+        )
+        node.ctx.get(
+            f"/tmp/fastpath-share-{name}.tar.gz",
+            f"{basedir}/fastpath-share-{name}.tar.gz",
+        )
+        subprocess.run(
+            f"""
+            cd {basedir} &&
+            tar -xf fastpath-share-{name}.tar.gz &&
+            rm -rf fastpath-share-{name}.tar.gz &&
+            mkdir -p repeat-{repeat} &&
+            mv fastpath-share-{name} repeat-{repeat}/role-{name}
+            """,
+            shell=True,
+            check=True,
+            capture_output=True,
+        )
+
+        # Parse results.csv into pandas table. Ignore if not present.
+        if error == BenchmarkError.NONE:
+            try:
+                csv_file = f"{basedir}/repeat-{repeat}/role-{name}/results.csv"
+                df = pd.read_csv(csv_file)
+                df = pd.DataFrame(validate_results(df.to_dict("records")))
+                df["role"] = role
+                dfs.append(df)
+            except BenchmarkException as e:
+                dfs.append(
+                    pd.DataFrame([{"error": e.error.value, "role": role}])
+                )
+            except Exception:
+                pass
+
+    # If there was an error while executing on any node or none of the roles
+    # produced a result, insert an error.
+    default_role = benchmark.roles[0]
     if error != BenchmarkError.NONE:
-        df = pd.DataFrame([{"error": error.value}])
-    else:
-        try:
-            df = pd.read_csv(f"{basedir}/repeat-{repeat}/results.csv")
-            df = pd.DataFrame(validate_results(df.to_dict("records")))
-        except BenchmarkException as e:
-            df = pd.DataFrame([{"error": e.error.value}])
-        except Exception:
-            df = pd.DataFrame([{"error": BenchmarkError.NO_RESULTS_FILE.value}])
-
-    # Insert all the extra fields we need.
-    df["benchmark_id"] = ids[Table.BENCHMARK]
-    df["sut_id"] = ids[Table.SUT]
-    df["swprofile_id"] = ids[Table.SWPROFILE]
-    df["session_uuid"] = session_uuid
+        dfs.append(pd.DataFrame([{"error": error.value, "role": default_role}]))
+    elif len(dfs) == 0:
+        dfs.append(
+            pd.DataFrame(
+                [
+                    {
+                        "error": BenchmarkError.NO_RESULTS_FILE.value,
+                        "role": default_role,
+                    }
+                ]
+            )
+        )
+
+    # Concat the results and insert all the extra fields we need.
+    df = pd.concat(dfs)
+    df["session_uuid"] = suuid
     df["timestamp"] = datetime.datetime.now()

-    # Output to RESULTCLASS and RESULT csvs.
+    results = []
+
+    # Create ResultClass and Result objects.
     for _, row in df[df["error"] == 0].iterrows():
-        resultclass = dict(row[tables[Table.RESULTCLASS]["cols"]])
-        resultclass_id = csvs[Table.RESULTCLASS].append(resultclass)
-        row["resultclass_id"] = resultclass_id
-        result = dict(row[tables[Table.RESULT]["cols"]])
-        csvs[Table.RESULT].append(result)
+        resultclass = ResultClass(row)
+        resultclass.benchmark = benchmark

-    # Output to ERROR csv.
+        result = Result(row)
+        result.resultclass = resultclass
+        result.sut = sut
+        result.swprofile = swprofile
+        result.rolemap = benchmark.rolemap
+
+        results.append(result)
+
+    # Create Error objects.
     for _, row in df[df["error"] != 0].iterrows():
-        err = dict(row[tables[Table.ERROR]["cols"]])
-        csvs[Table.ERROR].append(err)
+        err = Error(row)
+        err.sut = sut
+        err.swprofile = swprofile
+        err.benchmark = benchmark
+        err.rolemap = benchmark.rolemap

-    ctx.log(f"END: repeat: {repeat} ({error})\n")
+        results.append(err)

+    # Commit to resultstore.
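+    # Merging after every repeat persists results incrementally, so data
+    # gathered so far survives a crash or timeout later in the run.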
+ rstore.merge(results) -def do_one_benchmark_exec(pbar, ctx, basedir, csvs, ids, suuid, benchmark): - bname = f"{benchmark['suite']}/{benchmark['name']}" - ctx.log(f"BEGIN: benchmark_exec: {bname}\n") + log.log(f"END: repeat: {repeat} ({error})\n") - cleanup_containers(ctx) - try: - start_container(ctx, benchmark["image"]) +def do_one_benchmark_exec(basedir, sut, swprofile, benchmark, suuid): + bname = f"{benchmark.suite}/{benchmark.name}" + log.log(f"BEGIN: benchmark_exec: {bname}\n") - ids[Table.BENCHMARK] = benchmark["id"] + for_each_parallel([n.ctx for n in sut.nodes], cleanup_containers) - for warmup in range(benchmark["warmups"]): - do_one_repeat(pbar, ctx, None, None, None, benchmark, None, warmup) + rolemap = benchmark.rolemap - for repeat in range(benchmark["repeats"]): - do_one_repeat( - pbar, ctx, basedir, csvs, ids, benchmark, suuid, repeat - ) + try: + for role in benchmark.roles: + start_container(rolemap.map(role).ctx, benchmark.image, role.name) + + for warmup in range(benchmark.planobj["warmups"]): + do_one_repeat(None, sut, swprofile, benchmark, suuid, warmup) + + for repeat in range(benchmark.planobj["repeats"]): + do_one_repeat(basedir, sut, swprofile, benchmark, suuid, repeat) finally: - stop_container(ctx) + for role in benchmark.roles: + stop_container(rolemap.map(role).ctx, role.name) - ctx.log(f"END: benchmark_exec: {bname}\n") + log.log(f"END: benchmark_exec: {bname}\n") -def do_one_session(pbar, ctx, basedir, csvs, ids, benchmarks, session): - ctx.log(f"BEGIN: session: {session}\n") +def do_one_session(basedir, sut, swprofile, benchmarks, session): + log.log(f"BEGIN: session: {session}\n") # Don't reboot for the first session, because do_one_swprofile() has already # rebooted to fingerprint the swprofile. if session > 0: - configure.reboot_configured(ctx) + items = [n.ctx for n in sut.nodes] + for_each_parallel(items, configure.reboot_configured) suuid = uuid.uuid4() name = f"session-{suuid}" - kmsglog = os.path.join(basedir, f"{name}.kmsg") - kmsg.start(ctx, kmsglog) - - for benchmark in benchmarks: - if benchmark["sessions"] <= session: - continue + loggers = [] + try: + for i, node in enumerate(sut.nodes): + kmsglog = os.path.join(basedir, f"{name}-node-{i}.kmsg") + loggers.append(kmsg.Logger(node.ctx, kmsglog)) + loggers[-1].start() - basedir = logs_dir(benchmark["dir"], name) - do_one_benchmark_exec(pbar, ctx, basedir, csvs, ids, suuid, benchmark) + for benchmark in benchmarks: + if benchmark.planobj["sessions"] <= session: + continue - kmsg.stop(ctx) - ctx.log(f"END: session: {session}\n") + basedir = logs_dir(benchmark.dir, name) + do_one_benchmark_exec(basedir, sut, swprofile, benchmark, suuid) + finally: + for logger in loggers: + logger.stop() + log.log(f"END: session: {session}\n") -def do_one_benchmark_setup(pbar, ctx, basedir, csvs, ids, benchmark): - bname = f"{benchmark['suite']}/{benchmark['name']}" - ctx.log(f"BEGIN: benchmark_setup: {bname}\n") - # Grab benchmark and params info and clean for DB. - benchmark_row = dict(benchmark) - param_rows = benchmark_row["params"] - del benchmark_row["params"] - benchmark_row["params_hash"] = fingerprint.hash(param_rows) +def do_one_benchmark_setup(basedir, sut, swprofile, benchmark): + bname = f"{benchmark.suite}/{benchmark.name}" + log.log(f"BEGIN: benchmark_setup: {bname}\n") - # Add benchmark to DB and generate results dir and yaml for benchmark. - benchmark["id"] = csvs[Table.BENCHMARK].append(benchmark_row) + # Generate results dir and yaml for benchmark. 
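+    # The directory name embeds a short fingerprint hash so that benchmarks
+    # sharing a suite/name but differing in params or roles get distinct
+    # log directories.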
+    benchmark_dict = benchmark.to_dict(notattrs=["id"])
     name = "benchmark-{}-{}-{}".format(
-        benchmark_row["suite"],
-        benchmark_row["name"],
-        fingerprint.hash(benchmark_row)[:8],
+        benchmark.suite,
+        benchmark.name,
+        fingerprint.hash(benchmark_dict)[:8],
     )
-    basedir = logs_dir(basedir, name, benchmark_row, "benchmark.yaml")
-    benchmark["dir"] = basedir
-
-    # Add params to DB.
-    for name, value in param_rows.items():
-        csvs[Table.PARAM].append(
-            {
-                "name": name,
-                "value": value,
-                "benchmark_id": benchmark["id"],
-            }
-        )
+    benchmark.dir = logs_dir(basedir, name, benchmark_dict, "benchmark.yaml")

-    ctx.log(f"END: benchmark_setup: {bname}\n")
+    log.log(f"END: benchmark_setup: {bname}\n")


-def do_one_swprofile(pbar, ctx, basedir, csvs, ids, swprofile, benchmarks, idx):
-    ctx.log(f"BEGIN: swprofile: {swprofile['name']}\n")
+def do_one_swprofile(basedir, sut, swprofile, benchmarks):
+    log.log(f"BEGIN: swprofile: {swprofile.name}\n")

-    # Install the swprofile on sut. swprofile 0 is installed by do_one_sut().
-    if idx > 0:
-        configure.configure(ctx, swprofile)
+    # Install the swprofile on each node if not already done by do_one_sut().
+    if not hasattr(swprofile, "configured") or not swprofile.configured:
+        items = [(n.ctx, swprofile.planobj) for n in sut.nodes]
+        for_each_parallel(items, configure.configure)

-    # Grab swprofile and clean for DB.
-    swprofile_row = fingerprint.sut_query(ctx)["sw"]
-    swprofile_row["name"] = swprofile["name"] or slugify(
-        swprofile_row["kernel_name"]
-    )
-    swprofile_row["kernel_git_sha"] = swprofile["gitsha"] or ""
-    swprofile_row["cmdline"] = "\n".join(swprofile["cmdline"])
-    swprofile_row["sysctl"] = "\n".join(swprofile["sysctl"])
-    swprofile_row["bootscript"] = "\n".join(swprofile["bootscript"])
+    # Finish initializing the swprofile with the sw fingerprint.
+    swprofile.init_sw_fingerprint(fingerprint.sut_query(sut.nodes[0].ctx)["sw"])

-    # Add swprofile to DB and generate results dir and yaml for swprofile.
-    ids[Table.SWPROFILE] = csvs[Table.SWPROFILE].append(swprofile_row)
+    # Generate results dir and yaml for swprofile.
+    swprofile_dict = swprofile.to_dict(notattrs=["id"])
     name = "swprofile-{}-{}".format(
-        swprofile_row["name"], fingerprint.hash(swprofile_row)[:8]
+        swprofile.name, fingerprint.hash(swprofile_dict)[:8]
     )
-    basedir = logs_dir(basedir, name, swprofile_row, "swprofile.yaml")
+    basedir = logs_dir(basedir, name, swprofile_dict, "swprofile.yaml")

     # Setup all the benchmarks.
     for benchmark in benchmarks:
-        do_one_benchmark_setup(pbar, ctx, basedir, csvs, ids, benchmark)
+        do_one_benchmark_setup(basedir, sut, swprofile, benchmark)

     # Run the max number of sessions required by a benchmark.
-    for session in range(max(b["sessions"] for b in benchmarks)):
-        do_one_session(pbar, ctx, basedir, csvs, ids, benchmarks, session)
+    for session in range(max(b.planobj["sessions"] for b in benchmarks)):
+        do_one_session(basedir, sut, swprofile, benchmarks, session)

-    ctx.log(f"END: swprofile: {swprofile['name']}\n")
+    log.log(f"END: swprofile: {swprofile.name}\n")


-def do_one_sut(pbar, ctx, basedir, csvs, ids, sut, swprofiles, benchmarks):
-    ctx.log(f"BEGIN: sut: {sut['name']}\n")
-
-    # Install the first swprofile on the sut. We do this early (subsequent
-    # swprofiles are installed in do_one_swprofile()) so that we can safely know
-    # for sure that the running kernel supports docker and has the capabilities
-    # required for fingerprinting.
-    configure.configure(ctx, swprofiles[0])
-
-    # Pull all the images. This ensures we are not running with a stale version.
-    # And doing it centrally here ensures every benchmark invocation uses the
-    # same image.
- images = list(set([b["image"] for b in benchmarks])) - for image in images: - ctx.run(f"docker image pull {image}", warn=True) - ctx.run(f"docker image prune --force") - - # Grab sut and cpu info and clean for DB. - sut_row = fingerprint.sut_query(ctx)["hw"] - sut_row["name"] = sut["name"] or slugify(sut_row["host_name"]) - cpu_rows = sut_row["cpu_info"] - del sut_row["cpu_info"] - - # Add sut and cpus to DB. - ids[Table.SUT] = csvs[Table.SUT].append(sut_row) - for cpu_row in cpu_rows: - cpu_index = cpu_row["cpu_index"] - del cpu_row["cpu_index"] - cpu_row = { - "desc": json.dumps(cpu_row), - "cpu_index": cpu_index, - "sut_id": ids[Table.SUT], - } - csvs[Table.CPU].append(cpu_row) + for session in range(max(b.planobj["sessions"] for b in benchmarks)): + do_one_session(basedir, sut, swprofile, benchmarks, session) + + log.log(f"END: swprofile: {swprofile.name}\n") + + +def do_one_sut(basedir, sut, swprofiles, benchmarks): + log.log(f"BEGIN: sut: {sut.name}\n") + + images = list(set([b.image for b in benchmarks])) + + def configure_node(node): + # Install the first swprofile on the node. We do this early (subsequent + # swprofiles are installed in do_one_swprofile()) so that we can safely + # know for sure that the running kernel supports docker and has the + # capabilities required for fingerprinting. + configure.configure(node.ctx, swprofiles[0].planobj) + + # Pull all the images. This ensures we are not running with a stale + # version. And doing it centrally here ensures every benchmark + # invocation uses the same image. + for image in images: + node.ctx.run(f"docker image pull {image}", warn=True) + node.ctx.run(f"docker image prune --force") + + # Finish initializing the node with the hw fingerprint. + node.init_hw_fingerprint(fingerprint.sut_query(node.ctx)["hw"]) + + # Prepare all of the nodes. + for_each_parallel(sut.nodes, configure_node) + swprofiles[0].configured = True # Iterate over the swprofiles. 
- for idx, swprofile in enumerate(swprofiles): - do_one_swprofile( - pbar, ctx, basedir, csvs, ids, swprofile, benchmarks, idx + for swprofile in swprofiles: + do_one_swprofile(basedir, sut, swprofile, benchmarks) + + log.log(f"END: sut: {sut.name}\n") + + +class PlanObjMixin: + def set_planobj(self, planobj): + self.planobj = planobj + + +class Benchmark(schema.BENCHMARK, PlanObjMixin): + def __init__(self, planobj): + self.set_planobj(planobj) + super().__init__( + suite=planobj["suite"], + name=planobj["name"], + type=planobj["type"], + image=planobj["image"], + params_hash=fingerprint.hash(planobj["params"]), + roles_hash=fingerprint.hash(sorted(planobj["roles"])), + ) + self.params = [Param(n, v) for n, v in planobj["params"].items()] + self.roles = [Role(n) for n in planobj["roles"]] + + +class Cpu(schema.CPU): + def __init__(self, cpu_info): + cpu_info = dict(cpu_info) + cpu_index = cpu_info["cpu_index"] + del cpu_info["cpu_index"] + super().__init__( + desc=json.dumps(cpu_info), + cpu_index=cpu_index, + ) + + +class Error(schema.ERROR): + def __init__(self, row): + error = dict(row[["timestamp", "session_uuid", "error", "role"]]) + super().__init__(**error) + + +class Node(schema.NODE, PlanObjMixin): + def __init__(self, planobj): + self.set_planobj(planobj) + super().__init__(name=planobj["name"]) + + def init_hw_fingerprint(self, hw): + if not self.name: + self.name = slugify(hw["host_name"]) + self.host_name = hw["host_name"] + self.architecture = hw["architecture"] + self.cpu_count = hw["cpu_count"] + self.cpu_info_hash = hw["cpu_info_hash"] + self.numa_count = hw["numa_count"] + self.ram_sz = hw["ram_sz"] + self.hypervisor = hw["hypervisor"] or "" + self.product_name = hw["product_name"] or "" + self.product_serial = hw["product_serial"] or "" + self.mac_addrs_hash = hw["mac_addrs_hash"] + self.cpus = [Cpu(cpu_info) for cpu_info in hw["cpu_info"]] + + # Notify dependent objects that we now have the HW info. + if self.sut: + self.sut.node_init_complete(self) + for rmdesc in self.rmdescs: + rmdesc.rolemap.node_init_complete(self) + + +class Param(schema.PARAM): + def __init__(self, name, value): + super().__init__(name=name, value=value) + + +class Result(schema.RESULT): + def __init__(self, row): + result = dict(row[["timestamp", "session_uuid", "value", "role"]]) + super().__init__(**result) + + +class ResultClass(schema.RESULTCLASS): + def __init__(self, row): + resultclass = dict(row[["name", "unit", "improvement"]]) + super().__init__(**resultclass) + + +class RmDesc(schema.RMDESC): + def __init__(self, role, node): + super().__init__(role=role, node=node) + + +class Role(schema.ROLE): + def __init__(self, name): + super().__init__(name=name) + + +class RoleMap(schema.ROLEMAP, PlanObjMixin): + def __init__(self, planobj, benchmark, sut): + def find_role(name): + for role in benchmark.roles: + if role.name == name: + return role + + self.set_planobj(planobj) + super().__init__() + for rname, nidx in planobj.items(): + self.rmdescs.append(RmDesc(find_role(rname), sut.nodes[nidx])) + + self.rmdescs_to_init = len(planobj.items()) + + def node_init_complete(self, rmdesc): + assert self.rmdescs_to_init > 0 + self.rmdescs_to_init -= 1 + if self.rmdescs_to_init > 0: + return + + # We can only generate rmdescs_hash after all the nodes are initialized, + # since node attributes depend on the HW info. This is quite messy, but + # we need to hash the node and role for each rmdesc uniquely so that we + # can identify the rolemap uniquely and prevent spurious deduplication. 
+        # So create a list of tuples, one for each rmdesc, where each tuple
+        # contains a hash of the node, a hash of the role and a hash of the
+        # benchmark (the role is insufficient for uniqueness as role.name could
+        # be used by multiple benchmarks). Then sort and hash the list for a
+        # final hash.
+        hashes = []
+        for rmdesc in self.rmdescs:
+            hashes.append(
+                fingerprint.hash(
+                    [
+                        rmdesc.node.hash(notattrs=["id", "sut_id"]),
+                        rmdesc.role.hash(notattrs=["id", "benchmark_id"]),
+                        rmdesc.role.benchmark.hash(notattrs=["id"]),
+                    ]
+                )
+            )
+        self.rmdescs_hash = fingerprint.hash(sorted(hashes))
+
+    def map(self, role):
+        for rmdesc in self.rmdescs:
+            if rmdesc.role == role:
+                return rmdesc.node
+
+
+class Sut(schema.SUT, PlanObjMixin):
+    def __init__(self, planobj):
+        self.set_planobj(planobj)
+        super().__init__(name=planobj["name"])
+        self.nodes = [Node(node) for node in planobj["nodes"]]
+        self.nodes_to_init = len(planobj["nodes"])
+
+    def node_init_complete(self, node):
+        assert self.nodes_to_init > 0
+        self.nodes_to_init -= 1
+        if self.nodes_to_init > 0:
+            return
+
+        # We can only generate these after all the child nodes are initialized
+        # as they depend on all the node attributes being populated.
+        if not self.name:
+            self.name = "--".join([n.name for n in self.nodes])
+        self.nodes_hash = fingerprint.hash(
+            sorted([n.hash(notattrs=["id", "sut_id"]) for n in self.nodes])
+        )
+
+
+class SwProfile(schema.SWPROFILE, PlanObjMixin):
+    def __init__(self, planobj):
+        self.set_planobj(planobj)
+        super().__init__(
+            name=planobj["name"],
+            kernel_git_sha=planobj["gitsha"] or "",
+            cmdline="\n".join(planobj["cmdline"]),
+            sysctl="\n".join(planobj["sysctl"]),
+            bootscript="\n".join(planobj["bootscript"]),
+        )

-    ctx.log(f"END: sut: {sut['name']}\n")
+    def init_sw_fingerprint(self, sw):
+        if not self.name:
+            self.name = slugify(sw["kernel_name"])
+        self.kernel_name = sw["kernel_name"]
+        self.kernel_kconfig_full_hash = sw["kernel_kconfig_full_hash"]
+        self.kernel_cmdline_full_hash = sw["kernel_cmdline_full_hash"]
+        self.userspace_name = sw["userspace_name"] or ""
diff --git a/fastpath/commands/verbs/result/list.py b/fastpath/commands/verbs/result/list.py
index ccf130c08e615fe66a28a767a5e4dda98086b9dd..4812587edf5cc59a464ee1ac81e8fb1e88bb80d8 100644
--- a/fastpath/commands/verbs/result/list.py
+++ b/fastpath/commands/verbs/result/list.py
@@ -19,18 +19,18 @@ value_col_width = None

 verb_name = os.path.splitext(os.path.basename(__file__))[0]
-verb_help = (
-    """list all the sut, swprofile or benchmark objects in the result set"""
-)
-verb_desc = """List all the sut, swprofile or benchmark objects in the result set.
-    Data are stored as a set of "result" objects, each of the type
-    described by a "resultclass" object. A "benchmark" object generates
-    a single result for each of its resultclasses per iteration. A
-    benchmark is executed on a "sut" (system under test) object with a
-    given sw profile; a "swprofile" object. This command lists every
-    object contained within the result set along with their meta data.
-    Crucially, every object has an ID which is unique within the result
-    set. IDs are needed when filtering results with the "show" verb."""
+verb_help = """list all the sut, node, swprofile, benchmark or rolemap objects
+    in the result set"""
+verb_desc = """List all the sut, node, swprofile, benchmark or rolemap objects
+    in the result set. Data are stored as a set of "result" objects,
+    each of the type described by a "resultclass" object.
A "benchmark" + object generates a single result for each of its resultclasses per + iteration. A benchmark is executed on a "node", which forms part of + a "sut" (system under test) object with a given sw profile; a + "swprofile" object. This command lists every object contained within + the result set along with their meta data. Crucially, every object + has an ID which is unique within the result set. IDs are needed when + filtering results with the "show" verb.""" def add_parser(parser, formatter, add_noun_args): @@ -54,8 +54,9 @@ def add_parser(parser, formatter, add_noun_args): verbp.add_argument( "--object", required=True, - choices=["sut", "swprofile", "benchmark"], - help="""list either "sut", "swprofile", or "benchmark" objects""", + choices=["sut", "node", "swprofile", "benchmark", "rolemap"], + help="""list either "sut", "node", "swprofile", "benchmark", or + "rolemap" objects""", ) verbp.add_argument( @@ -103,10 +104,14 @@ def dispatch(args): if args.object == "sut": objs = pretty_sut(tables, args) + elif args.object == "node": + objs = pretty_node(tables, args) elif args.object == "swprofile": objs = pretty_swprofile(tables, args) elif args.object == "benchmark": objs = pretty_benchmark(tables, args) + elif args.object == "rolemap": + objs = pretty_rolemap(tables, args) output = [] for _, row in objs.iterrows(): @@ -125,17 +130,19 @@ def dispatch(args): cols = { Table.BENCHMARK: { "unique": ("ID", True), - "suite": ("Suite", True), - "name": ("Name", True), + "suite": ("Suite", False), + "name": ("Name", False), "type": ("Type", True), "image": ("Container Image", True), "params": ("Params", True), "params_hash": ("Params Hash", False), + "roles": ("Roles", True), + "roles_hash": ("Roles Hash", False), "resultclass": ("Result Classes", True), }, Table.SWPROFILE: { "unique": ("ID", True), - "name": ("Name", True), + "name": ("Name", False), "kernel_name": ("Kernel Name", True), "kernel_git_sha": ("Kernel Git SHA", True), "kernel_kconfig_full_hash": ("Kernel Kconfig Hash", False), @@ -145,9 +152,10 @@ cols = { "sysctl": ("sysctl", True), "bootscript": ("bootscript", True), }, - Table.SUT: { + Table.NODE: { "unique": ("ID", True), - "name": ("Name", True), + "name": ("Name", False), + "sut": ("Parent SUT", True), "host_name": ("Host Name", True), "architecture": ("Architecture", True), "cpu_count": ("CPU Count", False), @@ -160,6 +168,17 @@ cols = { "product_serial": ("DMI Product Serial", True), "mac_addrs_hash": ("MAC Addresses Hash", False), }, + Table.SUT: { + "unique": ("ID", True), + "name": ("Name", False), + "nodes": ("Node IDs", True), + "nodes_hash": ("Nodes Hash", False), + }, + Table.ROLEMAP: { + "unique": ("ID", True), + "map": ("Map", True), + "rmdescs_hash": ("Rmdescs Hash", False), + }, } @@ -256,6 +275,24 @@ def pretty_sut(tables, args): """ Prepares a data frame of SUTs for display. """ + nodes = tables[Table.NODE] + + def make_nodes(row): + id = row.name + items = nodes[nodes["sut_id"] == id]["unique"] + return "- " + "\n- ".join(sorted(items)) + + objs = tables[Table.SUT] + objs["nodes"] = objs.apply(make_nodes, axis=1) + + return pretty_obj(objs, cols[Table.SUT], args) + + +def pretty_node(tables, args): + """ + Prepares a data frame of NODEs for display. 
+ """ + suts = tables[Table.SUT] cpus = tables[Table.CPU] cpus = cpus.reset_index().drop(["id", "cpu_index"], axis=1) cpus = cpus.groupby(list(cpus.columns)).size().reset_index(name="count") @@ -263,15 +300,19 @@ def pretty_sut(tables, args): def make_cpu_info(row): id = row.name arch = row["architecture"] - descs = cpus[cpus["sut_id"] == id].to_dict("records") + descs = cpus[cpus["node_id"] == id].to_dict("records") strings = [f"{d['count']}x {pretty_cpu_string(d, arch)}" for d in descs] return "\n".join(strings) - objs = tables[Table.SUT] + def make_sut(sut_id): + return suts.loc[sut_id]["unique"] + + objs = tables[Table.NODE] + objs["sut"] = objs["sut_id"].apply(make_sut) objs["ram_sz"] = objs["ram_sz"].apply(pretty_size) objs["cpu_info"] = objs.apply(make_cpu_info, axis=1) - return pretty_obj(objs, cols[Table.SUT], args) + return pretty_obj(objs, cols[Table.NODE], args) def pretty_swprofile(tables, args): @@ -286,6 +327,7 @@ def pretty_benchmark(tables, args): Prepares a data frame of benchmarks for display. """ params = tables[Table.PARAM] + roles = tables[Table.ROLE] rc = tables[Table.RESULTCLASS] def make_params(row): @@ -302,6 +344,11 @@ def pretty_benchmark(tables, args): else: return float("nan") + def make_roles(row): + id = row.name + items = roles[roles["benchmark_id"] == id]["name"] + return "- " + "\n- ".join(sorted(items)) + def make_resultclass(row): id = row.name table = rc[rc["benchmark_id"] == id] @@ -321,6 +368,54 @@ def pretty_benchmark(tables, args): objs = tables[Table.BENCHMARK] objs["params"] = objs.apply(make_params, axis=1) + objs["roles"] = objs.apply(make_roles, axis=1) objs["resultclass"] = objs.apply(make_resultclass, axis=1) return pretty_obj(objs, cols[Table.BENCHMARK], args) + + +def pretty_rolemap(tables, args): + """ + Prepares a data frame of rolemaps for display. 
+ """ + + def make_benchmark_role(row): + return f"{row['benchmark']}/{row['role']}" + + rmdescs = ( + tables[Table.RMDESC] + .join( + tables[Table.ROLE][["benchmark_id", "name"]].rename( + columns={"name": "role"} + ), + on="role_id", + ) + .join( + tables[Table.BENCHMARK]["unique"].rename("benchmark"), + on="benchmark_id", + ) + .join(tables[Table.NODE]["unique"].rename("node"), on="node_id") + ) + rmdescs["benchmark/role"] = rmdescs.apply(make_benchmark_role, axis=1) + rmdescs["->"] = "->" + + def make_map(row): + id = row.name + table = rmdescs[rmdescs["rolemap_id"] == id] + table = table[["benchmark/role", "->", "node"]] + table = table.sort_values( + ["benchmark/role"], key=lambda x: ns.natsort_key(x) + ) + if not args.ascii: + table.columns = [term.make_dim(col) for col in table.columns] + table = table.to_dict("records") + return tabulate.tabulate( + table, + headers="keys", + tablefmt="simple" if args.ascii else "plain", + ) + + objs = tables[Table.ROLEMAP] + objs["map"] = objs.apply(make_map, axis=1) + + return pretty_obj(objs, cols[Table.ROLEMAP], args) diff --git a/fastpath/commands/verbs/result/merge.py b/fastpath/commands/verbs/result/merge.py index 3817d286cdcd296a609d9cf28d4c912f0dc69388..1555395791994b9e9501daab54ce619f76c78624 100644 --- a/fastpath/commands/verbs/result/merge.py +++ b/fastpath/commands/verbs/result/merge.py @@ -106,31 +106,13 @@ def dispatch(args): if len(set(srcs + [dst])) != len(srcs + [dst]): raise Exception("source and destination resulstores must be unique") - if rs.exists(dst) and not args.append: - raise Exception("destination resultstore exists: --append required") - - if rs.exists(dst) and not rs.conforms(dst): - raise Exception("destination resultstore currupt: can't continue") - # Get source stores. src_stores = [] for src in srcs: - if rs.is_csv(src): - src_stores.append(rs.ResultSet.from_csv(src)) - else: - src_stores.append(rs.ResultSet.open(src)) + src_stores.append(rs.open_or_import(src)) # Get destination store. - if rs.is_csv(dst): - if rs.conforms(dst): - dst_store = rs.ResultSet.from_csv(dst) - else: - dst_store = rs.ResultSet.create() - else: - if rs.conforms(dst): - dst_store = rs.ResultSet.open(dst) - else: - dst_store = rs.ResultSet.create(dst) + dst_store = rs.create_open_or_import(dst, args.append) # Merge each source into the destination. 
diff --git a/fastpath/commands/verbs/result/serve.py b/fastpath/commands/verbs/result/serve.py
index 58655c530dcd852519395577b3506e985c646b64..31a1810cceecb4207c3ac21d205299f597bdb72c 100644
--- a/fastpath/commands/verbs/result/serve.py
+++ b/fastpath/commands/verbs/result/serve.py
@@ -5,7 +5,6 @@ import os
 import runpy
 import sys
 from fastpath.commands import cliutils
-from fastpath.utils import resultstore as rs
 from fastpath import dashboard
diff --git a/fastpath/commands/verbs/result/show.py b/fastpath/commands/verbs/result/show.py
index 25df47de58fa8b0b2eb911f76dec194dc15935b9..6c4359214b8bce756be17e580c54d32e750b991a 100644
--- a/fastpath/commands/verbs/result/show.py
+++ b/fastpath/commands/verbs/result/show.py
@@ -2,6 +2,7 @@
 # SPDX-License-Identifier: MIT

 import io
+import math
 import natsort as ns
 import numpy as np
 import os
@@ -142,6 +143,14 @@ def add_parser(parser, formatter, add_noun_args):
         help="""don't merge similar objects""",
     )

+    verbp.add_argument(
+        "--no-merge-rolemap",
+        required=False,
+        default=False,
+        action="store_true",
+        help="""don't merge results generated with different rolemaps""",
+    )
+
     return verb_name


@@ -151,6 +160,7 @@ def dispatch(args):
     subcommand, with the arguments the user passed on the command line. The
     arguments comply with those requested in add_parser().
     """
+    init_pivot_index(not args.no_merge_rolemap)
     tables = load_tables(args.resultstore, not args.no_merge_similar)
     results = join_results(tables)
@@ -217,6 +227,14 @@
 pivot_index = ["benchmark", "resultclass", "unit", "improvement"]
 leading_cols = len(pivot_index)


+def init_pivot_index(merge_rolemap):
+    if not merge_rolemap:
+        global pivot_index
+        global leading_cols
+        pivot_index.insert(1, "rolemap")
+        leading_cols = len(pivot_index)
+
+
 def pivot_results(df, suts, swprofiles):
     def my_agg(group):
         values = group["value"]
@@ -335,10 +353,24 @@ def compute_change(results, args):
     return results


-def pretty_results_multi(results, args):
-    def format(cur, prev):
-        return f"{cur / prev - 1.0:.2%}" if prev else f"{cur:.2f}"
+def format(cur, prev=None):
+    if prev:
+        return f"{cur / prev - 1.0:.2%}"
+    elif cur == 0:
+        return "0.00"
+    elif np.isnan(cur):
+        return f"{cur:.2f}"
+    else:
+        # Ensure at least 2 significant figures, defaulting to 2 decimal
+        # places otherwise.
+        digits = 2 - int(math.floor(math.log10(abs(cur)))) - 1
+        if digits <= 2:
+            return f"{cur:.2f}"
+        else:
+            return f"{round(cur, digits):.{digits}f}"
+
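A few worked values pin down the rounding rule above; this quick sketch mirrors the plain-number path of format():

    import math

    def fmt(cur):
        digits = 2 - int(math.floor(math.log10(abs(cur)))) - 1
        return f"{cur:.2f}" if digits <= 2 else f"{round(cur, digits):.{digits}f}"

    print(fmt(123.456))  # '123.46' -> digits=-1, two decimals suffice
    print(fmt(0.5))      # '0.50'   -> digits=2, falls back to two decimals
    print(fmt(0.00123))  # '0.0012' -> digits=4, keeps two significant figures
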
+
+def pretty_results_multi(results, args):
     def mark_improvement(value):
         if args.ascii:
             return f"(I) {value}"
@@ -356,7 +388,9 @@ def pretty_results_multi(results, args):
     rows, cols = mean.shape

     pretty = mean.copy()
-    for col in pretty.columns[leading_cols:]:
+    fcol = pretty.columns[leading_cols]
+    pretty[fcol] = pretty[fcol].apply(lambda x: format(x))
+    for col in pretty.columns[leading_cols + 1 :]:
         pretty[col] = pretty[col].astype(str)

     for col in range(leading_cols + 1, cols):
@@ -402,13 +436,12 @@ def pretty_results_multi(results, args):
         raise Exception("No results to display after filtering")

     pretty = pretty.reset_index(drop=True)
-    pretty = pretty.drop(["improvement", "I", "R"], axis=1)
+    pretty = pretty.drop(["I", "R"], axis=1)

     return pretty_results(pretty, args)


 def pretty_results_single(results, args):
     results = results.reset_index()
-    results = results.drop(["improvement"], axis=1)
     results["count"] = results["count"].astype(np.int64)

     if args.relative:
@@ -417,11 +450,11 @@ def pretty_results_single(results, args):
             results[col] = results[col].apply(lambda x: f"{x - 1.0:.2%}")
         results["stddev"] = results["stddev"] / results["mean"]
         results["stddev"] = results["stddev"].apply(lambda x: f"{x:.2%}")
-        results["mean"] = results["mean"].apply(lambda x: f"{x:.2f}")
+        results["mean"] = results["mean"].apply(lambda x: format(x))
         results = results.rename({"stddev": f"cv"}, axis=1)
     else:
         for col in ["min", "ci95min", "mean", "ci95max", "max", "stddev"]:
-            results[col] = results[col].apply(lambda x: f"{x:.2f}")
+            results[col] = results[col].apply(lambda x: format(x))

     return pretty_results(results, args)
@@ -438,7 +471,6 @@ def pretty_results(results, args):
     results["resultclass"] = results.apply(
         lambda row: f"{row['resultclass']} ({row['unit']})", axis=1
     )
-    results = results.drop(["unit"], axis=1)

     # Replace nans with empty space.
     results = results.replace(
@@ -451,9 +483,8 @@ def pretty_results(results, args):

     # Sort results naturally by benchmark then resultclass; essentially sort
     # text alphabetically and numbers numberically.
-    results = results.sort_values(
-        ["benchmark", "resultclass"], key=lambda x: ns.natsort_key(x)
-    )
+    results = results.sort_values(pivot_index, key=lambda x: ns.natsort_key(x))
+    results = results.drop(["unit", "improvement"], axis=1)
     results = results.reset_index(drop=True)

     # We will insert a separator after each benchmark (which may have multiple
@@ -471,13 +502,18 @@ def pretty_results(results, args):
     results = results.rename(
         {
             "benchmark": "Benchmark",
+            "rolemap": "Role Map",
             "resultclass": "Result Class",
         },
         axis=1,
     )
     if not args.ascii:
         results.columns = [term.make_dim(col) for col in results.columns]
-    colalign = ["left", "left"] + ["right"] * (len(results.columns) - 2)
+    stat_align = ["right"] * (len(results.columns) - len(pivot_index) + 2)
+    index_align = ["left", "left"]
+    if args.no_merge_rolemap:
+        index_align.insert(1, "right")
+    colalign = index_align + stat_align

     # Tabulate then manually insert the separators at the indexes we previously
     # calculated. While tabulate claims support for doing separators itself, it
@@ -490,7 +526,7 @@ def pretty_results(results, args):
         table,
         tablefmt="outline" if args.ascii else "rounded_outline",
         headers="keys",
-        floatfmt=".2f",
+        disable_numparse=True,
         colalign=colalign,
     )
     table = table.split("\n")
diff --git a/fastpath/utils/kmsg.py b/fastpath/utils/kmsg.py
index cf9ebd9e5379935d96b12ce75017fecac665e87a..bceede7eabf9e4bf378110c2ec0e7eeb866906d4 100644
--- a/fastpath/utils/kmsg.py
+++ b/fastpath/utils/kmsg.py
@@ -48,75 +48,72 @@ def _stop_netconsole(ctx):


 NETCONSOLE_PORT = 6666
-_log_thread = None
-_stop_event = threading.Event()
-_sock = None


-def start(ctx, filename):
-    """
-    Configure netconsole on the sut to send kernel logs over UDP then receive
-    them and write them to filename in a background thread.
-    """
-    global _log_thread
-    global _stop_event
-    global _sock
-
-    if _log_thread:
-        raise RuntimeError("kmsg logger already started")
-
-    _stop_event.clear()
-    _sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-
-    try:
-        _sock.bind(("0.0.0.0", NETCONSOLE_PORT))
-        _sock.settimeout(0.25)
-
-        _start_netconsole(ctx)
-
-        with open(filename, "a") as f:
-            t = str(datetime.datetime.now())
-            print("\n", file=f)
-            print(f"### kmsg replay ###", file=f)
-            f.write(ctx.sudo("dmesg").stdout)
-            print(f"### {t}: kmsg live ###", file=f)
-
-        def log():
-            with open(filename, "ab") as f:
-                while not _stop_event.is_set():
-                    try:
-                        data, _ = _sock.recvfrom(4096)
-                        f.write(data)
-                        f.flush()
-                    except socket.timeout:
-                        continue
-                    except Exception as e:
-                        print(f"### kmsg log error ({e}) ###", file=f)
-                        break
-
-        _log_thread = threading.Thread(target=log, daemon=True)
-        _log_thread.start()
-    except Exception as e:
-        with open(filename, "a") as f:
-            print(f"### kmsg unable to configure logging ({e}) ###", file=f)
-
-
-def stop(ctx):
-    """
-    Stop logging kernel messages over UDP.
-    """
-    global _log_thread
-    global _stop_event
-    global _sock
-
-    if _log_thread is None:
-        return
-
-    _stop_event.set()
-    _log_thread.join()
-    _log_thread = None
-    if _sock:
-        _sock.close()
-        _sock = None
-
-    _stop_netconsole(ctx)
+class Logger:
+    def __init__(self, ctx, filename):
+        self.log_thread = None
+        self.stop_event = None
+        self.sock = None
+        self.ctx = ctx
+        self.filename = filename
+
+    def start(self):
+        """
+        Configure netconsole on the sut to send kernel logs over UDP then receive
+        them and write them to filename in a background thread.
+        """
+        if self.log_thread:
+            raise RuntimeError("kmsg logger already started")
+
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+
+        try:
+            self.sock.bind(("0.0.0.0", NETCONSOLE_PORT))
+            self.sock.settimeout(0.25)
+
+            _start_netconsole(self.ctx)
+
+            with open(self.filename, "a") as f:
+                t = str(datetime.datetime.now())
+                print("\n", file=f)
+                print(f"### kmsg replay ###", file=f)
+                f.write(self.ctx.sudo("dmesg").stdout)
+                print(f"### {t}: kmsg live ###", file=f)
+
+            def log():
+                with open(self.filename, "ab") as f:
+                    while not self.stop_event.is_set():
+                        try:
+                            data, _ = self.sock.recvfrom(4096)
+                            f.write(data)
+                            f.flush()
+                        except socket.timeout:
+                            continue
+                        except Exception as e:
+                            print(f"### kmsg log error ({e}) ###", file=f)
+                            break
+
+            self.stop_event = threading.Event()
+            self.log_thread = threading.Thread(target=log, daemon=True)
+            self.log_thread.start()
+        except Exception as e:
+            with open(self.filename, "a") as f:
+                print(f"### kmsg unable to configure logging ({e}) ###", file=f)
+
+    def stop(self):
+        """
+        Stop logging kernel messages over UDP.
+        """
+        if self.log_thread is None:
+            return
+
+        self.stop_event.set()
+        self.log_thread.join()
+        self.log_thread = None
+        self.stop_event = None
+        if self.sock:
+            self.sock.close()
+            self.sock = None
+
+        _stop_netconsole(self.ctx)
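With start()/stop() now instance methods on a Logger, state no longer lives at module level and an instance can be created per target. A usage sketch (ctx is the node's connection context and run_benchmark is hypothetical):

    from fastpath.utils import kmsg

    logger = kmsg.Logger(ctx, "node0.kmsg")
    logger.start()       # configures netconsole, replays dmesg, tails via UDP
    try:
        run_benchmark()  # hypothetical workload
    finally:
        logger.stop()    # joins the log thread and tears down netconsole
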
+ """ + if self.log_thread is None: + return + + self.stop_event.set() + self.log_thread.join() + self.log_thread = None + self.stop_event = None + if self.sock: + self.sock.close() + self.sock = None + + _stop_netconsole(self.ctx) diff --git a/fastpath/utils/logger.py b/fastpath/utils/logger.py new file mode 100644 index 0000000000000000000000000000000000000000..68ec462236971ece74ca41b20c37fabd9666e576 --- /dev/null +++ b/fastpath/utils/logger.py @@ -0,0 +1,28 @@ +# Copyright (c) 2025, Arm Limited. +# SPDX-License-Identifier: MIT + + +import datetime + + +class Logger: + def __init__(self, logfile=None): + self.set_logfile(logfile) + + def set_logfile(self, logfile): + self.logfile = logfile + + def log(self, msg): + if not self.logfile or len(msg) == 0: + return + + t = str(datetime.datetime.now()) + + lines = msg.split("\n") + assert lines[-1] == "" + lines = [f"{t}: {l}" for l in lines[:-1]] + lines.append("") + + msg = "\n".join(lines) + self.logfile.write(msg) + self.logfile.flush() diff --git a/fastpath/utils/machine.py b/fastpath/utils/machine.py index c49141b0bf80cdbec6134196ed7699366b362946..dfe3f03c7997f18b4960fa4abe545e9027eb669a 100644 --- a/fastpath/utils/machine.py +++ b/fastpath/utils/machine.py @@ -2,36 +2,41 @@ # SPDX-License-Identifier: MIT -import datetime import fabric import invoke import io import logging import textwrap import time -import sys +import threading -def _log(log, msg): +# Pamamico, used by fabric, sprays error info to stderr when throwing an +# exception. This happens a lot for the reboot() case, so let's suppress +# it. +logging.getLogger("paramiko").setLevel(logging.CRITICAL) + + +def _log(logfile, msg): if len(msg) == 0: return - t = str(datetime.datetime.now()) + label = logfile["label"] + logger = logfile["logger"] lines = msg.split("\n") assert lines[-1] == "" - lines = [f"{t}: {l}" for l in lines[:-1]] + lines = [f"{label}: {l}" for l in lines[:-1]] lines.append("") msg = "\n".join(lines) - log.write(msg) - log.flush() + logger.log(msg) -def _write_label(log, label): +def _write_label(logfile, label): label = f"# {label} " label = label + "-" * max(80 - len(label), 0) + "\n" - _log(log, label) + _log(logfile, label) def _fixup_newline(msg): @@ -41,36 +46,46 @@ def _fixup_newline(msg): class SSHMachine: - def __init__(self, log, params): + def __init__(self, logger=None, params=None, parent=None): + assert (logger != None and params != None) ^ (parent != None) + + self.ip = None self.trans_nr = 0 - self.logfile = log - - self.connect_args = { - "host": params["host"], - "user": params["user"], - "port": params["port"], - } - if params["keyfile"]: - self.connect_args["connect_kwargs"] = { - "key_filename": params["keyfile"] + self.subcons = 0 + self.subcons_lock = threading.Lock() + + if parent: + with parent.subcons_lock: + subcon = parent.subcons + parent.subcons += 1 + label = f"{parent.logfile['label']} subcon {subcon}" + self.logfile = {"label": label, "logger": parent.logfile["logger"]} + self.connect_args = parent.connect_args + else: + self.logfile = {"label": params["host"], "logger": logger} + + self.connect_args = { + "host": params["host"], + "user": params["user"], + "port": params["port"], } + if params["keyfile"]: + self.connect_args["connect_kwargs"] = { + "key_filename": params["keyfile"] + } self.connect() def connect(self, timeout=None): self.log(f"Connecting to {self.connect_args}...\n") - # Pamamico, used by fabric, sprays error info to stderr when throwing an - # exception. 
-        # exception. This happens a lot for the reboot() case, so let's suppress
-        # it.
-        logging.getLogger("paramiko").setLevel(logging.CRITICAL)
-
         self.remote = fabric.Connection(
             **self.connect_args, connect_timeout=timeout
         )
         try:
             self.remote.open()
             self.remote.transport.set_keepalive(15)
+            self.ip = self.remote.transport.sock.getpeername()[0]
         except Exception:
             del self.remote
             self.remote = None
@@ -84,6 +99,7 @@ class SSHMachine:
         self.remote.close()
         del self.remote
         self.remote = None
+        self.ip = None
         self.log(f"Disconnected from {self.connect_args}\n")

     def close(self):
@@ -238,6 +254,18 @@ class SSHMachine:

         _write_label(self.logfile, f"end reboot")

+    def ip_addr(self):
+        return self.ip
+
+    def subconnection(self):
+        """
+        Returns a new SSHMachine instance that has its own connection to the
+        remote machine. This allows for running parallel commands on the
+        machine in different threads. reboot() is only permitted by the root
+        connection, which must have no active subconnections at the time.
+        """
+        return SSHMachine(parent=self)
+

 def open(log, method, params):
     # Only support SSH method currently.
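subconnection() is what lets per-role threads drive the same node in parallel. A sketch under assumptions: machine is an existing SSHMachine, and sudo() is the command helper already used by kmsg.py:

    import threading

    def worker(machine):
        sub = machine.subconnection()  # separate SSH session, shared logger
        try:
            sub.sudo("dmesg")          # runs concurrently with other threads
        finally:
            sub.close()

    t = threading.Thread(target=worker, args=(machine,))
    t.start()
    t.join()  # reboot() requires no active subconnections, so join first
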
r"[a-zA-Z_][a-zA-Z0-9_\-\.]*", + }, + }, + "rolemap": { + "type": "dict", + "required": False, + "default": {}, + "valuesrules": { + "type": ["string", "integer"], + }, + }, }, } @@ -267,6 +302,17 @@ class ValidationError(Exception): pass +def _pre_normalize(plan): + # If we have sut.connection and no nodes, then we have a single-node sut. + # Create a single node with the connection. + sut = plan.get("sut") + if sut and "connection" in sut and "nodes" not in sut: + sut["nodes"] = [{"connection": sut["connection"]}] + del sut["connection"] + + return plan + + def _validate_normalize(structure, schema): """ Validate that structure conforms to schema, applying any normalization @@ -299,7 +345,18 @@ def _validate_normalize(structure, schema): return messages - v = cerberus.Validator(schema) + class PlanValidator(cerberus.Validator): + def _validate_unique(self, unique, field, value): + """ + Enforces that all items in a list are unique. + The rule's arguments are validated against this schema: + {'type': 'boolean'} + """ + if unique: + if len(value) != len(set(value)): + self._error(field, "items must be unique") + + v = PlanValidator(schema) structure = v.validated(structure) if not structure: raise ValidationError("\n".join(format_validation_errors(v.errors))) @@ -320,25 +377,30 @@ def _defaults_sort(defaults): return dict(sorted(defaults.items(), key=lambda x: lut[x[0]])) -def _sut_sort(sut): - lut = list( - _plan_post_schema["sut"]["schema"]["connection"]["schema"]["params"][ - "schema" - ].keys() - ) +def _node_sort(node): + schema = _plan_post_schema["sut"]["schema"]["nodes"]["schema"]["schema"] + + lut = list(schema["connection"]["schema"]["params"]["schema"].keys()) lut = {k: i for i, k in enumerate(lut)} - sut["connection"]["params"] = dict( - sorted(sut["connection"]["params"].items(), key=lambda x: lut[x[0]]) + node["connection"]["params"] = dict( + sorted(node["connection"]["params"].items(), key=lambda x: lut[x[0]]) ) - lut = list( - _plan_post_schema["sut"]["schema"]["connection"]["schema"].keys() - ) + lut = list(schema["connection"]["schema"].keys()) lut = {k: i for i, k in enumerate(lut)} - sut["connection"] = dict( - sorted(sut["connection"].items(), key=lambda x: lut[x[0]]) + node["connection"] = dict( + sorted(node["connection"].items(), key=lambda x: lut[x[0]]) ) + lut = list(schema.keys()) + lut = {k: i for i, k in enumerate(lut)} + return dict(sorted(node.items(), key=lambda x: lut[x[0]])) + + +def _sut_sort(sut): + for i, n in enumerate(sut["nodes"]): + sut["nodes"][i] = _node_sort(n) + lut = list(_plan_post_schema["sut"]["schema"].keys()) lut = {k: i for i, k in enumerate(lut)} return dict(sorted(sut.items(), key=lambda x: lut[x[0]])) @@ -425,17 +487,7 @@ def load(file_name): plan = yaml.safe_load(file) rel = os.path.dirname(file_name) - plan = _validate_normalize(plan, _plan_pre_schema) - - # Once we know the sut method, attach the correct method params - # schema and re-validate. 
- _plan_pre_schema["sut"]["schema"]["connection"]["schema"]["params"][ - "schema" - ] = _sut_params_schema[plan["sut"]["connection"]["method"]] - _plan_post_schema["sut"]["schema"]["connection"]["schema"]["params"][ - "schema" - ] = _sut_params_schema[plan["sut"]["connection"]["method"]] - + plan = _pre_normalize(plan) plan = _validate_normalize(plan, _plan_pre_schema) for i, bm in enumerate(plan["benchmarks"]): @@ -447,6 +499,22 @@ def load(file_name): plan["benchmarks"][i] = _merge(base, bm) plan = _validate_normalize(plan, _plan_post_schema) + + # Check rolemap keys are all in roles list and fill in any blanks, + # defaulting to node 0. + for i, bm in enumerate(plan["benchmarks"]): + valid_keys = set(bm["roles"]) + keys = set(bm["rolemap"].keys()) + invalid_keys = keys - valid_keys + if invalid_keys: + raise ValidationError( + f"benchmarks[{i}].rolemap: " + f"{list(invalid_keys)[0]} is not a valid role" + ) + else: + missing_roles = valid_keys - keys + bm["rolemap"].update({m: 0 for m in missing_roles}) + plan = _plan_sort(plan) return plan diff --git a/fastpath/utils/resultstore.py b/fastpath/utils/resultstore.py index 95ee6a670ee959479a549ca5eaf26901847b3d5c..b13d9ae7c9c9449fe7f35e3eb2d55c25cf511da3 100644 --- a/fastpath/utils/resultstore.py +++ b/fastpath/utils/resultstore.py @@ -19,10 +19,14 @@ class Table(enum.Enum): SWPROFILE = (2,) CPU = (3,) ERROR = (4,) - PARAM = (5,) - RESULT = (6,) - RESULTCLASS = (7,) - SUT = (8,) + NODE = (5,) + PARAM = (6,) + RESULT = (7,) + RMDESC = (8,) + ROLE = (9,) + ROLEMAP = (10,) + RESULTCLASS = (11,) + SUT = (12,) def is_csv(url): @@ -90,6 +94,45 @@ def conforms(url): return _conforms(db) +def open_or_import(url): + """ + Opens an existing resultstore (mysql, sqlite) or import an existing + resultstore (csv). If csv, then the user must explicitly export it on + completion if required. + """ + if not exists(url): + raise Exception("resultstore does not exist") + + if not conforms(url): + raise Exception("resultstore currupt") + + if is_csv(url): + rstore = ResultSet.from_csv(url) + else: + rstore = ResultSet.open(url) + + return rstore + + +def create_open_or_import(url, allow_existing): + """ + Creates a resultstore if it doesn't already exist, or if it does exist and + allow_existing=True, open it (mysql, sqlite) or import it (csv). If csv, + then the user must explicitly export it on completion if required. + """ + if exists(url) and not allow_existing: + raise Exception("resultstore exists: --append required") + + if exists(url): + rstore = open_or_import(url) + elif is_csv(url): + rstore = ResultSet.create() + else: + rstore = ResultSet.create(url) + + return rstore + + class ResultSet: @classmethod def open(cls, url): @@ -187,13 +230,6 @@ class ResultSet: datetime.datetime.fromisoformat ) - # HACK: We have a lot of historical CSV data with "summary" column in - # the RESULTCLASS table. The column has since been removed, but to - # ensure we can continue to load the data, remove the column if present - # during load. Drop this hack once we move to mysql. - if "summary" in dfs[Table.RESULTCLASS].columns: - dfs[Table.RESULTCLASS].drop(["summary"], axis=1, inplace=True) - return cls.from_dfs(dfs) def __init__(self, db): @@ -287,8 +323,8 @@ class ResultSet: returned from query_results() or query_errors(). Currently only support passing RESULT and ERROR objects. 
""" - errors = [o for o in objects if type(o) == schema.ERROR] - results = [o for o in objects if type(o) == schema.RESULT] + errors = [o for o in objects if isinstance(o, schema.ERROR)] + results = [o for o in objects if isinstance(o, schema.RESULT)] if len(errors) + len(results) != len(objects): raise Exception("merge only supports for ERROR and RESULT objects") @@ -296,174 +332,261 @@ class ResultSet: # Deduplicated set of source entities. sents = {table: set() for table in Table} + def gather_error(error): + if hasattr(error, "gathered"): + return + error.gathered = True + + sents[Table.ERROR].add(error) + gather_sut(error.sut) + gather_swprofile(error.swprofile) + gather_benchmark(error.benchmark) + gather_rolemap(error.rolemap) + gather_role(error.role) + + def gather_benchmark(benchmark): + if hasattr(benchmark, "gathered"): + return + benchmark.gathered = True + + sents[Table.BENCHMARK].add(benchmark) + for param in benchmark.params: + gather_param(param) + for role in benchmark.roles: + gather_role(role) + + def gather_cpu(cpu): + if hasattr(cpu, "gathered"): + return + cpu.gathered = True + + sents[Table.CPU].add(cpu) + + def gather_node(node): + if hasattr(node, "gathered"): + return + node.gathered = True + + sents[Table.NODE].add(node) + for cpu in node.cpus: + gather_cpu(cpu) + + def gather_param(param): + if hasattr(param, "gathered"): + return + param.gathered = True + + sents[Table.PARAM].add(param) + + def gather_result(result): + if hasattr(result, "gathered"): + return + result.gathered = True + + sents[Table.RESULT].add(result) + gather_resultclass(result.resultclass) + gather_sut(result.sut) + gather_swprofile(result.swprofile) + gather_rolemap(result.rolemap) + gather_role(result.role) + + def gather_resultclass(resultclass): + if hasattr(resultclass, "gathered"): + return + resultclass.gathered = True + + sents[Table.RESULTCLASS].add(resultclass) + gather_benchmark(resultclass.benchmark) + + def gather_rmdesc(rmdesc): + if hasattr(rmdesc, "gathered"): + return + rmdesc.gathered = True + + sents[Table.RMDESC].add(rmdesc) + gather_role(rmdesc.role) + gather_node(rmdesc.node) + + def gather_role(role): + if hasattr(role, "gathered"): + return + role.gathered = True + + sents[Table.ROLE].add(role) + + def gather_rolemap(rolemap): + if hasattr(rolemap, "gathered"): + return + rolemap.gathered = True + + sents[Table.ROLEMAP].add(rolemap) + for rmdesc in rolemap.rmdescs: + gather_rmdesc(rmdesc) + + def gather_sut(sut): + if hasattr(sut, "gathered"): + return + sut.gathered = True + + sents[Table.SUT].add(sut) + for node in sut.nodes: + gather_node(node) + + def gather_swprofile(swprofile): + if hasattr(swprofile, "gathered"): + return + swprofile.gathered = True + + sents[Table.SWPROFILE].add(swprofile) + # Gather all the source entities related to the errors. for error in errors: - sents[Table.ERROR].add(error) - sents[Table.SUT].add(error.sut) - sents[Table.CPU].update(error.sut.cpus) - sents[Table.SWPROFILE].add(error.swprofile) - sents[Table.BENCHMARK].add(error.benchmark) - sents[Table.PARAM].update(error.benchmark.params) + gather_error(error) # Gather all the source entities related to the results. 
         for result in results:
-            sents[Table.RESULT].add(result)
-            sents[Table.SUT].add(result.sut)
-            sents[Table.CPU].update(result.sut.cpus)
-            sents[Table.SWPROFILE].add(result.swprofile)
-            sents[Table.RESULTCLASS].add(result.resultclass)
-            sents[Table.BENCHMARK].add(result.resultclass.benchmark)
-            sents[Table.PARAM].update(result.resultclass.benchmark.params)
+            gather_result(result)
+
+        # Delete all the temporary gathered attributes we added as markers to
+        # prevent unnecessary recursion during gathering.
+        for table in Table:
+            for sent in sents[table]:
+                del sent.gathered
+
+        def inspect(sent):
+            model = type(sent)
+            while schema.BaseTable != model.__base__:
+                model = model.__base__
+            columns = [
+                c.name
+                for c in model.__table__.columns
+                if not c.primary_key and not c.foreign_keys
+            ]
+            attrs = sent.to_dict(columns) if columns else {}
+            return model, attrs

         def create(model, **kwargs):
             entity = model(**kwargs)
             self.session.add(entity)
-            self.session.flush()
             return entity

-        def get_or_create(model, **kwargs):
-            entity = self.session.query(model).filter_by(**kwargs).first()
-            if not entity:
-                entity = create(model, **kwargs)
-            return entity
+        def create_from(sent, **kwargs):
+            model, attrs = inspect(sent)
+            return create(model, **attrs, **kwargs)
+
+        def get_or_create_from(sent, **kwargs):
+            model, attrs = inspect(sent)
+            dent = (
+                self.session.query(model).filter_by(**attrs, **kwargs).first()
+            )
+            if not dent:
+                dent = create(model, **attrs, **kwargs)
+            return dent

-        # Map from source id to destination id.
-        ids = {table: {} for table in Table}
+        # Map from source entity to destination entity.
+        map = {table: {} for table in Table}

         # Merge SUT tables.
         for sent in sents[Table.SUT]:
-            dent = get_or_create(
-                schema.SUT,
-                **sent.to_dict(
-                    attrs=[
-                        "name",
-                        "host_name",
-                        "architecture",
-                        "cpu_count",
-                        "cpu_info_hash",
-                        "numa_count",
-                        "ram_sz",
-                        "hypervisor",
-                        "product_name",
-                        "product_serial",
-                        "mac_addrs_hash",
-                    ]
-                ),
+            dent = get_or_create_from(
+                sent,
+            )
+            map[Table.SUT][sent] = dent
+
+        # Merge NODE tables.
+        for sent in sents[Table.NODE]:
+            dent = get_or_create_from(
+                sent,
+                sut=map[Table.SUT][sent.sut],
             )
-            ids[Table.SUT][sent.id] = dent.id
+            map[Table.NODE][sent] = dent

         # Merge CPU tables.
         for sent in sents[Table.CPU]:
-            dent = get_or_create(
-                schema.CPU,
-                **sent.to_dict(
-                    attrs=[
-                        "desc",
-                        "cpu_index",
-                    ]
-                ),
-                sut_id=ids[Table.SUT][sent.sut_id],
+            dent = get_or_create_from(
+                sent,
+                node=map[Table.NODE][sent.node],
             )
-            ids[Table.CPU][sent.id] = dent.id
+            map[Table.CPU][sent] = dent

         # Merge SWPROFILE tables.
         for sent in sents[Table.SWPROFILE]:
-            dent = get_or_create(
-                schema.SWPROFILE,
-                **sent.to_dict(
-                    attrs=[
-                        "name",
-                        "kernel_name",
-                        "kernel_git_sha",
-                        "kernel_kconfig_full_hash",
-                        "kernel_cmdline_full_hash",
-                        "userspace_name",
-                        "cmdline",
-                        "sysctl",
-                        "bootscript",
-                    ]
-                ),
+            dent = get_or_create_from(
+                sent,
             )
-            ids[Table.SWPROFILE][sent.id] = dent.id
+            map[Table.SWPROFILE][sent] = dent

         # Merge BENCHMARK tables.
         for sent in sents[Table.BENCHMARK]:
-            dent = get_or_create(
-                schema.BENCHMARK,
-                **sent.to_dict(
-                    attrs=[
-                        "suite",
-                        "name",
-                        "type",
-                        "image",
-                        "params_hash",
-                    ]
-                ),
+            dent = get_or_create_from(
+                sent,
             )
-            ids[Table.BENCHMARK][sent.id] = dent.id
+            map[Table.BENCHMARK][sent] = dent

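get_or_create_from() above is the old per-table get_or_create generalized through inspect(): the dedup filter is every non-primary-key, non-foreign-key column of the mapped base model, so attribute lists no longer need to be spelled out per table. The underlying pattern in isolation, as a sketch against a plain SQLAlchemy session and toy model:

    def get_or_create(session, model, **attrs):
        # Reuse an identical existing row, otherwise stage a new one.
        entity = session.query(model).filter_by(**attrs).first()
        if not entity:
            entity = model(**attrs)
            session.add(entity)
        return entity
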
        # Merge PARAM tables.
         for sent in sents[Table.PARAM]:
-            dent = get_or_create(
-                schema.PARAM,
-                **sent.to_dict(
-                    attrs=[
-                        "name",
-                        "value",
-                    ]
-                ),
-                benchmark_id=ids[Table.BENCHMARK][sent.benchmark_id],
+            dent = get_or_create_from(
+                sent,
+                benchmark=map[Table.BENCHMARK][sent.benchmark],
             )
-            ids[Table.PARAM][sent.id] = dent.id
+            map[Table.PARAM][sent] = dent
+
+        # Merge ROLE tables.
+        for sent in sents[Table.ROLE]:
+            dent = get_or_create_from(
+                sent,
+                benchmark=map[Table.BENCHMARK][sent.benchmark],
+            )
+            map[Table.ROLE][sent] = dent

         # Merge RESULTCLASS tables.
         for sent in sents[Table.RESULTCLASS]:
-            dent = get_or_create(
-                schema.RESULTCLASS,
-                **sent.to_dict(
-                    attrs=[
-                        "name",
-                        "unit",
-                        "improvement",
-                    ]
-                ),
-                benchmark_id=ids[Table.BENCHMARK][sent.benchmark_id],
+            dent = get_or_create_from(
+                sent,
+                benchmark=map[Table.BENCHMARK][sent.benchmark],
+            )
+            map[Table.RESULTCLASS][sent] = dent
+
+        # Merge ROLEMAP tables.
+        for sent in sents[Table.ROLEMAP]:
+            dent = get_or_create_from(
+                sent,
+            )
+            map[Table.ROLEMAP][sent] = dent
+
+        # Merge RMDESC tables.
+        for sent in sents[Table.RMDESC]:
+            dent = get_or_create_from(
+                sent,
+                rolemap=map[Table.ROLEMAP][sent.rolemap],
+                role=map[Table.ROLE][sent.role],
+                node=map[Table.NODE][sent.node],
             )
-            ids[Table.RESULTCLASS][sent.id] = dent.id
+            map[Table.RMDESC][sent] = dent

         # Merge RESULT tables.
         for sent in sents[Table.RESULT]:
-            dent = create(
-                schema.RESULT,
-                **sent.to_dict(
-                    attrs=[
-                        "timestamp",
-                        "session_uuid",
-                        "value",
-                    ]
-                ),
-                resultclass_id=ids[Table.RESULTCLASS][sent.resultclass_id],
-                sut_id=ids[Table.SUT][sent.sut_id],
-                swprofile_id=ids[Table.SWPROFILE][sent.swprofile_id],
+            dent = create_from(
+                sent,
+                resultclass=map[Table.RESULTCLASS][sent.resultclass],
+                sut=map[Table.SUT][sent.sut],
+                swprofile=map[Table.SWPROFILE][sent.swprofile],
+                rolemap=map[Table.ROLEMAP][sent.rolemap],
+                role=map[Table.ROLE][sent.role],
             )
-            ids[Table.RESULT][sent.id] = dent.id
+            map[Table.RESULT][sent] = dent

         # Merge ERROR tables.
         for sent in sents[Table.ERROR]:
-            dent = create(
-                schema.ERROR,
-                **sent.to_dict(
-                    attrs=[
-                        "timestamp",
-                        "session_uuid",
-                        "error",
-                    ]
-                ),
-                sut_id=ids[Table.SUT][sent.sut_id],
-                swprofile_id=ids[Table.SWPROFILE][sent.swprofile_id],
-                benchmark_id=ids[Table.BENCHMARK][sent.benchmark_id],
+            dent = create_from(
+                sent,
+                sut=map[Table.SUT][sent.sut],
+                swprofile=map[Table.SWPROFILE][sent.swprofile],
+                benchmark=map[Table.BENCHMARK][sent.benchmark],
+                rolemap=map[Table.ROLEMAP][sent.rolemap],
+                role=map[Table.ROLE][sent.role],
             )
-            ids[Table.ERROR][sent.id] = dent.id
+            map[Table.ERROR][sent] = dent

         self.session.commit()
diff --git a/fastpath/utils/schema.py b/fastpath/utils/schema.py
index ef331a58476a60f8276481af4c459d665f88778b..5642719096df80e4dd9286ed2b39f8ade5c33d91 100644
--- a/fastpath/utils/schema.py
+++ b/fastpath/utils/schema.py
@@ -4,6 +4,7 @@ import enum
 import sqlalchemy as sa
 from sqlalchemy.orm import declarative_base, relationship
+from fastpath.utils import fingerprint


 SZ_HASH = 64
@@ -16,12 +17,26 @@ BaseTable = declarative_base()


 class BaseMixin:
-    def to_dict(self, attrs):
-        return {
-            c.key: getattr(self, c.key)
-            for c in sa.inspection.inspect(self).mapper.column_attrs
-            if c.key in attrs
-        }
+    def to_dict(self, attrs=None, notattrs=None):
+        """
+        Returns the object as a dict, containing the keys that are in attrs
+        and not in notattrs. If attrs is None, behaves as if attrs is the list
+        of all keys. If notattrs is None, behaves as if notattrs is an empty
+        list.
+        """
+        keys = [c.key for c in sa.inspection.inspect(self).mapper.column_attrs]
+        if attrs:
+            keys = [k for k in keys if k in attrs]
+        if notattrs:
+            keys = [k for k in keys if k not in notattrs]
+        return {k: getattr(self, k) for k in keys}
+
+    def hash(self, attrs=None, notattrs=None):
+        """
+        Returns a stable sha256 hash of the object, including/excluding any
+        specified fields. If attrs is None, behaves as if attrs is the list of
+        all keys. If notattrs is None, behaves as if notattrs is an empty list.
+        """
+        return fingerprint.hash(self.to_dict(attrs, notattrs))

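to_dict()/hash() give every mapped object a stable fingerprint, which is presumably what populates columns such as nodes_hash, roles_hash and rmdescs_hash. A sketch of the calls (benchmark is a BENCHMARK instance; the attribute choices are illustrative):

    d = benchmark.to_dict(notattrs=["id"])   # all columns except the primary key
    fp = benchmark.hash(notattrs=["id", "params_hash"])  # sha256 via fingerprint
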
+ """ + keys = [c.key for c in sa.inspection.inspect(self).mapper.column_attrs] + if attrs: + keys = [k for k in keys if k in attrs] + if notattrs: + keys = [k for k in keys if k not in notattrs] + return {k: getattr(self, k) for k in keys} + + def hash(self, attrs=None, notattrs=None): + """ + Returns a stable sha256 hash of the object, including/excluding any + specified fields. If attrs is None, behaves as if attrs is the list of + all keys. If notattrs is None, behaves as if notattrs is an empty list. + """ + return fingerprint.hash(self.to_dict(attrs, notattrs)) class BENCHMARK(BaseTable, BaseMixin): @@ -33,10 +48,12 @@ class BENCHMARK(BaseTable, BaseMixin): type = sa.Column(sa.String(SZ_NAME), nullable=False) image = sa.Column(sa.String(SZ_DESC), nullable=False) params_hash = sa.Column(sa.String(SZ_HASH), nullable=False) + roles_hash = sa.Column(sa.String(SZ_HASH), nullable=False) errors = relationship("ERROR", back_populates="benchmark") params = relationship("PARAM", back_populates="benchmark") resultclasses = relationship("RESULTCLASS", back_populates="benchmark") + roles = relationship("ROLE", back_populates="benchmark") class SWPROFILE(BaseTable, BaseMixin): @@ -63,9 +80,9 @@ class CPU(BaseTable, BaseMixin): id = sa.Column(sa.Integer, primary_key=True, autoincrement=True) desc = sa.Column(sa.String(SZ_DESC), nullable=False) cpu_index = sa.Column(sa.SmallInteger, nullable=False) - sut_id = sa.Column(sa.Integer, sa.ForeignKey("SUT.id"), nullable=False) + node_id = sa.Column(sa.Integer, sa.ForeignKey("NODE.id"), nullable=False) - sut = relationship("SUT", back_populates="cpus") + node = relationship("NODE", back_populates="cpus") class ERROR(BaseTable, BaseMixin): @@ -80,14 +97,42 @@ class ERROR(BaseTable, BaseMixin): benchmark_id = sa.Column( sa.Integer, sa.ForeignKey("BENCHMARK.id"), nullable=False ) + rolemap_id = sa.Column( + sa.Integer, sa.ForeignKey("ROLEMAP.id"), nullable=False + ) + role_id = sa.Column(sa.Integer, sa.ForeignKey("ROLE.id"), nullable=False) session_uuid = sa.Column(sa.Uuid, nullable=False) error = sa.Column(sa.Integer, nullable=False) benchmark = relationship("BENCHMARK", back_populates="errors") + role = relationship("ROLE", back_populates="errors") + rolemap = relationship("ROLEMAP", back_populates="errors") swprofile = relationship("SWPROFILE", back_populates="errors") sut = relationship("SUT", back_populates="errors") +class NODE(BaseTable, BaseMixin): + __tablename__ = "NODE" + + id = sa.Column(sa.Integer, primary_key=True, autoincrement=True) + sut_id = sa.Column(sa.Integer, sa.ForeignKey("SUT.id"), nullable=False) + name = sa.Column(sa.String(SZ_NAME), nullable=False) + host_name = sa.Column(sa.String(SZ_NAME), nullable=False) + architecture = sa.Column(sa.String(SZ_NAME), nullable=False) + cpu_count = sa.Column(sa.SmallInteger, nullable=False) + cpu_info_hash = sa.Column(sa.String(SZ_HASH), nullable=False) + numa_count = sa.Column(sa.SmallInteger, nullable=False) + ram_sz = sa.Column(sa.BigInteger, nullable=False) + hypervisor = sa.Column(sa.String(SZ_NAME), nullable=False) + product_name = sa.Column(sa.String(SZ_DESC), nullable=False) + product_serial = sa.Column(sa.String(SZ_DESC), nullable=False) + mac_addrs_hash = sa.Column(sa.String(SZ_HASH), nullable=False) + + cpus = relationship("CPU", back_populates="node") + rmdescs = relationship("RMDESC", back_populates="node") + sut = relationship("SUT", back_populates="nodes") + + class PARAM(BaseTable, BaseMixin): __tablename__ = "PARAM" @@ -113,10 +158,16 @@ class RESULT(BaseTable, BaseMixin): 
     swprofile_id = sa.Column(
         sa.Integer, sa.ForeignKey("SWPROFILE.id"), nullable=False
     )
+    rolemap_id = sa.Column(
+        sa.Integer, sa.ForeignKey("ROLEMAP.id"), nullable=False
+    )
+    role_id = sa.Column(sa.Integer, sa.ForeignKey("ROLE.id"), nullable=False)
     session_uuid = sa.Column(sa.Uuid, nullable=False)
     value = sa.Column(sa.Double, nullable=False)

     resultclass = relationship("RESULTCLASS", back_populates="results")
+    role = relationship("ROLE", back_populates="results")
+    rolemap = relationship("ROLEMAP", back_populates="results")
     swprofile = relationship("SWPROFILE", back_populates="results")
     sut = relationship("SUT", back_populates="results")
@@ -140,22 +191,54 @@ class RESULTCLASS(BaseTable, BaseMixin):

     results = relationship("RESULT", back_populates="resultclass")


+class RMDESC(BaseTable, BaseMixin):
+    __tablename__ = "RMDESC"
+
+    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
+    rolemap_id = sa.Column(
+        sa.Integer, sa.ForeignKey("ROLEMAP.id"), nullable=False
+    )
+    role_id = sa.Column(sa.Integer, sa.ForeignKey("ROLE.id"), nullable=False)
+    node_id = sa.Column(sa.Integer, sa.ForeignKey("NODE.id"), nullable=False)
+
+    node = relationship("NODE", back_populates="rmdescs")
+    role = relationship("ROLE", back_populates="rmdescs")
+    rolemap = relationship("ROLEMAP", back_populates="rmdescs")
+
+
+class ROLE(BaseTable, BaseMixin):
+    __tablename__ = "ROLE"
+
+    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
+    name = sa.Column(sa.String(SZ_NAME), nullable=False)
+    benchmark_id = sa.Column(
+        sa.Integer, sa.ForeignKey("BENCHMARK.id"), nullable=False
+    )
+
+    benchmark = relationship("BENCHMARK", back_populates="roles")
+    errors = relationship("ERROR", back_populates="role")
+    results = relationship("RESULT", back_populates="role")
+    rmdescs = relationship("RMDESC", back_populates="role")
+
+
+class ROLEMAP(BaseTable, BaseMixin):
+    __tablename__ = "ROLEMAP"
+
+    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
+    rmdescs_hash = sa.Column(sa.String(SZ_HASH), nullable=False)
+
+    errors = relationship("ERROR", back_populates="rolemap")
+    results = relationship("RESULT", back_populates="rolemap")
+    rmdescs = relationship("RMDESC", back_populates="rolemap")
+
+
 class SUT(BaseTable, BaseMixin):
     __tablename__ = "SUT"

     id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
     name = sa.Column(sa.String(SZ_NAME), nullable=False)
-    host_name = sa.Column(sa.String(SZ_NAME), nullable=False)
-    architecture = sa.Column(sa.String(SZ_NAME), nullable=False)
-    cpu_count = sa.Column(sa.SmallInteger, nullable=False)
-    cpu_info_hash = sa.Column(sa.String(SZ_HASH), nullable=False)
-    numa_count = sa.Column(sa.SmallInteger, nullable=False)
-    ram_sz = sa.Column(sa.BigInteger, nullable=False)
-    hypervisor = sa.Column(sa.String(SZ_NAME), nullable=False)
-    product_name = sa.Column(sa.String(SZ_DESC), nullable=False)
-    product_serial = sa.Column(sa.String(SZ_DESC), nullable=False)
-    mac_addrs_hash = sa.Column(sa.String(SZ_HASH), nullable=False)
+    nodes_hash = sa.Column(sa.String(SZ_HASH), nullable=False)

-    cpus = relationship("CPU", back_populates="sut")
     errors = relationship("ERROR", back_populates="sut")
+    nodes = relationship("NODE", back_populates="sut")
     results = relationship("RESULT", back_populates="sut")
diff --git a/fastpath/utils/table.py b/fastpath/utils/table.py
index 42681ed28af71f5c768eddb0be44d4446359c591..1adee3d52898960f9ae590dfff1d69e34ee78aaf 100644
--- a/fastpath/utils/table.py
+++ b/fastpath/utils/table.py
@@ -46,21 +46,17 @@ def load_tables(resultstore, merge_similar=True):
     Convenience function to convert the contents of a resultstore to a set of
     pandas dataframes, returned as a dictionary containing each dataframe,
     indexed by the Table enum. Additionally a "unique" column is added for SUT,
-    SWPROFILE and BENCHMARK tables, which remains as friendly as possible while
-    being guarranteed unique. When merge_similar=True, objects are merged if
-    they have only trival differences.
+    NODE, SWPROFILE, BENCHMARK and ROLEMAP tables, which remains as friendly as
+    possible while being guaranteed unique. When merge_similar=True, objects
+    are merged if they have only trivial differences.
     """
-    if rs.is_csv(resultstore):
-        store = rs.ResultSet.from_csv(resultstore)
-    else:
-        store = rs.ResultSet.open(resultstore)
-    dfs = store.to_dfs()
+    dfs = rs.open_or_import(resultstore).to_dfs()

     if merge_similar:
         # Merge swprofiles when all fields match except kernel_cmdline_full_hash
-        # and userspace_name. Often different SUTs have a slightly different
+        # and userspace_name. Often different NODEs have a slightly different
         # command line (e.g. rootfs) or have a slightly different userspace, so
-        # let's ignore those differences to allow comparing SUTs.
+        # let's ignore those differences to allow comparing NODEs.
         dfs = _dedup_and_fix_refs(
             dfs,
             Table.SWPROFILE,
@@ -75,14 +71,17 @@ def load_tables(resultstore, merge_similar=True):
             dfs, Table.BENCHMARK, ["image"], "benchmark_id"
         )

-    def add_unique_col(df, namecol):
+    def add_unique_col(df, namecol=None, prefix=None):
         def make_unique(row):
             name = row[namecol]
             id = row.name
             return name if counts[name] == 1 else f"{name}:{id}"

-        counts = df[namecol].value_counts()
-        df["unique"] = df.apply(make_unique, axis=1)
+        if namecol:
+            counts = df[namecol].value_counts()
+            df["unique"] = df.apply(make_unique, axis=1)
+        else:
+            df["unique"] = df.apply(lambda row: f"{prefix}{row.name}", axis=1)

     dfs[Table.BENCHMARK]["suite/name"] = dfs[Table.BENCHMARK].apply(
         lambda row: f"{row['suite']}/{row['name']}",
@@ -92,6 +91,8 @@ def load_tables(resultstore, merge_similar=True):
     dfs[Table.BENCHMARK].drop(["suite/name"], axis=1, inplace=True)
     add_unique_col(dfs[Table.SWPROFILE], "name")
     add_unique_col(dfs[Table.SUT], "name")
+    add_unique_col(dfs[Table.NODE], "name")
+    add_unique_col(dfs[Table.ROLEMAP], prefix="rm")

     return dfs
@@ -117,6 +118,9 @@ def join_results(tables):
     results = results.join(
         tables[Table.BENCHMARK]["unique"].rename("benchmark"), on="benchmark_id"
     )
+    results = results.join(
+        tables[Table.ROLEMAP]["unique"].rename("rolemap"), on="rolemap_id"
+    )

     return results
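The add_unique_col() scheme above in one example: names are kept friendly when unique, suffixed with the row id when not, and nameless rolemaps get a plain "rm" prefix:

    import pandas as pd

    df = pd.DataFrame({"name": ["tx2", "tx2", "altra"]})
    counts = df["name"].value_counts()
    df["unique"] = df.apply(
        lambda row: row["name"]
        if counts[row["name"]] == 1
        else f"{row['name']}:{row.name}",
        axis=1,
    )
    print(list(df["unique"]))  # ['tx2:0', 'tx2:1', 'altra']
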
diff --git a/scripts/convert_rs_schema.py b/scripts/convert_rs_schema.py
new file mode 100755
index 0000000000000000000000000000000000000000..d8513ea2940944826fcebd8e7af5e6b042d5a7c2
--- /dev/null
+++ b/scripts/convert_rs_schema.py
@@ -0,0 +1,197 @@
+#!/usr/bin/env python3
+# Copyright (c) 2025, Arm Limited.
+# SPDX-License-Identifier: MIT
+
+
+# Script to convert resultstore data from schema v1 to v2. Expects input
+# resultstore in CSV format, and outputs to new directory in CSV format.
+# $ convert_rs_schema.py <old_path> <new_path>
+
+
+import enum
+import hashlib
+import json
+import os
+import pandas as pd
+import sys
+
+
+class Table(enum.Enum):
+    BENCHMARK = (1,)
+    SWPROFILE = (2,)
+    CPU = (3,)
+    ERROR = (4,)
+    NODE = (5,)
+    PARAM = (6,)
+    RESULT = (7,)
+    RMDESC = (8,)
+    ROLE = (9,)
+    ROLEMAP = (10,)
+    RESULTCLASS = (11,)
+    SUT = (12,)
+
+
+old_tables = [
+    Table.BENCHMARK,
+    Table.SWPROFILE,
+    Table.CPU,
+    Table.ERROR,
+    Table.PARAM,
+    Table.RESULT,
+    Table.RESULTCLASS,
+    Table.SUT
+]
+
+
+def hash(obj):
+    # Ensure a total ordering for all (nested) keys.
+    obj = json.dumps(obj, sort_keys=True)
+    m = hashlib.sha256()
+    m.update(obj.encode())
+    return m.hexdigest()
+
+
+def row_by_index(df, index):
+    return df.loc[index].map(lambda x: x.item() if hasattr(x, "item") else x)
+
+
+def _hash(row, notcols):
+    cols = set(row.keys()) - set(notcols)
+    d = {col: "" if pd.isna(row[col]) else row[col] for col in cols}
+    return hash(d)
+
+
+def hash_node(row):
+    return _hash(row, ["id", "sut_id"])
+
+
+def hash_role(row):
+    return _hash(row, ["id", "benchmark_id"])
+
+
+def hash_benchmark(row):
+    return _hash(row, ["id"])
+
+
+# Get arguments.
+if len(sys.argv) != 3:
+    print(f"Usage: convert_rs_schema.py <old_path> <new_path>")
+    exit(1)
+old_path = sys.argv[1]
+new_path = sys.argv[2]
+
+# Read in resultstore in old format.
+dfs = {t: pd.read_csv(os.path.join(old_path, f"{t.name}.csv")) for t in old_tables}
+
+# Create NODE table based on SUT table. Since the old schema only supports a
+# single node per sut, there is a 1:1 mapping between node and sut and
+# SUT.id == NODE.id.
+node = dfs[Table.SUT].copy()
+cols = node.columns.insert(1, "sut_id")
+node["sut_id"] = node["id"]
+node = node[cols]
+dfs[Table.NODE] = node
+
+# Convert SUT table; remove most columns and add nodes_hash.
+sut = dfs[Table.SUT]
+sut["nodes_hash"] = sut.apply(lambda r: hash([hash_node(r)]), axis=1)
+sut = sut[["id", "name", "nodes_hash"]]
+dfs[Table.SUT] = sut
+
+# Convert CPU table; sut_id becomes node_id.
+cpu = dfs[Table.CPU]
+cpu = cpu.rename({"sut_id": "node_id"}, axis=1)
+dfs[Table.CPU] = cpu
+
+# Add roles_hash to BENCHMARK table. Since existing benchmarks don't explicitly
+# define any roles, they are assumed to have a single "executer" role.
+benchmark = dfs[Table.BENCHMARK]
+benchmark["roles_hash"] = hash(["executer"])
+dfs[Table.BENCHMARK] = benchmark
+
+# Create a ROLE table. Every benchmark has a single role called "executer" so
+# set the role ids to match the benchmark ids for ease.
+role = dfs[Table.BENCHMARK][["id"]].copy()
+role["name"] = "executer"
+role["benchmark_id"] = role["id"]
+dfs[Table.ROLE] = role
+
+# There is a rolemap with a single rmdesc for every (sut_id, benchmark_id) tuple
+# that appears in a result or error. So let's calculate a rolemap_lut, which
+# maps from (sut_id, benchmark_id) to rolemap_id (which is also the same id used
+# for the single rmdesc). We also need a lut to map from resultclass_id to
+# benchmark_id for the RESULT table.
+result = dfs[Table.RESULT].set_index("id")
+resultclass = dfs[Table.RESULTCLASS].set_index("id")
+result = result.join(resultclass[["benchmark_id"]], on="resultclass_id")
+result = result.reset_index()
+
+records = result[["resultclass_id", "benchmark_id"]].drop_duplicates()
+records = records.to_dict("records")
+benchmark_lut = {}
+for r in records:
+    benchmark_lut[r["resultclass_id"]] = r["benchmark_id"]
+
+rrecs = result[["sut_id", "benchmark_id"]]
+erecs = dfs[Table.ERROR][["sut_id", "benchmark_id"]]
+records = pd.concat([rrecs, erecs]).drop_duplicates().to_dict("records")
+
+rolemap_lut = {}
+for i, r in enumerate(records, 1):
+    rolemap_lut[(r["sut_id"], r["benchmark_id"])] = i
+
+# Generate the ROLEMAP and RMDESC tables.
+rolemaps = []
+rmdescs = []
+node = node.set_index("id")
+role = role.set_index("id")
+benchmark = benchmark.set_index("id")
+for (sut_id, benchmark_id), rolemap_id in rolemap_lut.items():
+    # rolemap_id and rmdesc_id are the same in this case.
+    # node_id and sut_id are the same in this case.
+    # role_id and benchmark_id are the same in this case.
+    rmdesc_id = rolemap_id
+    node_id = sut_id
+    role_id = benchmark_id
+
+    node_hash = hash_node(row_by_index(node, node_id))
+    role_hash = hash_role(row_by_index(role, role_id))
+    benchmark_hash = hash_benchmark(row_by_index(benchmark, benchmark_id))
+
+    rmdesc_hash = hash([node_hash, role_hash, benchmark_hash])
+    rolemaps.append({
+        "id": rolemap_id,
+        "rmdescs_hash": hash([rmdesc_hash]),
+    })
+    rmdescs.append({
+        "id": rmdesc_id,
+        "rolemap_id": rolemap_id,
+        "role_id": role_id,
+        "node_id": node_id,
+    })
+rolemap = pd.DataFrame(rolemaps).sort_values(by="id").reset_index(drop=True)
+rmdesc = pd.DataFrame(rmdescs).sort_values(by="id").reset_index(drop=True)
+dfs[Table.ROLEMAP] = rolemap
+dfs[Table.RMDESC] = rmdesc
+
+# Now add the rolemap_id and role_id columns to the RESULT table.
+result = dfs[Table.RESULT].copy()
+cols = result.columns.insert(5, "rolemap_id")
+cols = cols.insert(6, "role_id")
+result["rolemap_id"] = result.apply(lambda r: rolemap_lut[(r["sut_id"], benchmark_lut[r["resultclass_id"]])], axis=1)
+result["role_id"] = result.apply(lambda r: benchmark_lut[r["resultclass_id"]], axis=1)
+result = result[cols]
+dfs[Table.RESULT] = result
+
+# Now add the rolemap_id and role_id columns to the ERROR table.
+error = dfs[Table.ERROR].copy()
+cols = error.columns.insert(5, "rolemap_id")
+cols = cols.insert(6, "role_id")
+error["rolemap_id"] = error.apply(lambda r: rolemap_lut[(r["sut_id"], r["benchmark_id"])], axis=1)
+error["role_id"] = error["benchmark_id"]
+error = error[cols]
+dfs[Table.ERROR] = error
+
+# Finally save out the tables in the new schema.
+os.makedirs(new_path, exist_ok=True)
+for t in Table:
+    dfs[t].to_csv(os.path.join(new_path, f"{t.name}.csv"), index=False)
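Expected invocation of the converter, assuming a v1 CSV resultstore in ./rs-v1 (paths hypothetical):

    $ ./scripts/convert_rs_schema.py rs-v1 rs-v2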