From 83041b9f3e45c4c0320b85c2c78afe036ef55f15 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Fri, 12 Jul 2024 15:17:13 +0100 Subject: [PATCH 1/2] setup.py: Bump required polars version --- setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 9ae1b5322..6f9ccd0cc 100755 --- a/setup.py +++ b/setup.py @@ -141,9 +141,11 @@ if __name__ == "__main__": # Avoid: # polars 1.7.0, 1.7.1: https://github.com/pola-rs/polars/issues/18719 # Require: + # polars >= 1.2.0 to avoid: https://github.com/pola-rs/polars/issues/17591 # polars >= 1.15.0: https://github.com/pola-rs/polars/issues/19994 # polars >= 1.16.0: https://github.com/pola-rs/polars/issues/20000 - "polars >= 1.16.0, < 2.0.0", + # polars >= 1.16.0: https://github.com/pola-rs/polars/issues/22385 + "polars >= 1.29.0, < 2.0.0", # Pandas >= 1.0.0 has support for new nullable dtypes # Pandas 1.2.0 has broken barplots: # https://github.com/pandas-dev/pandas/issues/38947 -- GitLab From b4202219ac981a40ac886d34dd02c923a6b1b5a5 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Fri, 12 Apr 2024 18:14:20 +0100 Subject: [PATCH 2/2] tools/lisa-trace-server: Add lisa-trace-server --- .dockerignore | 1 + devmode_requirements.txt | 1 + doc/man1/lisa.1 | 7 + install_base.sh | 7 + lisa/datautils.py | 76 ++ lisa/trace.py | 274 +++++- requirements.txt | 1 + setup.py | 3 + shell/lisa_shell | 3 + tools/lisa-trace-server/LICENSE.txt | 202 +++++ tools/lisa-trace-server/README.rst | 7 + tools/lisa-trace-server/docker/Dockerfile | 25 + tools/lisa-trace-server/docker/entrypoint.sh | 21 + tools/lisa-trace-server/pyproject.toml | 67 ++ tools/lisa-trace-server/requirements.txt | 12 + .../src/lisatraceserver/__init__.py | 0 .../src/lisatraceserver/_gunicorn.py | 35 + .../src/lisatraceserver/io.py | 336 +++++++ .../src/lisatraceserver/main.py | 821 ++++++++++++++++++ .../src/lisatraceserver/trace.py | 451 ++++++++++ .../src/lisatraceserver/utils.py | 71 ++ 21 files changed, 2412 insertions(+), 9 deletions(-) create mode 100644 .dockerignore create mode 100644 tools/lisa-trace-server/LICENSE.txt create mode 100644 tools/lisa-trace-server/README.rst create mode 100644 tools/lisa-trace-server/docker/Dockerfile create mode 100755 tools/lisa-trace-server/docker/entrypoint.sh create mode 100644 tools/lisa-trace-server/pyproject.toml create mode 100644 tools/lisa-trace-server/requirements.txt create mode 100644 tools/lisa-trace-server/src/lisatraceserver/__init__.py create mode 100644 tools/lisa-trace-server/src/lisatraceserver/_gunicorn.py create mode 100644 tools/lisa-trace-server/src/lisatraceserver/io.py create mode 100644 tools/lisa-trace-server/src/lisatraceserver/main.py create mode 100644 tools/lisa-trace-server/src/lisatraceserver/trace.py create mode 100644 tools/lisa-trace-server/src/lisatraceserver/utils.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 000000000..f3b64113e --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +**/.git diff --git a/devmode_requirements.txt b/devmode_requirements.txt index c97069d6a..04c8ac9c3 100644 --- a/devmode_requirements.txt +++ b/devmode_requirements.txt @@ -13,3 +13,4 @@ -e ./tools/bisector -e ./tools/lisa-combine-gitlab-mr +-e ./tools/lisa-trace-server diff --git a/doc/man1/lisa.1 b/doc/man1/lisa.1 index 47db233a3..89ec18e79 100644 --- a/doc/man1/lisa.1 +++ b/doc/man1/lisa.1 @@ -244,6 +244,13 @@ results T} _ T{ +LISA_TRACE_SERVER_PROXY +T} T{ +HTTP URL to a trace server proxy +T} T{ +T} +_ +T{ LISA_UPGRADE_VENV T} T{ 1 to make lisa\-install upgrade the 
venv specified in LISA_VENV_PATH. diff --git a/install_base.sh b/install_base.sh index fb1ba0346..87dd8055e 100755 --- a/install_base.sh +++ b/install_base.sh @@ -305,6 +305,13 @@ for arg in "${args[@]}"; do handled=1 ;;& + "--install-trace-server" | "--install-all") + # For now there is nothing to install in particular for the trace + # server, but we still want to install all the default things like + # a full python3 environment. + handled=1 + ;;& + "--help") usage exit 0 diff --git a/lisa/datautils.py b/lisa/datautils.py index 9dd6d163a..aaa0ccb95 100644 --- a/lisa/datautils.py +++ b/lisa/datautils.py @@ -31,6 +31,9 @@ import decimal from numbers import Number import weakref import threading +from tempfile import NamedTemporaryFile +import os +from pathlib import Path import polars as pl import polars.selectors as cs @@ -40,6 +43,8 @@ import pandas.api.extensions import scipy.integrate import scipy.signal import pyarrow +import pyarrow.dataset +import pyarrow.parquet from lisa.utils import TASK_COMM_MAX_LEN, groupby, deprecate, order_as @@ -2747,4 +2752,75 @@ SignalDesc._SIGNALS_MAP = { for event, signal_descs in groupby(_SIGNALS, key=attrgetter('event')) } + +class _SameAsInput: + pass +_SAME_AS_INPUT = _SameAsInput() + + +def _reencode_parquet(in_path, out_path=None, row_group_size=_SAME_AS_INPUT, compression=_SAME_AS_INPUT, compression_level=_SAME_AS_INPUT, statistics=_SAME_AS_INPUT, page_index=_SAME_AS_INPUT): + """ + Re-encode a parquet file, allowing to modify some aspects of the file while + preserving the others. + """ + in_path = Path(in_path).resolve() + out_path = in_path if out_path is None else Path(out_path).resolve() + + @functools.lru_cache + def get_meta(): + return pyarrow.parquet.read_metadata(str(in_path)) + + if row_group_size is _SAME_AS_INPUT: + meta = get_meta() + row_group_size = math.ceil(meta.num_rows / meta.num_row_groups) + + if compression is _SAME_AS_INPUT: + meta = get_meta() + compression = meta.row_group(0).column(0).compression + + if compression_level is _SAME_AS_INPUT: + # TODO: there does not seem to be a way to query that currently + compression_level = None + + if statistics is _SAME_AS_INPUT: + meta = get_meta() + statistics = meta.row_group(0).column(0).is_stats_set + + if page_index is _SAME_AS_INPUT: + meta = get_meta() + # TODO: pyarrow does not currently read page index, so we have no way + # of querying if it has it enabled or not. 
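+        # Default to no page index rather than guessing; callers that need it
+        # in the re-encoded file can pass page_index=True explicitly.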
+ page_index = False + + # pyarrow uses lz4 raw when asked for lz4 + if compression.upper() == 'LZ4_RAW': + compression = 'LZ4' + + + if in_path == out_path: + @contextlib.contextmanager + def out_path_cm(): + with NamedTemporaryFile(dir=out_path.parent, prefix=f'.{out_path.name}.temp.') as f: + yield Path(f.name).resolve() + else: + out_path_cm = lambda: contextlib.nullcontext(out_path) + + reader = pyarrow.parquet.ParquetFile(str(in_path)) + with out_path_cm() as temp_out_path: + writer = pyarrow.parquet.ParquetWriter( + str(temp_out_path), + schema=reader.schema.to_arrow_schema(), + compression=compression, + compression_level=compression_level, + write_statistics=statistics, + write_page_index=page_index, + ) + + with reader as reader, writer as writer: + for batch in reader.iter_batches(batch_size=row_group_size): + writer.write_batch(batch, row_group_size=row_group_size) + + temp_out_path.rename(out_path) + + # vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab diff --git a/lisa/trace.py b/lisa/trace.py index 4ce6211df..f0b0e7f7f 100644 --- a/lisa/trace.py +++ b/lisa/trace.py @@ -53,6 +53,9 @@ import warnings from concurrent.futures import ThreadPoolExecutor from urllib.parse import urlparse import types +import ipaddress +import logging +import textwrap import numpy as np import pandas as pd @@ -61,6 +64,7 @@ import pyarrow.parquet import polars as pl import polars.exceptions import polars.selectors as cs +import requests import devlib @@ -86,6 +90,60 @@ def __getattr__(name): raise AttributeError(f"module {__name__!r} has no attribute {name!r}") +def _resolve_url(url): + scheme = urlparse(url).scheme + if scheme == 'http': + session = requests.Session() + while True: + r = session.head( + url, + # Automatic redirection management drops username/password from + # URL so we do it manually + allow_redirects=False, + ) + r.raise_for_status() + + redirect = r.headers.get('location', url) + if url == redirect: + break + else: + url = redirect + + status = r.status_code + + # If the server provides us a local file path, we use that directly + try: + url = r.headers['x-lisa-local-path'] + except KeyError: + pass + else: + # Check the path in question is readable by us + path = Path(urlparse(url).path) + try: + with path.open('rb'): + pass + except Exception as e: + logging.getLogger('URL').error(f'Could not open path sent by localhost server: {path}: {e}') + url = r.url + + is_temporary = ( + ('temporary' in r.headers.get('x-lisa-location-info', '')) or + # If the final url was obtained after following a temporary + # redirect + any( + _r.status_code == 307 + for _r in r.history + ) + ) + return (url, is_temporary) + else: + # We allow caching of file:// URLs if they were passed as-is, so the + # path is under the control of the user. This is different from a + # server serving us a Temporary Redirection to a file:// URL, that is + # under the control of the server. + return (url, False) + + _DEALLOCATORS = weakref.WeakSet() _DEALLOCATORS_LOCK = threading.RLock() @@ -253,6 +311,7 @@ def _logical_plan_resolve_paths(cache, plan, kind): else: return path else: + path, _ = _resolve_url(path) return path else: raise ValueError(f'Unknown kind {kind}') @@ -700,6 +759,195 @@ class TraceParserBase(abc.ABC, Loggable, PartialInit): return +class ClientTraceParser(TraceParserBase): + """ + Client of a trace server. 
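+
+    Trace metadata are fetched from the server's
+    ``/v1/traces/by/id/{trace_id}/infos`` endpoint, and each event is parsed
+    lazily from a parquet file served at ``/v1/files/by/id/{file_id}/content``.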
+ """ + + # We don't want the infrastructure to attempt to steal our files, fail at + # doing so and collect() the LazyFrame as a result for safety + _STEAL_FILES = False + + @kwargs_forwarded_to(TraceParserBase.__init__) + def __init__(self, api, trace_id, trace_infos, server_is_local=False, **kwargs): + logger = self.logger + + self._api = api + self._trace_id = trace_id + self._trace_infos = trace_infos + self._server_is_local = server_is_local + + try: + tags = trace_infos['tags'] + except KeyError: + logger.debug(f'Trace ID {trace_id} does not have any tags') + else: + _tags = '\n'.join( + textwrap.indent(f'{name}: {value}', ' ' * 4) + for name, value in sorted(tags.items()) + ) + logger.info(f'Trace ID {trace_id} tags:\n{_tags}') + super().__init__(**kwargs) + + @classmethod + def _fetch_infos(self, api, trace_id, metadata): + url = f'{api}/v1/traces/by/id/{trace_id}/infos' + r = requests.get( + url, + params={ + 'metadata': ','.join(sorted( + key + for key in metadata + )) + } + ) + r.raise_for_status() + infos = r.json() + + def sort(obj): + if isinstance(obj, Mapping): + return { + key: sort(value) + for key, value in sorted(obj.items()) + } + elif isinstance(obj, Sequence) and not isinstance(obj, str): + return list(map(sort, obj)) + else: + return obj + + # Sort so that TraceParserBase.get_parser_id() provides a stable result + infos = sort(infos) + + return infos + + @classmethod + def from_trace_id(cls, api, trace_id): + # Some metadata are quite expensive to fetch as they can be very large. + # We avoid fetching those initially, especially if they are rarely + # used. + expensive = { + 'symbols-address' + } + + proxy = os.environ.get('LISA_TRACE_SERVER_PROXY') + if proxy: + cls.get_logger().info(f'Using trace server proxy: {proxy}') + api = proxy + + infos = cls._fetch_infos( + api=api, + trace_id=trace_id, + metadata={ + key + for key in TraceParserBase.METADATA_KEYS + if key not in expensive + } + ) + with requests.head(f'{api}/v1/ping', stream=True) as response: + (server_ip, server_port) = response.raw.connection.sock.getpeername() + server_ip = ipaddress.ip_address(server_ip) + server_is_local = server_ip.is_loopback + + # Since we gathered the infos at this point, any parser made out of + # this PartialInit object will get the infos "for free" so we don't + # have to re-fetch them every time we spin up a parser. 
+ return cls( + api=api, + trace_id=trace_id, + trace_infos=infos, + server_is_local=server_is_local, + ) + + @classmethod + def from_trace_url(cls, url): + try: + url = urlparse(url) + scheme = url.scheme + if scheme == 'lisatrace': + url = url._replace(scheme='http') + path = Path(url.path) + trace_id = path.relative_to(Path('/v1/traces/by/id')) + trace_id, = trace_id.parts + else: + raise ValueError + except ValueError: + raise ValueError(f'Could not extract trace ID from URL: {url}') + else: + return ClientTraceParser.from_trace_id( + api=f'http://{url.netloc}', + trace_id=trace_id, + ) + + def get_metadata(self, key): + default = functools.partial(super().get_metadata, key) + + def from_infos(infos): + return infos['metadata'][key] + + if key in TraceParserBase.METADATA_KEYS: + try: + v = from_infos(self._trace_infos) + except KeyError: + if key in self._requested_metadata: + infos = self._fetch_infos( + api=self._api, + trace_id=self._trace_id, + metadata=[key], + ) + v = from_infos(infos) + else: + return default() + + return _Trace._meta_from_json({key: v})[key] + else: + return default() + + @memoized + def _parquet_location(self, event): + events_infos = self._trace_infos['events'] + try: + infos = events_infos[event] + except KeyError: + raise MissingTraceEventError([event]) + else: + api = self._api + file_id = infos['file_id'] + url = f'{api}/v1/files/by/id/{file_id}/content' + + url = _resolve_url(url) + return url + + def parse_event(self, event): + url, is_temporary_loc = self._parquet_location(event) + # We don't cache the LazyFrame in memory if the location of the data + # itself is temporary. This means the server plans on shortly providing + # a more permanent location, possibly with better performances, but + # that new location was not ready yet at the time we made the request. + # This way, we will issue a new request next time the parser is + # queried, and we will have a chance to use that better location in the + # close future. + mem_cacheable = (not is_temporary_loc) + + # If the LazyFrame was swapped out, it would "hardcode" the file URL + # into that cached file. Instead, if we require the _Trace to query the + # parser every time, it will allow returning a fresh URL with the + # current set of redirections resolved so we don't end up with stale + # URL that does not work anymore. + # However, we swap LazyFrames based on HTTP URLs. If the URL does not + # work anymore, the swap entry will be invalidated upon reload and we + # will invoke the parser again. Otherwise, this reduces the server + # load. + swap_cacheable = (urlparse(url).scheme != 'file') + + df = pl.scan_parquet(url) + df = _ParsedDataFrame.from_df( + df=df, + swap_cacheable=swap_cacheable, + mem_cacheable=mem_cacheable, + ) + return df + + class MockTraceParser(TraceParserBase): """ Mock parser that just returns the dataframes it was given. @@ -4678,6 +4926,10 @@ class _TraceCache(Loggable): plan = io.StringIO(plan) data = _df_json_deserialize(plan) data = _LazyFrameOnDelete.attach_file_cleanup(data, hardlinks) + + # Check the backing data can actually be read, e.g. in case it + # is based on a stale HTTP URL. 
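+            # clear() drops all the rows, so this verifies the source can
+            # still be opened without materializing any data.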
+ data.clear().collect() else: raise ValueError(f'File format not supported "{fmt}" at path: {path}') @@ -4772,8 +5024,6 @@ class _TraceCache(Loggable): except ValueError: pass else: - # TODO: Load the file information from __init__ by discovering the - # swap area's content to avoid doing it each time here if self._swap_size > self.max_swap_size: self._scrub_swap(swap_dir) @@ -5259,13 +5509,19 @@ class _Trace(Loggable, _InternalTraceBase): if not trace_path: raise ValueError('A trace path must be provided') - _, extension = os.path.splitext(trace_path) - if extension == '.html': - parser = SysTraceParser.from_html - elif extension == '.txt': - parser = HRTxtTraceParser.from_txt_file - else: - parser = TraceDumpTraceParser.from_dat + url = urlparse(trace_path) + scheme = url.scheme + if scheme == 'lisatrace': + parser = ClientTraceParser.from_trace_url(trace_path) + elif scheme in ('file', ''): + _, extension = os.path.splitext(url.path) + + if extension == '.html': + parser = SysTraceParser.from_html + elif extension == '.txt': + parser = HRTxtTraceParser.from_txt_file + else: + parser = TraceDumpTraceParser.from_dat self._parser = parser # The platform information used to run the experiments diff --git a/requirements.txt b/requirements.txt index c1361b798..cbb41cf72 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ ./[all] ./tools/bisector ./tools/lisa-combine-gitlab-mr +./tools/lisa-trace-server diff --git a/setup.py b/setup.py index 6f9ccd0cc..ca816cf49 100755 --- a/setup.py +++ b/setup.py @@ -174,6 +174,9 @@ if __name__ == "__main__": "cffi", # unshare syscall "typeguard", + + # For trace server client + "requests", ], extras_require=extras_require, diff --git a/shell/lisa_shell b/shell/lisa_shell index a64824504..42ebaf3e7 100755 --- a/shell/lisa_shell +++ b/shell/lisa_shell @@ -73,6 +73,9 @@ export LISA_RESULT_ROOT=${LISA_RESULT_ROOT:-$LISA_HOME/results} export _DOC_EXEKALL_ARTIFACT_ROOT="Default root for exekall's artifacts" export EXEKALL_ARTIFACT_ROOT=${EXEKALL_ARTIFACT_ROOT:-$LISA_RESULT_ROOT} +export LISA_TRACE_SERVER_PROXY='' +export _DOC_LISA_TRACE_SERVER_PROXY="HTTP URL to a trace server proxy" + # Add our man pages export MANPATH="$MANPATH:$LISA_HOME/doc/" diff --git a/tools/lisa-trace-server/LICENSE.txt b/tools/lisa-trace-server/LICENSE.txt new file mode 100644 index 000000000..d64569567 --- /dev/null +++ b/tools/lisa-trace-server/LICENSE.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. 
+ + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/tools/lisa-trace-server/README.rst b/tools/lisa-trace-server/README.rst new file mode 100644 index 000000000..b3b8252e0 --- /dev/null +++ b/tools/lisa-trace-server/README.rst @@ -0,0 +1,7 @@ +LISA trace server. Check out the project's `GitLab`__ for some +guides to installation and setup. + +``lisa-trace-server`` runs a server able to provide remote access to trace +data. 
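+
+A minimal client-side sketch (the host, port and trace ID below are
+placeholders for a trace previously uploaded to the server)::
+
+    from lisa.trace import Trace
+
+    # The lisatrace:// scheme makes LISA fetch trace data from the server.
+    trace = Trace('lisatrace://myserver:8080/v1/traces/by/id/TRACE_ID')
+    df = trace.df_event('sched_switch')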
+ +__ https://gitlab.arm.com/tooling/lisa diff --git a/tools/lisa-trace-server/docker/Dockerfile b/tools/lisa-trace-server/docker/Dockerfile new file mode 100644 index 000000000..823a5d88c --- /dev/null +++ b/tools/lisa-trace-server/docker/Dockerfile @@ -0,0 +1,25 @@ +# syntax=docker/dockerfile:1.4 + +# To build the image: +# DOCKER_BUILDKIT=1 docker build -t lisa-trace-server --build-context lisa_home=/path/to/git/repo + +FROM ubuntu:24.04 +WORKDIR /workspace/lisa + +# This could be set to an alternative Python command, e.g. installed using the +# deadsnakes Ubuntu PPA +ARG LISA_PYTHON="python3" +ENV SCRATCH_DIR="/workspace/scratch" + +# Get the repository content +COPY --from=lisa_home . . + +# Install the distro packages +RUN ./install_base.sh --install-trace-server +RUN "$LISA_PYTHON" -m venv venv +# Install LISA and the trace server +RUN venv/bin/pip install -r tools/lisa-trace-server/requirements.txt + +RUN mkdir -p "$SCRATCH_DIR" +ENTRYPOINT ["/workspace/lisa/tools/lisa-trace-server/docker/entrypoint.sh"] +CMD ["--help"] diff --git a/tools/lisa-trace-server/docker/entrypoint.sh b/tools/lisa-trace-server/docker/entrypoint.sh new file mode 100755 index 000000000..2261e06fb --- /dev/null +++ b/tools/lisa-trace-server/docker/entrypoint.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# +# SPDX-License-Identifier: Apache-2.0 +# +# Copyright (C) 2025, ARM Limited and contributors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +exec /workspace/lisa/venv/bin/lisa-trace-server run --scratch "$SCRATCH_DIR" "$@" + diff --git a/tools/lisa-trace-server/pyproject.toml b/tools/lisa-trace-server/pyproject.toml new file mode 100644 index 000000000..631a3e421 --- /dev/null +++ b/tools/lisa-trace-server/pyproject.toml @@ -0,0 +1,67 @@ +[project] +name = "lisa-trace-server" # Required +version = "1.0.0" # Required +description = "LISA trace server" +requires-python = ">=3.9" +license = {file = "LICENSE.txt"} +keywords = ["development"] + +readme = "README.rst" +authors = [ + {name = "Douglas RAILLARD"} +] +maintainers = [ + {name = "Arm Ltd." } +] + +# Classifiers help users find your project by categorizing it. +# +# For a list of valid classifiers, see https://pypi.org/classifiers/ +classifiers = [ # Optional + # How mature is this project? 
Common values are + # 3 - Alpha + # 4 - Beta + # 5 - Production/Stable + "Development Status :: 4 - Beta", + + "Programming Language :: Python :: 3 :: Only", + # This is not a standard classifier, as there is nothing defined for + # Apache 2.0 yet: + # https://pypi.org/classifiers/ + # It has not been tested under any other OS + "Operating System :: POSIX :: Linux", + + "Intended Audience :: Developers", +] + +dependencies = [ + "lisa-linux", + "fastapi", + "uvicorn[standard]", + "gunicorn", + "python-multipart", + # For HTTP range requests support + "baize", + "httpx", + "hishel", + "aiofiles", + "aiosqlite", + "sqlalchemy[asyncio]", + "fsspec", +] + +[project.urls] +"Homepage" = "https://gitlab.arm.com/tooling/lisa" +"Source" = "https://gitlab.arm.com/tooling/lisa" +"Documentation" = "https://tooling.sites.arm.com/lisa/" +"Bug Tracker" = "https://gitlab.arm.com/tooling/lisa/-/issues" + +# The following would provide a command line executable called `sample` +# which executes the function `main` from this package when invoked. +[project.scripts] # Optional +lisa-trace-server = "lisatraceserver.main:main" + +[build-system] +requires = ["pdm-backend"] +build-backend = "pdm.backend" + diff --git a/tools/lisa-trace-server/requirements.txt b/tools/lisa-trace-server/requirements.txt new file mode 100644 index 000000000..e99199c6b --- /dev/null +++ b/tools/lisa-trace-server/requirements.txt @@ -0,0 +1,12 @@ +# Must be ordered to satisfy the dependencies without pulling from PyPI. +# Once they are found by pip in editable mode as specified here, they will be +# used and not looked up on PyPI. + +# devlib before WA and LISA +./external/devlib/ + +# Install LISA +./[all] + +# Install the trace server +./tools/lisa-trace-server/ diff --git a/tools/lisa-trace-server/src/lisatraceserver/__init__.py b/tools/lisa-trace-server/src/lisatraceserver/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tools/lisa-trace-server/src/lisatraceserver/_gunicorn.py b/tools/lisa-trace-server/src/lisatraceserver/_gunicorn.py new file mode 100644 index 000000000..22acdb65a --- /dev/null +++ b/tools/lisa-trace-server/src/lisatraceserver/_gunicorn.py @@ -0,0 +1,35 @@ +#! /usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +# +# Copyright (C) 2024, ARM Limited and contributors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Create an app ready to be used for gunicorn ASGI backend. +""" + +import os +import pickle +import base64 + +from lisatraceserver.main import make_app + + +def _make_app(): + args = os.environ['__LISA_TRACE_SERVER_PICKLED_APP_ARGS'] + args = base64.b85decode(args) + args = pickle.loads(args) + return make_app(args) + +app = _make_app() diff --git a/tools/lisa-trace-server/src/lisatraceserver/io.py b/tools/lisa-trace-server/src/lisatraceserver/io.py new file mode 100644 index 000000000..e1aaac733 --- /dev/null +++ b/tools/lisa-trace-server/src/lisatraceserver/io.py @@ -0,0 +1,336 @@ +#! 
/usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +# +# Copyright (C) 2024, ARM Limited and contributors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +from pathlib import Path +import asyncio +import hashlib +import shutil +import os +import urllib.parse +from urllib.parse import urljoin +import tempfile +import functools + +from fastapi import Response, HTTPException +from baize.asgi.responses import FileResponse as BaizeFileResponse +import aiofiles +import aiofiles.os +import httpx + +from devlib.utils.misc import tls_property + +from lisatraceserver.utils import async_lru_cache, api_blob_url, blob_path_compos + +# Use baize FileResponse as it supports range requests that are necessary for +# efficient parquet support +class FastApiBaizeFileResponse(Response): + def __init__(self, **kwargs) -> None: + self._baize_response = BaizeFileResponse(**kwargs) + super().__init__() + + def __call__(self, *args, **kwargs): + return self._baize_response(*args, **kwargs) + + def __getattr__(self, attr): + return getattr(self._baize_response, attr) + + + +async def download_file(http_client, url, path): + path = Path(path) + request = http_client.build_request('GET', url) + while True: + url = request.url + + if url.scheme == 'file': + src_path = Path(url.path) + assert src_path.is_absolute() + try: + path.unlink() + except FileNotFoundError: + pass + path.symlink_to(src_path) + return + else: + response = http_client.stream( + 'GET', + url, + follow_redirects=False, + ) + async with response as response: + request = response.next_request + if request is None: + if response.status_code == 200: + async with aiofiles.open(str(path), 'wb') as f: + async for data in response.aiter_bytes(): + await f.write(data) + return + else: + raise HTTPException( + status_code=response.status, + details='Could not fetch file: {url}' + ) + else: + continue + + +class BlobNotFoundError(Exception): + def __init__(self, blob_id): + self.blob_id = blob_id + + def __str__(self): + return f'Could not find blob: {self.blob_id}' + + +class BlobStorage(abc.ABC): + + # async def upload_blob(self, blob_id, blob_content): + # pass + + @abc.abstractmethod + async def move_file_to_blob(self, path, blob_id): + pass + + @abc.abstractmethod + async def delete_blob(self, blob_id): + pass + + @abc.abstractmethod + async def get_blob_urls(self, blob_id): + pass + + async def move_file(self, path): + path = Path(path) + def compute_id(path): + with open(path, 'rb') as f: + digest = hashlib.file_digest(f, "sha256") + return digest.hexdigest() + blob_id = await asyncio.to_thread(compute_id, path) + await self.move_file_to_blob(path, blob_id=blob_id) + return blob_id + + +class LocalBlobStorage(BlobStorage): + def __init__(self, path): + super().__init__() + self.path = Path(path) + + def _blob_path(self, blob_id): + return Path(self.path, *blob_path_compos(blob_id)) + + async def move_file_to_blob(self, path, blob_id): + path = Path(path) + dst_path = self._blob_path(blob_id) + + try: + # We can just steal the file 
according to the API contract, which is + # much cheaper than a full copy. + with tempfile.NamedTemporaryFile(dir=self.path, delete=False) as f: + temp_dst = Path(f.name) + # Cross a possible filesystem boundary to a private temp location + try: + await asyncio.to_thread( + shutil.move, path, temp_dst + ) + except BaseException: + await aiofiles.os.unlink(temp_dst) + raise + else: + # Make the temp file visible to the rest of the world. + def f(): + dst_path.parent.mkdir(parents=True, exist_ok=True) + temp_dst.rename(dst_path) + + await asyncio.to_thread(f) + finally: + try: + await aiofiles.os.unlink(path) + except FileNotFoundError: + pass + + async def delete_blob(self, blob_id): + dst_path = self._blob_path(blob_id) + try: + await aiofiles.os.unlink(dst_path) + except FileNotFoundError: + pass + + async def get_blob_urls(self, blob_id): + path = self._blob_path(blob_id) + if path.exists(): + path = path.resolve() + url = urllib.parse.urlparse(str(path)) + url = url._replace(scheme='file').geturl() + return [url] + else: + raise BlobNotFoundError(blob_id) + + +class FsspecBlobStorage(BlobStorage): + def __init__(self, make_fs, root, url_expiration=3600 * 24 * 7): + self._make_fs = make_fs + self.root = Path(root) + self._url_expiration = url_expiration + + @tls_property + def _fs(self): + return self._make_fs() + _fs = _fs.basic_property + + def _abspath(self, path): + return Path(self.root, path).resolve() + + async def move_file_to_blob(self, path, blob_id): + dst = self._abspath(blob_id) + src = path.resolve() + await asyncio.to_thread( + lambda: self._fs.upload(str(src), str(dst)) + ) + await aiofiles.os.unlink(src) + + async def delete_blob(self, blob_id): + dst = self._abspath(blob_id) + await asyncio.to_thread( + lambda: self._fs.rm_file(str(dst)) + ) + + async def get_blob_urls(self, blob_id): + path = self._abspath(blob_id) + return await asyncio.to_thread( + lambda: self._fs.sign(path, self._url_expiration) + ) + + +class ProxyBlobStorage(BlobStorage): + def __init__(self, api_base, storage): + self._api_base = api_base + self._storage = storage + + async def _api_blob_url(self, blob_id): + return api_blob_url(self._api_base, blob_id) + + async def move_file_to_blob(self, *args, **kwargs): + return await self._storage.move_file_to_blob(*args, **kwargs) + + async def delete_blob(self, *args, **kwargs): + return await self._storage.delete_blob(*args, **kwargs) + + async def get_blob_urls(self, blob_id): + urls = list(await self._storage.get_blob_urls(blob_id)) + # Append the original HTTP URL of the blob, in case we need a fallback. 
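+        # That URL points at the upstream server's
+        # /v1/files/by/id/{file_id}/content endpoint.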
+ urls.append(await self._api_blob_url(blob_id)) + return urls + + +class ArtifactoryBlobStorage(BlobStorage): + def __init__(self, api, folder, user, token, url_expiration=3600 * 24 *7): + # Ensure it ends with a / + self._api = f'{api.rstrip("/")}/' + self._folder = f'{folder.rstrip("/")}/' + self._token = token + self._http_client = httpx.AsyncClient( + follow_redirects=True, + ) + self._user = user + self._url_expiration = url_expiration + + async def _get_blob_path(self, blob_id): + path = functools.reduce( + urljoin, + ( + f'{urllib.parse.quote(x)}/' + for x in blob_path_compos(blob_id)[:-1] + ) + ) + path = urljoin(path, blob_id) + return urljoin(self._folder, path) + + async def _get_blob_url(self, blob_id): + path = await self._get_blob_path(blob_id) + url = urljoin(self._api, path) + return url + + async def move_file_to_blob(self, path, blob_id): + path = Path(path) + url = await self._get_blob_url(blob_id) + async with aiofiles.open(str(path), 'rb') as f: + + async def upload_bytes(): + while True: + # If we send chunks that are too big, we will timeout so + # keep it reasonable + x = await f.read(1 * 1024 * 1024) + if x: + yield x + else: + return + + response = self._http_client.stream( + 'PUT', + url, + headers={ + 'X-JFrog-Art-Api': self._token, + }, + data=upload_bytes(), + ) + async with response as response: + await response.aread() + response.raise_for_status() + + async def delete_blob(self, blob_id): + url = await self._get_blob_url(blob_id) + response = await self._http_client.delete( + url, + headers={ + 'X-JFrog-Art-Api': self._token, + }, + ) + response.raise_for_status() + + async def get_blob_urls(self, blob_id): + # URL signing requires extra permissions and may not be needed + # depending if the artifactory repo is made readable for all users + + # path = await self._get_blob_path(blob_id) + # response = await self._http_client.post( + # f'{self._api}api/signed/url', + # json={ + # # Despite the name, this is a path to the artifact itself. + # 'repo_path': path, + # # Despite the name, the unit is milliseconds + # 'valid_for_secs': self._url_expiration * 1000, + # }, + # headers={ + # 'X-JFrog-Art-Api': self._token, + # }, + # ) + # response.raise_for_status() + # signed_url = response.json() + # assert isinstance(signed_url, str) + # return [signed_url] + + url = await self._get_blob_url(blob_id) + url = urllib.parse.urlparse(url) + user = urllib.parse.quote(self._user) + token = urllib.parse.quote(self._token) + # FIXME: TODO: this forges a URL with the user setup for the server, we + # may not want the users to use that + url = url._replace( + netloc=f'{user}:{token}@{url.netloc}', + ).geturl() + return [url] diff --git a/tools/lisa-trace-server/src/lisatraceserver/main.py b/tools/lisa-trace-server/src/lisatraceserver/main.py new file mode 100644 index 000000000..b1db12a5e --- /dev/null +++ b/tools/lisa-trace-server/src/lisatraceserver/main.py @@ -0,0 +1,821 @@ +#! /usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +# +# Copyright (C) 2024, ARM Limited and contributors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import functools +from functools import partial +import tempfile +from pathlib import Path +import argparse +import logging +import json +from typing import Annotated +from contextlib import asynccontextmanager, ExitStack, AsyncExitStack +import shutil +from typing import Optional, Dict +from urllib.parse import urlparse +import os +import ipaddress +import sys +import subprocess +import shlex +import pickle +import threading + +from fastapi import FastAPI, Depends, Request, UploadFile, Form, Header, Response, HTTPException, Query, BackgroundTasks +from fastapi.responses import RedirectResponse, StreamingResponse, JSONResponse +import httpx +import hishel +import sqlalchemy.ext.asyncio +import sqlalchemy.orm +import aiofiles + +# nullcontext() as the latter became async only since Python 3.10 so use the +# devlib one +from devlib.utils.misc import nullcontext + +from lisa.trace import Trace +from lisa.utils import setup_logging + +from lisatraceserver.trace import init_db, ingest_trace, list_traces, get_trace, delete_trace, UnknownTraceError +from lisatraceserver.io import FastApiBaizeFileResponse, download_file, BlobNotFoundError, LocalBlobStorage, ProxyBlobStorage, ArtifactoryBlobStorage +from lisatraceserver.utils import async_lru_cache, api_blob_url + + +def with_tls_cm(app, make_app_cm, make_request_cm=None): + make_request_cm = make_request_cm or nullcontext + + stack = AsyncExitStack() + + @app.on_event("startup") + async def startup(): + await stack.__aenter__() + + @app.on_event("shutdown") + async def shutdown(): + await stack.__aexit__(None, None, None) + + tls = threading.local() + + async def get(): + try: + return tls.value + except AttributeError: + x = await stack.enter_async_context(make_app_cm()) + tls.value = x + return x + + async def cm(): + x = await get() + async with make_request_cm(x) as x: + yield x + + return Depends(cm) + + +def parse_tags(tags: Optional[str]) -> Dict[str, str]: + if tags is None: + tags = {} + else: + try: + tags = json.loads(tags) + except ValueError as e: + raise HTTPException( + status_code=422, + detail="Could not load the tags as JSON: {e}", + ) + + if not (isinstance(tags, dict) and all(isinstance(v, str) for v in tags.values())): + raise HTTPException( + status_code=422, + detail="Tags must be a JSON objects with string values", + ) + return tags + + +def is_local_request(request): + client_ip = ipaddress.ip_address( + request.client.host + ) + return client_ip.is_loopback + + +async def serve_from_storage(request, storage, file_id, serve_from_url=None): + + @async_lru_cache(maxsize=None) + async def _get_blob_urls(storage, file_id): + return await storage.get_blob_urls(file_id) + + async def get_blob_url(storage, file_id, schemes): + urls = await _get_blob_urls(storage, file_id) + urls = [ + url + for url in urls + if urlparse(url).scheme in schemes + ] + + try: + url, *_ = urls + except ValueError: + raise BlobNotFoundError(file_id) + else: + return url + + async def serve_file(url, headers=None): + parsed = urlparse(url) + assert parsed.scheme == 'file' + return FastApiBaizeFileResponse( + filepath=str(parsed.path), + headers=headers, + chunk_size=4 * 1024 * 1024, + ) + + async def from_local(): + is_local = is_local_request(request) + # Serve a file directly for local clients, so we can avoid the cost + # of the network stack. 
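+        # The path is advertised through the X-Lisa-Local-Path header, which
+        # the client-side _resolve_url() in lisa/trace.py picks up.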
+ # We can only do that if the client explicitly allows redirections, + # so that we don't accidentally starts redirecting a URL that + # polars is actively using (polars does not support redirections, + # and it would add latency anyway and redirecting to file:// would + # probably not work anyway) + if is_local: + url = await get_blob_url(storage, file_id, schemes=['file']) + path = Path(urlparse(url).path).resolve() + headers = { + 'X-Lisa-Local-Path': f'file://{path}', + # We want the client to know that they can expect that file path to + # stay the same for the forseeable future, it's not just a + # temporary redirection. + 'X-Lisa-Location-Info': 'permanent', + } + return await serve_file(url, headers=headers) + else: + raise BlobNotFoundError(file_id) + + try: + return await from_local() + except BlobNotFoundError: + # First try to simply redirect the user so that it download the large + # content directly from the blob storage. + try: + url = await get_blob_url(storage, file_id, schemes=['http', 'https']) + except BlobNotFoundError: + # If all we have is a local file, we serve it ourselves. + url = await get_blob_url(storage, file_id, schemes=['file']) + return await serve_file(url) + else: + return ( + (await serve_from_url(url)) + if serve_from_url else + RedirectResponse(url) + ) + + +async def finish_or_spawn(aw, timeout): + task = asyncio.create_task(aw) + done, pending = await asyncio.wait( + ( + task, + asyncio.create_task( + asyncio.sleep(timeout), + ) + ), + return_when=asyncio.FIRST_COMPLETED, + ) + return (task in done, task) + + +def header_setter(setter): + def decorator(f): + @functools.wraps(f) + async def wrapper(*args, **kwargs): + response = await f(*args, **kwargs) + assert isinstance(response, Response) + response.headers.update(dict(setter(response))) + return response + return wrapper + return decorator + + +def forbid_cache(): + @header_setter + def setter(response): + return { + 'Cache-Control': 'no-store', + } + return setter + + +def immutable(): + @header_setter + def setter(response): + return { + 'Cache-Control': 'public, immutable, max-age=604800', + } + return setter + + +def get_route(app, *args, **kwargs): + def decorator(f): + f = app.head(*args, **kwargs)(f) + f = app.get(*args, **kwargs)(f) + return f + return decorator + + +def make_run_app(settings): + settings.storage = settings.make_storage() + app = FastAPI(debug=settings.debug) + + @asynccontextmanager + async def get_db(): + db_url = settings.db + + engine = sqlalchemy.ext.asyncio.create_async_engine(db_url) + await init_db(engine) + + SessionLocal = sqlalchemy.orm.sessionmaker( + class_=sqlalchemy.ext.asyncio.AsyncSession, + autocommit=False, + autoflush=False, + bind=engine, + ) + session = None + try: + async with SessionLocal() as session: + yield session + finally: + if session is not None: + await session.close() + + @asynccontextmanager + async def reset_session(session): + try: + yield session + finally: + try: + await session.commit() + finally: + # Reset the session at the end of the request to ensure we don't + # end up e.g. with a locked SQLite database if a DML statement + # failed. + await session.reset() + + WithDBSession = Annotated[ + sqlalchemy.orm.Session, + with_tls_cm(app, get_db, reset_session) + ] + + @app.delete( + '/v1/traces/by/id/{trace_id}', + tags=['traces'], + ) + @forbid_cache() + async def trace_delete( + trace_id: str, + request: Request, + session: WithDBSession, + ): + """ + Remove the trace with the given ID. 
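+
+        The removal itself is delegated to
+        :func:`lisatraceserver.trace.delete_trace`, which is given both the
+        database session and the blob storage.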
+ """ + + storage = settings.storage + await delete_trace( + session=session, + storage=storage, + trace_id=trace_id, + ) + return Response(status_code=200) + + @app.post( + '/v1/traces/upload', + tags=['traces'], + response_description="The trace ID of the uploaded trace", + ) + @forbid_cache() + async def trace_upload( + session: WithDBSession, + background_tasks: BackgroundTasks, + file: UploadFile, + tags: Annotated[ + Optional[str], + Form( + description='Tags as a JSON object with string values', + max_length=1024 * 1024 + ), + ]=None, + ): + """ + Upload a trace with the given tags. + """ + + storage = settings.storage + scratch = settings.scratch + + tags = parse_tags(tags) + + async def ingest(path, temp): + try: + trace_id = await ingest_trace( + session=session, + storage=storage, + temp=temp, + path=path, + tags=tags, + ) + except Exception as e: + raise HTTPException( + status_code=422, + detail=f"Could not upload the trace: {e}", + ) + else: + return { + 'trace_id': trace_id, + } + + with tempfile.TemporaryDirectory(dir=scratch) as temp: + temp = Path(temp) + path = temp / 'trace.dat' + + with open(path, 'wb') as f: + await asyncio.to_thread( + lambda: shutil.copyfileobj(file.file, f) + ) + + kwargs = dict( + path=path, + temp=temp, + ) + + data = await ingest(**kwargs) + return JSONResponse(data) + + @get_route( + app, + '/v1/traces/by/id/{trace_id}/infos', + tags=['traces'], + response_description="The information related to the given trace ID", + ) + @immutable() + async def trace_by_id(trace_id: str, session: WithDBSession, metadata: Optional[str]=None): + """ + Get information pertaining to the given trace ID. + """ + if metadata is not None: + metadata = [ + item.strip() + for item in metadata.split(',') + ] + + data = await get_trace( + session=session, + trace_id=trace_id, + metadata=metadata + ) + return JSONResponse(data) + + + @get_route( + app, + '/v1/traces/by/tags', + tags=['traces'], + response_description="The list of trace IDs of traces with tags matching the provided tags", + ) + @forbid_cache() + async def list_traces_by_tags(request: Request, session: WithDBSession): + """ + Get list of trace IDs matching the tags provided in the GET request + parameters. + """ + tags = request.query_params + data = await list_traces(session, tags) + return JSONResponse(data) + + @get_route( + app, + '/v1/files/by/id/{file_id}/content', + tags=['files'], + response_description="The binary file content associated with the given file ID", + ) + @immutable() + async def read_files( + file_id: str, + request: Request, + ): + """ + Get the file with given ID. + + HTTP Range requests are accepted. 
+ """ + storage = settings.storage + return await serve_from_storage( + request=request, + storage=storage, + file_id=file_id, + ) + + return app + + +def make_proxy_app(settings): + app = FastAPI(debug=settings.debug) + logger = logging.getLogger("proxy") + + api_base = settings.api_base + api_base = api_base._replace( + path=api_base.path.strip('/') + ).geturl() + api_base_netloc = urlparse(api_base).netloc + + storage = ProxyBlobStorage( + api_base=api_base, + storage=settings.make_storage(), + ) + scratch = settings.scratch + + # Ensure everything uses updated settings variables + del settings + + WithCachingHTTPClient = Annotated[ + hishel.AsyncCacheClient, + with_tls_cm( + app, + lambda: hishel.AsyncCacheClient( + follow_redirects=True, + storage=hishel.AsyncInMemoryStorage( + # Store up to that many requests + capacity=256, + ) + ) + ) + ] + WithHTTPClient = Annotated[ + hishel.AsyncCacheClient, + with_tls_cm( + app, + lambda: httpx.AsyncClient( + follow_redirects=True, + ) + ) + ] + + in_flight = set() + + async def fetch_file_to_storage(http_client, storage, url, file_id): + if file_id not in in_flight: + in_flight.add(file_id) + try: + # Download to a temp file name and then atomically move to the real + # name once the file is complete. + with tempfile.NamedTemporaryFile(dir=scratch) as f: + path = Path(f.name).resolve() + await download_file(http_client, url, path) + await storage.move_file_to_blob(path, file_id) + finally: + try: + in_flight.remove(file_id) + except KeyError: + pass + + @get_route( + app, + '/v1/files/by/id/{file_id}/content', + tags=['files'], + response_description="The binary file content associated with the given file ID", + ) + @immutable() + async def read_files( + file_id: str, + background_tasks: BackgroundTasks, + request: Request, + http_client: WithHTTPClient, + ): + """ + Get the file with given ID. + + HTTP Range requests are accepted. + """ + async def serve_from_url(url): + async with AsyncExitStack() as stack: + response = http_client.stream( + request.method.upper(), + url, + headers=request.headers, + cookies=request.cookies, + ) + r = await stack.enter_async_context(response) + + new_stack = stack.pop_all() + async def stream_content(): + async with new_stack: + async for x in r.aiter_raw(): + yield x + + headers = dict(r.headers) + + # We want to let the client know that in the close future, we + # will be able to serve the content from another location, so + # it should not cache the current location (file://) for too + # long. If the client is not local, the location will stay the + # same and we will simply serve the content from a local file, + # but that's an implementation detail. + if is_local_request(request): + headers.update({ + 'X-Lisa-Location-Info': 'temporary', + }) + + return StreamingResponse( + status_code=r.status_code, + headers=headers, + content=stream_content(), + ) + + async def download_and_serve(file_id): + logger.info(f'File {file_id} not available locally, fetching') + url = api_blob_url(api_base, file_id) + fetch = fetch_file_to_storage( + http_client=http_client, + storage=storage, + url=url, + file_id=file_id, + ) + # Give a chance to complete the download before we answer the + # request so we can serve a file:// URL + if is_local_request(request): + finished, _ = await finish_or_spawn( + fetch, + timeout=100e-3, + ) + # Otherwise we will start the download after we are done handling + # this request. 
+ else: + async def task_f(): + return await fetch + background_tasks.add_task(task_f) + finished = False + + if finished: + return await _serve_from_storage(file_id) + else: + return await serve_from_url(url) + + async def _serve_from_storage(file_id): + response = await serve_from_storage( + request=request, + storage=storage, + file_id=file_id, + serve_from_url=serve_from_url + ) + if response.status_code == 404: + raise BlobNotFoundError(file_id) + else: + return response + + try: + return await _serve_from_storage(file_id) + except BlobNotFoundError: + return await download_and_serve(file_id) + + @app.api_route( + '/{path:path}', + methods=['HEAD', 'GET', 'POST', 'PUT', 'DELETE', 'CONNECT', 'OPTIONS', 'TRACE', 'PATCH'] + ) + async def default( + request: Request, + http_client: WithCachingHTTPClient, + ): + url = str(request.url.replace(netloc=api_base_netloc)) + + # We simply make an HTTP request to the main server, and let hishel + # handle the caching for us, based on the Cache-Control headers that + # the main server advertises. This way the proxy will always match what + # the server intends without having to update the code or have matching + # versions. + new_request = http_client.stream( + method=request.method, + url=url, + headers=request.headers, + cookies=request.cookies, + data=request.stream(), + ) + + async with AsyncExitStack() as stack: + response = await stack.enter_async_context(new_request) + + new_stack = stack.pop_all() + async def stream_content(): + async with new_stack: + async for x in response.aiter_raw(): + yield x + + return StreamingResponse( + status_code=response.status_code, + headers=response.headers, + content=stream_content(), + ) + return response + + + return app + + +def update_app(app, settings): + if (latency := settings.simulate_latency): + @app.middleware("http") + async def simulate_network_latency(request: Request, call_next): + await asyncio.sleep(latency) + return await call_next(request) + + @get_route( + app, + '/v1/ping', + tags=['misc'], + response_description="Empty response", + ) + @forbid_cache() + async def ping(): + return Response( + status_code=200, + ) + + if settings.debug: + @app.exception_handler(Exception) + async def handler(request: Request, exc: UnknownTraceError): + import pdb + pdb.post_mortem(traceback=exc.__traceback__) + raise exc + else: + @app.exception_handler(UnknownTraceError) + async def UnknownTraceError_handler(request: Request, exc: UnknownTraceError): + return JSONResponse( + status_code=404, + content={'detail': f'Trace ID {exc.trace_id} not found'}, + ) + + @app.exception_handler(BlobNotFoundError) + async def BlobNotFoundError_handler(request: Request, exc: BlobNotFoundError): + return JSONResponse( + status_code=404, + content={'detail': str(exc)}, + ) + + return app + + +def parse_args(args): + import argparse + setup_logging(level=logging.DEBUG) + + parser = argparse.ArgumentParser(description='LISA trace server') + subparsers = parser.add_subparsers( + help='sub-command help', + dest='command', + required=True, + ) + + def add_common(parser): + parser.add_argument('--backend', default='gunicorn', choices=['gunicorn', 'uvicorn'], help='ASGI server backend to use for the server', type=str) + parser.add_argument('--host', default='0.0.0.0', help='Host address to bind to', type=str) + parser.add_argument('--port', default=80, help='TCP port to bind to', type=int) + + + parser.add_argument('--storage', required=True, help='Storage backend used', choices=['artifactory', 'files']) + 
parser.add_argument('--scratch', help='Path to scratch area', type=Path) + parser.add_argument('--simulate-latency', help='Simulate network latency', type=float) + parser.add_argument('--debug', default=False, help='Enable debugging feature, not for production', action='store_true') + + parser.add_argument('--storage-opt', help='Storage-specific options', action='append', default=[]) + + parser_run = subparsers.add_parser('run', help='Main server mode') + parser_run.add_argument('--db', required=True, help='URL to database') + add_common(parser_run) + + parser_proxy = subparsers.add_parser('run-proxy', help='Proxy server mode') + parser_proxy.add_argument('api_base', help='URL to main server API', type=urlparse) + add_common(parser_proxy) + + args = parser.parse_args(args) + + args.make_storage = make_storage(parser, args) + return args + + +def make_storage(parser, settings): + backend = settings.storage + + def parse_opt(opt): + try: + return opt.split('=', 1) + except ValueError: + parser.error(f'--storage-opts format is =, invalid format: {opt}') + + class OptDict(dict): + def __missing__(self, key): + parser.error(f'Missing --storage-opt "{key}=..." for {backend} storage backend') + + opts = OptDict(map(parse_opt, settings.storage_opt)) + if backend == 'artifactory': + make = partial( + ArtifactoryBlobStorage, + folder=opts['folder'], + api=opts['api'], + token=opts['token'], + user=opts['user'], + ) + elif backend == 'files': + make = partial(LocalBlobStorage, opts['root']) + else: + parser.error(f'Unknown storage backend: {backend}') + + return make + + +def make_app(args): + cmd = args.command + settings = args + + if cmd == 'run': + app = make_run_app(settings) + elif cmd == 'run-proxy': + app = make_proxy_app(settings) + else: + parser.error(f'Unknown command {cmd}') + + app = update_app(app, settings) + return app + + +def run_uvicorn(args): + import uvicorn + + app = make_app(args) + uvicorn.run( + app, + interface="asgi3", + port=args.port, + host=args.host, + ) + + +def run_gunicorn(args): + import base64 + from uvicorn.workers import UvicornWorker + + # Check that gunicorn is installed + import gunicorn + + host = f'{args.host}:{args.port}' + worker_cls = f'{UvicornWorker.__module__}.{UvicornWorker.__qualname__}' + nr_workers = min( + len(os.sched_getaffinity(0)), + 12, + ) + + cli = [ + sys.executable, + '-m', 'gunicorn', + '--bind', host, + '--workers', nr_workers, + '--worker-class', worker_cls, + 'lisatraceserver._gunicorn:app', + ] + + cmd = map(shlex.quote, map(str, cli)) + subprocess.run( + cmd, + env={ + **os.environ, + '__LISA_TRACE_SERVER_PICKLED_APP_ARGS': base64.b85encode( + pickle.dumps(args) + ) + } + ) + + +def main(): + args = parse_args(sys.argv[1:]) + backend = args.backend + + if backend == 'uvicorn': + run_uvicorn(args) + elif backend == 'gunicorn': + run_gunicorn(args) + else: + raise ValueError(f'Unknown backend "{backend}"') + diff --git a/tools/lisa-trace-server/src/lisatraceserver/trace.py b/tools/lisa-trace-server/src/lisatraceserver/trace.py new file mode 100644 index 000000000..3b8f10df1 --- /dev/null +++ b/tools/lisa-trace-server/src/lisatraceserver/trace.py @@ -0,0 +1,451 @@ +#! /usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +# +# Copyright (C) 2024, ARM Limited and contributors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import uuid +from itertools import starmap, count, chain +from functools import partial +import asyncio +import tempfile +from pathlib import Path +import argparse +import sqlite3 +import logging +import json +import shutil +import datetime +from types import SimpleNamespace +from contextlib import AsyncExitStack + +import polars as pl +import sqlalchemy +from sqlalchemy.sql.expression import select, insert, delete, column, intersect, text, update +from sqlalchemy.exc import NoResultFound + +from lisa.trace import Trace, TraceParserBase, _Trace +from lisa.utils import setup_logging, measure_time +from lisa.datautils import _reencode_parquet +import lisa.version + + +def make_db_meta(): + from sqlalchemy.ext.declarative import declarative_base + from sqlalchemy import Column + + Base = declarative_base() + + class Traces(Base): + __tablename__ = 'traces' + trace_id = Column(sqlalchemy.String, primary_key=True) + trace_file_id = Column(sqlalchemy.String) + trace_creation_date = Column(sqlalchemy.Date) + trace_access_date = Column(sqlalchemy.Date) + lisa_version_token = Column(sqlalchemy.String) + trace_parse_duration = Column(sqlalchemy.Float) + + + class TracesTags(Base): + __tablename__ = 'traces_tags' + trace_id = Column(sqlalchemy.String, primary_key=True) + tag_id = Column(sqlalchemy.String, primary_key=True) + tag_value = Column(sqlalchemy.String) + + class TracesMetadata(Base): + __tablename__ = 'traces_metadata' + trace_id = Column(sqlalchemy.String, primary_key=True) + key = Column(sqlalchemy.String, primary_key=True) + value = Column(sqlalchemy.String) + + class TracesEventsParquet(Base): + __tablename__ = 'traces_events_parquet' + trace_id = Column(sqlalchemy.String, primary_key=True) + event = Column(sqlalchemy.String, primary_key=True) + parquet_file_id = Column(sqlalchemy.String) + + ns = SimpleNamespace(**{ + cls.__tablename__: cls + for cls in Base.__subclasses__() + }) + + return (ns, Base.metadata) + +TABLES, DB_META = make_db_meta() + +async def init_db(engine): + async with engine.begin() as session: + await session.run_sync(DB_META.create_all) + + +# If we use lower values, we risk tanking the performances by having I/O done +# in really small chunks +MIN_ROW_GROUP_SIZE = 64 * 1024 + +# Maximum size of the dataframe to collection in memory +MEM_THRESHOLD = 512 * 1024 * 1024 + +async def upload_df(session, storage, temp, df): + with tempfile.NamedTemporaryFile(dir=temp, suffix='.parquet') as f: + path = Path(f.name) + + ldf = df.df + meta = df.meta + + try: + nr_rows = df.meta['nr_rows'] + except KeyError: + row_group_size = None + else: + row_group_size = max( + # Avoid having too many row groups, as the per-group metadata + # will be loaded eagerly by polars. This can become quite + # massive in very large files with very high number of row + # groups. + nr_rows // 100, + MIN_ROW_GROUP_SIZE, + ) + + kwargs = dict( + compression='zstd', + statistics=True, + row_group_size=row_group_size, + ) + + async def sink(ldf): + def write(): + # Override the compression to something fast to reload, as we + # are going to re-encode the file anyway. 
+ _kwargs = { + **kwargs, + 'compression': 'lz4', + 'statistics': False, + } + ldf.sink_parquet(path, **_kwargs) + # Re-encode the parquet file with the correct row_group_size, + # as sink_parquet() currently ignores that parameter: + # https://github.com/pola-rs/polars/issues/13092 + _reencode_parquet( + path, + page_index=False, + **kwargs, + ) + + await asyncio.to_thread(write) + + async def collect_write(ldf): + with pl.StringCache(): + df = await ldf.collect_async() + + def write(): + df.write_parquet( + path, + use_pyarrow=True, + pyarrow_options=dict( + # Currently polars does not use the page index anyway, + # so save some space: + # https://github.com/pola-rs/polars/issues/12752 + write_page_index=False, + ), + **kwargs + ) + return await asyncio.to_thread(write) + + try: + await sink(ldf) + except Exception as e: + logging.error(f'Error while sinking LazyFrame to parquet file: {e}') + await collect_write(ldf) + + return await storage.move_file(path) + + +MAX_STR_LEN = 4096 + +async def ingest_trace(session, storage, temp, path, tags): + with await asyncio.to_thread(Trace, path) as trace: + trace_id = trace.get_metadata('trace-id') + + mandatory_tags = { + 'description', + 'author-username', + 'author-team', + 'creation-date', + } + missing_tags = mandatory_tags - tags.keys() + + if missing_tags: + raise ValueError(f'Some mandatory tags are missing: {", ".join(sorted(missing_tags))}') + else: + tags = { + k: str(v) + for k, v in tags.items() + } + for k, v in tags.items(): + if len(k) >= MAX_STR_LEN or len(v) >= MAX_STR_LEN: + raise ValueError(f'Tag "{k}" name or value is longer than {MAX_STR_LEN}') + + + creation_date = datetime.datetime.fromisoformat(tags['creation-date']) + # Ensure we have a clean ISO 8601 tag value + tags['creation-date'] = creation_date.isoformat() + + async with AsyncExitStack() as stack: + await stack.enter_async_context(session.begin_nested()) + + exists = await session.stream_scalars( + select(TABLES.traces.trace_id).filter_by(trace_id=trace_id) + ) + exists = await exists.one_or_none() is not None + + if not exists: + stack = await _do_ingest_trace( + stack=stack, + session=session, + storage=storage, + temp=temp, + trace=trace, + trace_id=trace_id, + creation_date=creation_date, + ) + + # Re-entering the stack is a no-op, but that ensures we + # __exit__ the stack created by _do_ingest_trace() + async with stack: + await session.execute( + delete(TABLES.traces_tags).where(column('trace_id') == trace_id) + ) + await session.execute( + insert(TABLES.traces_tags).values([ + dict( + trace_id=trace_id, + tag_id=tag_id, + tag_value=tag_value, + ) + for tag_id, tag_value in tags.items() + ]) + ) + + return trace_id + + +async def _do_ingest_trace(stack, session, storage, temp, trace, trace_id, creation_date): + def parse(): + with measure_time() as m: + res = trace._parse_all() + return (res, m.delta) + + # Close the existint transaction while we parse, otherwise we hold the DB + # locked for too long. 
+ await stack.pop_all().aclose() + (meta, df_map), duration = await asyncio.to_thread(parse) + + df_map = dict(zip( + df_map.keys(), + await asyncio.gather( + *map( + lambda df: upload_df( + session=session, + storage=storage, + temp=temp, + df=df + ), + df_map.values(), + ) + ) + )) + + trace_file_id = await storage.move_file( + trace.trace_path, + ) + + async with AsyncExitStack() as stack: + await stack.enter_async_context(session.begin_nested()) + + await session.execute( + insert(TABLES.traces).values( + trace_id=trace_id, + trace_file_id=trace_file_id, + trace_creation_date=creation_date, + lisa_version_token=lisa.version.VERSION_TOKEN, + trace_parse_duration=duration, + ) + ) + + await session.execute( + insert(TABLES.traces_metadata).values([ + dict( + trace_id=trace_id, + key=key, + value=json.dumps(value), + ) + for key, value in _Trace._meta_to_json(meta).items() + ]) + ) + + await session.execute( + insert(TABLES.traces_events_parquet).values([ + dict( + trace_id=trace_id, + event=event, + parquet_file_id=parquet_file_id + ) + for event, parquet_file_id in df_map.items() + ]) + ) + return stack.pop_all() + + +async def delete_trace(session, storage, trace_id): + async with session.begin_nested(): + trace_file_id = ( + await session.stream_scalars( + select(TABLES.traces.trace_file_id).filter_by(trace_id=trace_id) + ) + ) + + try: + trace_file_id = await trace_file_id.one() + except NoResultFound: + raise UnknownTraceError(trace_id) + else: + parquet_file_ids = dict( + await ( + await session.stream( + select( + TABLES.traces_events_parquet.parquet_file_id, + TABLES.traces_events_parquet.event, + ).filter_by(trace_id=trace_id) + ) + ).fetchall() + ) + for _table in (TABLES.traces, TABLES.traces_tags, TABLES.traces_metadata, TABLES.traces_events_parquet): + await session.execute( + delete(_table).where(column('trace_id') == trace_id) + ) + + blob_ids = [ + *parquet_file_ids.keys(), + trace_file_id + ] + + # Delete the files before finishing the DB transaction, so we can + # get their ID again if the server gets interrupted half-way + # through. This would not be ideal as some of the files might have + # been deleted already, but better than leaking storage. 
+ await asyncio.gather(*( + storage.delete_blob(blob_id) + for blob_id in blob_ids + )) + + +class UnknownTraceError(Exception): + def __init__(self, trace_id): + super().__init__(trace_id) + self.trace_id = trace_id + + +async def get_trace(session, trace_id, metadata=None): + trace_file_id = ( + await session.stream_scalars( + select(TABLES.traces.trace_file_id).filter_by(trace_id=trace_id) + ) + ) + try: + trace_file_id = await trace_file_id.one() + except NoResultFound: + raise UnknownTraceError(trace_id) + else: + assert trace_file_id is not None + await session.stream( + update(TABLES.traces) + .where( + TABLES.traces.trace_id == trace_id + ).values( + trace_access_date=datetime.datetime.now(datetime.timezone.utc) + ) + ) + + parquet_file_ids = dict( + await ( + await session.stream( + select( + TABLES.traces_events_parquet.event, + TABLES.traces_events_parquet.parquet_file_id, + ).filter_by(trace_id=trace_id) + ) + ).fetchall() + ) + + keys = TraceParserBase.METADATA_KEYS if metadata is None else metadata + meta = dict( + await ( + await session.stream( + select( + TABLES.traces_metadata.key, + TABLES.traces_metadata.value + ).filter( + TABLES.traces_metadata.trace_id == trace_id, + TABLES.traces_metadata.key.in_(keys), + ) + ) + ).fetchall() + ) + meta = { + k: json.loads(v) + for k, v in meta.items() + } + + tags = dict( + await ( + await session.stream( + select( + TABLES.traces_tags.tag_id, + TABLES.traces_tags.tag_value, + ).filter_by(trace_id=trace_id) + ) + ).fetchall() + ) + + return { + 'events': { + event: { + 'file_id': file_id, + } + for event, file_id in parquet_file_ids.items() + }, + 'file_id': trace_file_id, + 'tags': tags, + 'metadata': meta, + } + + +async def list_traces(session, tags): + if tags: + query = intersect(*( + select(text('trace_id FROM traces_tags')).where( + (column('tag_id') == tag_id) & + (column('tag_value') == tag_value) + ) + for tag_id, tag_value in tags.items() + )) + else: + query = select(TABLES.traces.trace_id) + + return list( + await ( + await session.stream_scalars(query) + ).fetchall() + ) + diff --git a/tools/lisa-trace-server/src/lisatraceserver/utils.py b/tools/lisa-trace-server/src/lisatraceserver/utils.py new file mode 100644 index 000000000..259e0dfc2 --- /dev/null +++ b/tools/lisa-trace-server/src/lisatraceserver/utils.py @@ -0,0 +1,71 @@ +#! /usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +# +# Copyright (C) 2024, ARM Limited and contributors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import functools
+
+
+class _MemoizedFuture:
+    _NOT_SET = object()
+    def __init__(self, future):
+        self.future = future
+        self._return = self._NOT_SET
+        self._excep = self._NOT_SET
+
+    def __await__(self):
+        if self._return is not self._NOT_SET:
+            return self._return
+        elif self._excep is not self._NOT_SET:
+            raise self._excep
+        else:
+            try:
+                x = yield from self.future.__await__()
+            except Exception as e:
+                self._excep = e
+                raise e
+            else:
+                self._return = x
+                return x
+
+
+def async_lru_cache(**kwargs):
+    def decorator(f):
+        @functools.lru_cache(**kwargs)
+        def _wrapper(*_args, **_kwargs):
+            future = f(*_args, **_kwargs)
+            return _MemoizedFuture(future)
+
+        # Ensure we have a layer of coroutine function so that metaprogramming
+        # relying on that does not get confused
+        @functools.wraps(f)
+        async def wrapper(*_args, **_kwargs):
+            return await _wrapper(*_args, **_kwargs)
+
+        return wrapper
+    return decorator
+
+
+def api_blob_url(api_base, blob_id):
+    return f'{api_base}/v1/files/by/id/{blob_id}/content'
+
+def blob_path_compos(blob_id):
+    folder = [
+        x
+        for x, _ in zip(blob_id, range(5))
+    ]
+    return [*folder, 'blobs', blob_id]
+
+
--
GitLab
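As a minimal illustration of the endpoints registered by make_run_app() above, a client could drive the server along these lines. This is a sketch, not part of the patch: the base URL and tag values are placeholders, while the form field names, routes and mandatory tag keys follow trace_upload(), trace_by_id(), list_traces_by_tags() and ingest_trace().

    # Sketch of a client for the trace server API (base URL and values are
    # placeholders).
    import json
    import httpx

    BASE = 'http://localhost:8080'

    tags = {
        # Mandatory tags checked by ingest_trace()
        'description': 'example upload',
        'author-username': 'jdoe',
        'author-team': 'example-team',
        'creation-date': '2024-07-12T15:17:13+00:00',
    }

    # POST /v1/traces/upload takes the trace as a multipart "file" and the
    # tags as a JSON object in the "tags" form field.
    with open('trace.dat', 'rb') as f:
        r = httpx.post(
            f'{BASE}/v1/traces/upload',
            files={'file': ('trace.dat', f)},
            data={'tags': json.dumps(tags)},
            timeout=None,
        )
    r.raise_for_status()
    trace_id = r.json()['trace_id']

    # GET /v1/traces/by/tags filters on tag key/value pairs passed as query
    # parameters and returns the matching trace IDs.
    r = httpx.get(f'{BASE}/v1/traces/by/tags', params={'author-team': 'example-team'})
    assert trace_id in r.json()

    # GET /v1/traces/by/id/{trace_id}/infos returns the trace file ID, the
    # per-event parquet file IDs, the tags and the (optionally filtered)
    # metadata.
    infos = httpx.get(f'{BASE}/v1/traces/by/id/{trace_id}/infos').json()

    # GET /v1/files/by/id/{file_id}/content streams the blob content and
    # accepts HTTP Range requests.
    content = httpx.get(f"{BASE}/v1/files/by/id/{infos['file_id']}/content").content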
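The proxy returned by make_proxy_app() serves immutable blobs from its local storage when it already has them, and otherwise streams them from the upstream server while fetching them into local storage (synchronously for local clients, as a background task otherwise). From a client's point of view that looks roughly like the sketch below; the proxy URL and blob ID are placeholders.

    # Sketch of the proxy behaviour for /v1/files/by/id/{file_id}/content
    # (placeholder URL and ID).
    import httpx

    PROXY = 'http://trace-proxy.example'
    file_id = 'abcdef0123456789'

    # First request: the blob is not in the proxy's storage yet, so the body
    # is streamed from the upstream server while the proxy downloads it.
    # Local clients may see an "X-Lisa-Location-Info: temporary" header,
    # telling them not to cache the current location for too long.
    r1 = httpx.get(f'{PROXY}/v1/files/by/id/{file_id}/content')

    # Later requests for the same immutable blob are served from the proxy's
    # local blob storage.
    r2 = httpx.get(f'{PROXY}/v1/files/by/id/{file_id}/content')

All other paths fall through to the catch-all route, which simply forwards the request to the upstream server and lets hishel cache responses according to the Cache-Control headers the server advertises.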
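run_gunicorn() hands the parsed arguments to the worker processes by pickling them, base85-encoding the result into the __LISA_TRACE_SERVER_PICKLED_APP_ARGS environment variable, and pointing gunicorn at lisatraceserver._gunicorn:app with uvicorn workers. The real _gunicorn.py module is part of this patch but outside the hunks shown here; given that calling convention, it presumably reverses the encoding along these lines (a sketch only):

    # Sketch of a worker-side bootstrap matching run_gunicorn(); the shipped
    # lisatraceserver/_gunicorn.py may differ in detail.
    import base64
    import os
    import pickle

    from lisatraceserver.main import make_app

    _args = pickle.loads(
        base64.b85decode(
            os.environ['__LISA_TRACE_SERVER_PICKLED_APP_ARGS']
        )
    )

    # "app" is the attribute the gunicorn command line points at.
    app = make_app(_args)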
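The row_group_size computed in upload_df() trades per-group metadata overhead against I/O granularity: a large event ends up with roughly 100 row groups, while a small event falls back to the MIN_ROW_GROUP_SIZE floor and fits in a single group. With made-up row counts:

    # Illustration of the sizing rule in upload_df(), using invented counts.
    MIN_ROW_GROUP_SIZE = 64 * 1024

    nr_rows = 50_000_000                                   # hypothetical large event
    row_group_size = max(nr_rows // 100, MIN_ROW_GROUP_SIZE)
    assert row_group_size == 500_000                       # about 100 row groups

    # A small event stays at one group of at most 64 * 1024 rows.
    assert max(1_000 // 100, MIN_ROW_GROUP_SIZE) == MIN_ROW_GROUP_SIZE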
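async_lru_cache() in utils.py memoizes the result of a coroutine function per argument set: the underlying coroutine runs once, and later awaits replay the cached result through _MemoizedFuture instead of re-awaiting the original coroutine (which would raise). A usage sketch, with made-up values:

    # Usage sketch for async_lru_cache(); the decorated function and its
    # arguments are invented for illustration.
    import asyncio

    from lisatraceserver.utils import async_lru_cache

    @async_lru_cache(maxsize=128)
    async def fetch(key):
        print(f'computing {key}')
        await asyncio.sleep(0)
        return key.upper()

    async def main():
        # The body only runs once per distinct argument; the second await is
        # served from the memoized result.
        assert await fetch('a') == 'A'
        assert await fetch('a') == 'A'

    asyncio.run(main())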