diff --git a/coverage-tool/coverage-reporting/intermediate_layer.py b/coverage-tool/coverage-reporting/intermediate_layer.py index 55474b91eaa9aa10809ee93f8c87c91be3408e5d..d66b6fb4fef91cd6bc711c133efc1afeab0944a7 100644 --- a/coverage-tool/coverage-reporting/intermediate_layer.py +++ b/coverage-tool/coverage-reporting/intermediate_layer.py @@ -29,6 +29,9 @@ from typing import Generator from typing import Union from typing import Tuple import logging +import sys +from collections import defaultdict +from src_lines_finder import get_src_lines_finder __version__ = "7.0" @@ -300,47 +303,6 @@ class BinaryParser(object): end_hex_address, 16)} return _functions - class SourceCodeBlock(object): - """Class used to represent a source code block of information within - a function block in a binary dump file. - The source code block contains the following components: - - Source code file that contains the source code corresponding - to the assembly code. - - Line number within the source code file corresponding to the source - code. - - Assembly code block. - """ - - def __init__(self, source_code_block_dump): - """ - Create an instance of a source code block within a function block. - - :param source_code_block: Tuple of 3 elements that contains the - components of a source code block. - """ - self.source_file, self.line_number, self.asm_code \ - = source_code_block_dump - - @staticmethod - def get(dwarf_data: str) -> Generator['BinaryParser.SourceCodeBlock', - None, None]: - source_block_groups = re.findall(r"(?s)(/[a-zA-Z_0-9][^\n]+?):" - r"([0-9]+)(?: [^\n]+)?\n(.+?)" - r"\n(?=/[a-zA-Z_0-9][^\n]+?" - r":[0-9]+[^\n]+?\n|\n$)", - dwarf_data) - for source_block_group in source_block_groups: - if len(source_block_group) != 3: - logger.warning(f"Source code incomplete:" - f"{source_block_group}") - continue - source_block_dump = list(source_block_group) - source_block_dump[-1] += "\n\n" # For parsing assembly lines - yield BinaryParser.SourceCodeBlock(source_block_dump) - - def __str__(self): - return f"'{self.source_file}:{self.line_number}'" - class FunctionBlock(object): """Class used to parse and obtain a function block from the binary dump file that corresponds to a function declaration in the @@ -445,34 +407,21 @@ class BinaryParser(object): yield BinaryParser.AssemblyLine(line) @staticmethod - def get_asm_line(source_code_block: 'BinaryParser.SourceCodeBlock', + def get_asm_line(dwarf: str, traces_stats) -> \ Generator['BinaryParser.AssemblyLine', None, None]: - """Generator method to obtain all assembly line codes within a source - code line """ + """Generator method to obtain all assembly line codes within a function + block """ traces_stats = traces_stats - for asm_line in BinaryParser.AssemblyLine.get( - source_code_block.asm_code): + for asm_line in BinaryParser.AssemblyLine.get(dwarf): asm_line.times_executed = traces_stats.get(asm_line.dec_address, [0])[0] yield asm_line - def get_source_code_block(self, function_block: FunctionBlock) -> \ - Generator['BinaryParser.SourceCodeBlock', None, None]: - """ - Generator method to obtain all the source code blocks within a - function block. - - :param function_block: FunctionBlock object that contains the code - the source code blocks. - :return: A SourceCodeBlock object. - """ - for source_code_block in BinaryParser.SourceCodeBlock.get( - function_block.dwarf): - if self.remove_workspace: - source_code_block.source_file = remove_workspace( - source_code_block.source_file, self.workspace) - yield source_code_block + def _relativized_path(self, filename): + if self.remove_workspace: + filename = remove_workspace(filename, self.workspace) + return os.path.normpath(filename) def get_function_block(self) -> Generator['BinaryParser.FunctionBlock', None, None]: @@ -521,35 +470,33 @@ class CoverageHandler(object): self._source_files[source_file]["functions"].setdefault( name, {"covered": False, "line_number": function_line_number}) - def add_line_coverage(self, source_code_block: - BinaryParser.SourceCodeBlock): + def add_line_coverage(self, source_file, line_number): """ Add a line coverage block and a source file coverage block, if not already created and link them""" # Add source file coverage block it if not already there - self._source_files.setdefault(source_code_block.source_file, + self._source_files.setdefault(source_file, {"functions": {}, "lines": {}}) # Add a line coverage block (if not existent) from a source block # using the source code line number as a key and link it to the source # file coverage block - self._source_files[source_code_block.source_file]["lines"].setdefault( - source_code_block.line_number, {"covered": False, "elf_index": {}}) + self._source_files[source_file]["lines"].setdefault( + line_number, {"covered": False, "elf_index": {}}) - def add_asm_line(self, source_code_block: BinaryParser.SourceCodeBlock, + def add_asm_line(self, source_file, line_number, asm_line: BinaryParser.AssemblyLine, elf_index: int): """Add an assembly line from the DWARF data linked to a source code line""" - self._source_files[source_code_block.source_file]["lines"][ - source_code_block.line_number]["elf_index"].setdefault( + self._source_files[source_file]["lines"][ + line_number]["elf_index"].setdefault( elf_index, {}) - self._source_files[source_code_block.source_file]["lines"][ - source_code_block.line_number]["elf_index"][ + self._source_files[source_file]["lines"][ + line_number]["elf_index"][ elf_index].setdefault(asm_line.dec_address, (asm_line.opcode, asm_line.times_executed)) - def set_line_coverage(self, source_code_block: - BinaryParser.SourceCodeBlock, value: bool): - self._source_files[source_code_block.source_file]["lines"][ - source_code_block.line_number]["covered"] = value + def set_line_coverage(self, source_file, line_number, value): + self._source_files[source_file]["lines"][ + line_number]["covered"] = value def set_function_coverage(self, function_block: Union[BinaryParser.FunctionBlock, @@ -655,6 +602,7 @@ class IntermediateCodeCoverage(object): elf_index = self.get_elf_index(elf_name) parser = BinaryParser(dump, self.workspace, self.remove_workspace, self.local_workspace) + src_lines_finder = get_src_lines_finder(elf_filename) total_number_functions = 0 functions_covered = 0 for function_block in parser.get_function_block(): @@ -662,22 +610,16 @@ class IntermediateCodeCoverage(object): # Function contains source code self.coverage.add_function_coverage(function_block) is_function_covered = False - for source_code_block in parser.get_source_code_block( - function_block): - self.coverage.add_line_coverage(source_code_block) - is_line_covered = False - for asm_line in parser.get_asm_line(source_code_block, - self.traces_stats): - # Here it is checked the line coverage - is_line_covered = asm_line.times_executed > 0 or \ - is_line_covered - self.coverage.add_asm_line(source_code_block, asm_line, - elf_index) - logger.debug(f"Source file {source_code_block} is " - f"{'' if is_line_covered else 'not '}covered") - if is_line_covered: - self.coverage.set_line_coverage(source_code_block, True) - is_function_covered = True + + for asm_line in parser.get_asm_line(function_block.dwarf, + self.traces_stats): + for src_file, line_no in src_lines_finder(asm_line.dec_address): + local_src_file = parser._relativized_path(src_file) + self.coverage.add_line_coverage(local_src_file, line_no) + self.coverage.add_asm_line(local_src_file, line_no, asm_line, elf_index) + if asm_line.times_executed > 0: + self.coverage.set_line_coverage(local_src_file, line_no, True) + is_function_covered = True logger.debug(f"\tFunction '{function_block.name}' at '" f"{function_block.source_file} is " f"{'' if is_function_covered else 'not '}covered") diff --git a/coverage-tool/coverage-reporting/src_lines_finder.py b/coverage-tool/coverage-reporting/src_lines_finder.py new file mode 100644 index 0000000000000000000000000000000000000000..e5c5d3636216974202e8535c95375ff30b78d4a8 --- /dev/null +++ b/coverage-tool/coverage-reporting/src_lines_finder.py @@ -0,0 +1,264 @@ +import sys +import os.path +from collections import defaultdict +from elftools.elf.elffile import ELFFile +from elftools.common.utils import bytes2str + + +def _store(lookup, file_path, line_no, start, end): + """ + For each even address in [start, end), add the pair + (file_path, line_no) to its set of source lines. + + >>> lookup = defaultdict(set) + >>> _store(lookup, 'foo.c', 1234, 42, 50) + >>> _store(lookup, 'bar.c', 23, 48, 52) + >>> for address, src_lines in lookup.items(): + ... print(address, sorted(src_lines)) + 42 [('foo.c', 1234)] + 44 [('foo.c', 1234)] + 46 [('foo.c', 1234)] + 48 [('bar.c', 23), ('foo.c', 1234)] + 50 [('bar.c', 23)] + + """ + for address in range(start, end, 2): + lookup[address].add((file_path, line_no)) + + +def _coalesce(line_entries): + """ + Generate (src file path, line no, start address, end address) + for each entry. + + >>> list(_coalesce([])) + [] + + The end address of an entry is the addess of the next entry, + provided it's different. + + >>> list(_coalesce([ + ... (1000, ('main.c', 23)), + ... (1004, ('main.c', 42)), + ... (1008, None) + ... ])) + [('main.c', 23, 1000, 1004), ('main.c', 42, 1004, 1008)] + + If there are consecutive entries with the same address, each + of them ends at the next different address. + + >>> for entry in list(_coalesce([ + ... (1000, ('main.c', 23)), + ... (1000, ('foo.h', 69)), + ... (1000, ('bar.h', 123)), + ... (1004, ('main.c', 42)), + ... (1008, None)])): + ... print(entry) + ('main.c', 23, 1000, 1004) + ('foo.h', 69, 1000, 1004) + ('bar.h', 123, 1000, 1004) + ('main.c', 42, 1004, 1008) + + We occasionally see a zero length final entry. Ignore. + + >>> list(_coalesce([ + ... (1000, ('foo', 23)), + ... (1000, None)])) + [] + + A nontrivial sequence must be terminated. + + >>> list(_coalesce([ + ... (1000, ('main.c', 23)), + ... (1004, ('main.c', 42)), + ... ])) + Traceback (most recent call last): + ... + RuntimeError: line program not terminated + + Records for line zero are not reported (compiler was unable + to assign an accurate line number), but don't terminate the + accumulation of entries. + + >>> for entry in list(_coalesce([ + ... (1000, ('main.c', 23)), + ... (1000, ('foo.h', 0)), + ... (1000, ('bar.h', 123)), + ... (1004, ('main.c', 42)), + ... (1008, None)])): + ... print(entry) + ('main.c', 23, 1000, 1004) + ('bar.h', 123, 1000, 1004) + ('main.c', 42, 1004, 1008) + + """ + pending = [] + pending_addr = None + + for addr, data in line_entries: + if addr != pending_addr: + # New address - flush all pending lines + for f, l in pending: + if l: # skip line == zero (unknown line) + yield (f, l, pending_addr, addr) + pending.clear() + + if data: + pending.append(data) + pending_addr = addr + else: + # End marker + pending_addr = None + pending.clear() + + if pending: + raise RuntimeError('line program not terminated') + + +def _full_path(file_index, dir_table, file_table, cu_dir=''): + """ + Find the full path for file_index by looking in the pyelftools + structures for Directory Table and File Name Table. + + >>> from collections import namedtuple + >>> FileEntry = namedtuple('FileEntry', ['dir_index', 'name']) + >>> dir_table = [ + ... b'/src', + ... b'/libbar/../libbar/include', + ... b'/libbaz/./include', + ... b'/libfoo/include', + ... b'./libfoo/include', + ... b'../libbar/include', + ... ] + >>> file_table = [ + ... FileEntry(1, b'main.c'), + ... FileEntry(4, b'foo.h'), + ... FileEntry(2, b'bar.h'), + ... FileEntry(3, b'baz.h'), + ... FileEntry(6, b'qux.h'), + ... FileEntry(5, b'foo.h'), + ... FileEntry(0, b'baz.h'), + ... ] + + Indices in both tables are 1-based. + + >>> _full_path(1, dir_table, file_table) + '/src/main.c' + + >>> _full_path(2, dir_table, file_table) + '/libfoo/include/foo.h' + + >>> _full_path(0, dir_table, file_table) + Traceback (most recent call last): + ... + RuntimeError: file index out of bounds + + >>> _full_path(8, dir_table, file_table) + Traceback (most recent call last): + ... + RuntimeError: file index out of bounds + + Paths with redundant cruft get normalized: + + >>> _full_path(3, dir_table, file_table) + '/libbar/include/bar.h' + + >>> _full_path(4, dir_table, file_table) + '/libbaz/include/baz.h' + + Paths might be relative to the compilation unit's + base directory. + + >>> _full_path(6, dir_table, file_table, '/lib') + '/lib/libfoo/include/foo.h' + + >>> _full_path(5, dir_table, file_table, 'src/lib') + 'src/libbar/include/qux.h' + + Ignore the cu_dir if the directory table gives us + an absolute path. + + >>> _full_path(3, dir_table, file_table, '/src/lib') + '/libbar/include/bar.h' + + Tolerate dir index 0, and treat it as CU dir. + >>> _full_path(7, dir_table, file_table, '/src/lib') + '/src/lib/baz.h' + + """ + if not 0 < file_index <= len(file_table): + raise RuntimeError('file index out of bounds') + file_entry = file_table[file_index - 1] # 1-based + filename = bytes2str(file_entry.name) + if file_entry.dir_index == 0: + directory = '' + elif 0 < file_entry.dir_index <= len(dir_table): + directory = bytes2str(dir_table[file_entry.dir_index - 1]) # 1-based + else: + raise RuntimeError('directory index out of bounds') + return os.path.normpath(os.path.join(cu_dir, directory, filename)) + + +def _gen_line_entries(line_program, cu_dir): + """ + Walk the pyelftools structure representing the line program for + one compilation unit. Yield (address, (src file path, line number)) + for normal entries, (address, None) for the end of sequence marker. + """ + for entry in line_program.get_entries(): + state = entry.state + if not state: + continue + if state.end_sequence: + yield (state.address, None) + else: + path = sys.intern(_full_path( + state.file, + line_program['include_directory'], + line_program['file_entry'], + cu_dir)) + yield (state.address, (path, state.line)) + + +def _get_comp_dir(cu): + """ + Get the compilation directory of a compilation unit. + """ + try: + top = cu.get_top_DIE() + return bytes2str(top.attributes['DW_AT_comp_dir'].value) + except: + return '' # safe default + + +def _make_lookup_table(file): + """ + Walk the pyelftools DWARF structures and build a lookup table + { address : set of source lines }. A source line is represented + as (src file path, line number). + """ + lookup = defaultdict(set) + elf = ELFFile(file) + if elf.has_dwarf_info(): + dwarfinfo = elf.get_dwarf_info() + # walk the compilation units + for cu in dwarfinfo.iter_CUs(): + # then the line program for each CU + line_program = dwarfinfo.line_program_for_CU(cu) + if line_program: + for file, line_no, start, end in _coalesce( + _gen_line_entries(line_program, _get_comp_dir(cu))): + _store(lookup, file, line_no, start, end) + return lookup + + +def get_src_lines_finder(filename): + with open(filename, 'rb') as file: + lookup = _make_lookup_table(file) + return lambda address: lookup.get(address, set()) + + +if __name__ == '__main__': + """ 'python3 src_lines_finder.py' to run the tests. """ + import doctest + doctest.testmod()