diff --git a/.gitignore b/.gitignore
index 247d293fb98383577a452e9474af9e3113a76580..8e702ae621c2b35f69de6e2a1dccfd2dcb5db3f2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,8 @@
 **/*.so
 **/*.o
 .gitreview
+**/*.json
+**/*.log
+**/.netrc
+**/*.pyc
+**/__pycache__
diff --git a/coverage-tool/coverage-reporting/lcov_parser.py b/coverage-tool/coverage-reporting/lcov_parser.py
new file mode 100644
index 0000000000000000000000000000000000000000..8fdec2a93c0dfd9552a4bec7372724b4d919424a
--- /dev/null
+++ b/coverage-tool/coverage-reporting/lcov_parser.py
@@ -0,0 +1,247 @@
+import argparse
+import json
+import os
+import time
+from enum import Enum
+from typing import Any, Dict, List
+
+import requests
+from parsel import Selector
+
+import cc_logger
+
+
+class Metrics(Enum):
+    LINES = 1
+    FUNCTIONS = 2
+    BRANCHES = 3
+    FILES = 4
+
+    @staticmethod
+    def like(s: str):
+        s = s or ''
+        for m in Metrics:
+            if m.name.startswith(s.strip().upper()):
+                return m
+        return None
+
+
+logger = cc_logger.logger
+
+
+def to_(f, s, pos=0, default=None):
+    """
+    Convert a string to another type using the function f
+
+    :param f: Function used to convert the string
+    :param s: String to be converted
+    :param pos: The string is split and this is the position within the
+    resulting list where the string to convert resides
+    :param default: Default value if the conversion cannot be made
+    :return: Converted value, or the default when conversion fails
+    """
+    r = None
+    try:
+        r = f(s.split()[pos])
+    except (ValueError, IndexError):
+        if default is not None:
+            return default
+    return r
+
+
+class ParseCodeCoverageHTMLReport(object):
+    """
+    Class used to scrape information from an LCOV report and write it to a
+    JSON file in a flat structure that can be read and uploaded to a custom DB
+    """
+
+    def __init__(self, args):
+        self.args = args
+        self.ci_url = args.ci_url
+        self.lcov_path = args.lcov_path
+        self.url = f'{self.ci_url}{self.lcov_path}'
+        self.ci_type = args.ci_type
+        self.json_file = args.json_file
+        self.report = None
+
+    def get(self):
+        logger.info(f'Collecting from {self.url}...')
+        self.report = ParseCodeCoverageHTMLReport.process(self.url)
+        if not self.report:
+            return None
+        _metadata = self._metadata()
+        self.report['metadata'].update(_metadata)
+        return self.report
+
+    def save_json(self, report=None):
+        if report is not None:
+            self.report = report
+        if self.report is None:
+            self.report = self.get()
+        if self.report:
+            with open(self.json_file, 'w', encoding='utf-8') as f:
+                json.dump(self.report, f, ensure_ascii=False, indent=4)
+            return True
+        return False
+
+    def _metadata(self):
+        metadata = {'uri': self.ci_url, 'ci_type': self.ci_type}
+        if self.args.metadata:
+            # --metadata is a list of KEY=VALUE strings (nargs='*')
+            metadata.update(dict(kv.split('=', 1)
+                                 for kv in self.args.metadata))
+        return metadata
+
+    FIRST_LEVEL = True
+    LCOV_VERSION = "1.15"
+
+    @staticmethod
+    def process(url, parent=""):
+        """
+        Static method used to extract the summary and table information from
+        the LCOV report deployed at the given url
+
+        :param url: URL where the LCOV report resides
+        :param parent: Parent folder for the LCOV report. Empty if at the
+        first/root level
+        :return: At the root level, a dict with 'metadata' and 'records';
+        at deeper levels, a list of per-directory/per-file records
+        """
+
+        def _metadata() -> Dict[str, Any]:
+            date_time = selector. \
+                xpath("//td[contains(@class, 'headerItem') and text() = "
+                      "'Date:']/following-sibling::td[1 and contains("
+                      "@class, 'headerValue')]/text()").get()
+            lcov_version = selector. \
+                xpath("//td[contains(@class, 'versionInfo')]/a/text()").get()
+            metadata = {'datetime': date_time,
+                        'lcov_version': lcov_version.split()[-1],
+                        'root_url_report': url}
+            return metadata
+
+        def _summary() -> List[Dict[str, Any]]:
+            summary = {"Directory": "", "Parent": parent}
+            result_cols = selector. \
+                xpath('//td[@class="headerCovTableHead"]/text()').getall()
+            for metric in Metrics:
+                metric_sel = selector. \
+                    xpath(f"//td[contains(@class, 'headerItem') "
+                          f"and text() = '{metric.name.title()}:']")
+                if not metric_sel:
+                    continue
+                results = metric_sel.xpath(
+                    "./following-sibling::td[1 and contains"
+                    "(@class, 'headerCovTableEntry')]/text()").getall()
+                for index, result_col in enumerate(result_cols):
+                    summary[f'{metric.name.title()}{result_col}'] = \
+                        to_(float, results[index], default=-1)
+            return [summary]
+
+        def _table() -> List[Dict[str, Any]]:
+            table = []
+            arr = {}
+            headers = selector. \
+                xpath('//td[@class="tableHead"]/text()').getall()
+            sub_headers = [j for i in headers if (j := i.title().strip()) in [
+                'Total', 'Hit']]
+            file_type = headers[0].strip()
+            metric_headers = [metric.name.title() for h in headers
+                              if (metric := Metrics.like(h.split()[0]))]
+            rows = selector.xpath("//td[contains(@class, 'coverFile')]")
+            for row in rows:
+                record = {file_type: row.xpath("./a/text()").get(),
+                          'Parent': parent}
+                percentage = row.xpath(
+                    "./following-sibling::td[1 and "
+                    "contains(@class, 'coverPer')]/text()").getall()
+                hit_total = [v.root.text or '' for v in
+                             row.xpath("./following-sibling::td[1 and "
+                                       "contains(@class, 'coverNum')]")]
+                for index, header in enumerate(metric_headers):
+                    record[f'{header}Coverage'] = to_(float, percentage[index],
+                                                      default=-1)
+                    # lcov 1.14-1.16 render hit/total in a single
+                    # "hit / total" cell; later versions use separate columns
+                    if ParseCodeCoverageHTMLReport.LCOV_VERSION \
+                            in ["1.14", "1.15", "1.16"]:
+                        arr['Hit'], arr['Total'] = (
+                            hit_total[index].split("/"))
+                    else:
+                        arr[sub_headers[2 * index]], arr[
+                            sub_headers[2 * index + 1]], *rest = (
+                            hit_total[2 * index:])
+                    record[f'{header}Hit'] = to_(int, arr['Hit'], default=0)
+                    record[f'{header}Total'] = to_(int, arr['Total'],
+                                                   default=0)
+                table.append(record)
+                if file_type.upper().strip() == "DIRECTORY":
+                    table += ParseCodeCoverageHTMLReport. \
+                        process(f'{os.path.dirname(url)}'
+                                f'/{row.xpath("./a/@href").get()}',
+                                parent=record[file_type])
+            return table
+
+        req = requests.get(url)
+        if req.status_code != 200:
+            logger.warning(f"Url '{url}' returned status code "
+                           f"{req.status_code}, returning without collecting "
+                           f"data...")
+            return []
+        text = req.text
+        selector = Selector(text=text)
+        metadata = None
+        if ParseCodeCoverageHTMLReport.FIRST_LEVEL:
+            ParseCodeCoverageHTMLReport.FIRST_LEVEL = False
+            metadata = _metadata()
+            if 'lcov_version' in metadata:
+                ParseCodeCoverageHTMLReport.LCOV_VERSION = \
+                    metadata['lcov_version']
+        data: List[Dict[str, Any]] = _summary() + _table()
+        if metadata is not None:
+            ParseCodeCoverageHTMLReport.FIRST_LEVEL = True
+            return {'metadata': metadata, 'records': data}
+        else:
+            return data
+
+
+HELP_TEXT = """
+Collects data (metrics and results) from an LCOV report and writes it to a
+JSON file.
+
+The data is collected at two levels:
+- Directory level
+- Filename level
+"""
+
+
+def main():
+    parser = argparse. \
+        ArgumentParser(epilog=HELP_TEXT,
+                       formatter_class=argparse.RawTextHelpFormatter)
+    parser.add_argument('--ci-url', help='CI url path including job',
+                        required=True)
+    parser.add_argument('--lcov-path', help='LCOV report path', required=True)
+    parser.add_argument('--ci-type',
+                        help='CI type, either Jenkins (default) or Gitlab',
+                        default='Jenkins',
+                        choices=["Jenkins", "Gitlab"])
+    parser.add_argument('--json-file',
+                        help='Path and filename of the output JSON file',
+                        default="data.json")
+    parser.add_argument("--metadata",
+                        metavar="KEY=VALUE",
+                        nargs='*',
+                        help="Set a number of key-value pairs as metadata. "
+                             "If a value contains spaces, define it with "
+                             "double quotes: "
+                             'key="value with spaces".')
+    args = parser.parse_args()
+    return ParseCodeCoverageHTMLReport(args).save_json()
+
+
+if __name__ == '__main__':
+    start_time = time.time()
+    main()
+    elapsed_time = time.time() - start_time
+    print(f"Elapsed time: {elapsed_time}s")
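For reference, a minimal sketch of how the new parser might be driven programmatically rather than from the command line. The CI URL, job number, and report path below are hypothetical placeholders; the only assumption is a reachable LCOV HTML report served under the concatenation of --ci-url and --lcov-path.

    from argparse import Namespace

    from lcov_parser import ParseCodeCoverageHTMLReport

    # Hypothetical CI job and report location; adjust to the real deployment.
    args = Namespace(
        ci_url='https://ci.example.com/job/coverage-job/42/',
        lcov_path='artifact/coverage/index.html',
        ci_type='Jenkins',
        json_file='data.json',
        metadata=['BuildId=42'],  # optional KEY=VALUE pairs, or None
    )

    # Scrapes the report (recursing into directory pages) and writes
    # data.json with the shape {'metadata': {...}, 'records': [...]}.
    ParseCodeCoverageHTMLReport(args).save_json()

The equivalent command-line invocation would pass the same values through --ci-url, --lcov-path, --ci-type, --json-file and --metadata BuildId=42.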