diff --git a/tools/report-converter/codechecker_report_converter/analyzers/gcc/sarif/__init__.py b/tools/report-converter/codechecker_report_converter/analyzers/gcc/sarif/__init__.py
new file mode 100644
index 0000000000..4259749345
--- /dev/null
+++ b/tools/report-converter/codechecker_report_converter/analyzers/gcc/sarif/__init__.py
@@ -0,0 +1,7 @@
+# -------------------------------------------------------------------------
+#
+# Part of the CodeChecker project, under the Apache License v2.0 with
+# LLVM Exceptions. See LICENSE for license information.
+# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+#
+# -------------------------------------------------------------------------
diff --git a/tools/report-converter/codechecker_report_converter/analyzers/gcc/sarif/analyzer_result.py b/tools/report-converter/codechecker_report_converter/analyzers/gcc/sarif/analyzer_result.py
new file mode 100644
index 0000000000..0bc07f93ff
--- /dev/null
+++ b/tools/report-converter/codechecker_report_converter/analyzers/gcc/sarif/analyzer_result.py
@@ -0,0 +1,36 @@
+# -------------------------------------------------------------------------
+#
+# Part of the CodeChecker project, under the Apache License v2.0 with
+# LLVM Exceptions. See LICENSE for license information.
+# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+#
+# -------------------------------------------------------------------------
+
+import logging
+from typing import List
+
+from codechecker_report_converter.report import Report
+from codechecker_report_converter.report.parser import sarif
+
+from ..analyzer_result import AnalyzerResultBase
+
+
+LOG = logging.getLogger('report-converter')
+
+
+class AnalyzerResult(AnalyzerResultBase):
+    """ Transform analyzer result of the GCC Static Analyzer. """
+
+    TOOL_NAME = 'gcc'
+    NAME = 'GNU Compiler Collection Static Analyzer'
+    URL = 'https://gcc.gnu.org/wiki/StaticAnalyzer'
+
+    def get_reports(self, result_file_path: str) -> List[Report]:
+        """ Get reports from the given analyzer result file. """
+
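+        # The GCC static analyzer emits SARIF itself; an assumed example
+        # invocation (GCC 13+) that produces such a result file:
+        #   gcc -fanalyzer -fdiagnostics-format=sarif-file main.c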
+        return sarif.Parser().get_reports(result_file_path)
diff --git a/tools/report-converter/codechecker_report_converter/cli.py b/tools/report-converter/codechecker_report_converter/cli.py
index 236685d2fe..55761b360a 100755
--- a/tools/report-converter/codechecker_report_converter/cli.py
+++ b/tools/report-converter/codechecker_report_converter/cli.py
@@ -73,7 +73,7 @@ class RawDescriptionDefaultHelpFormatter(
         analyzer_result = getattr(module, "AnalyzerResult")
         supported_converters[analyzer_result.TOOL_NAME] = analyzer_result
     except ModuleNotFoundError:
-        pass
+        raise


 supported_metadata_keys = ["analyzer_command", "analyzer_version"]
@@ -188,6 +188,9 @@ def __add_arguments_to_parser(parser):
                              "Currently supported output types are: " +
                              ', '.join(sorted(supported_converters)) + ".")

+    extensions_with_dot = \
+        sorted([f".{ext}" for ext in SUPPORTED_ANALYZER_EXTENSIONS])
+
     parser.add_argument('-e', '--export',
                         type=str,
                         dest='export',
@@ -196,8 +199,8 @@
                         default=plist.EXTENSION,
                         help="Specify the export format of the converted "
                              "reports. Currently supported export types "
-                             "are: " + ', '.join(sorted(
-                                 SUPPORTED_ANALYZER_EXTENSIONS)) + ".")
+                             "are: " +
+                             ', '.join(extensions_with_dot) + ".")

     parser.add_argument('--meta',
                         nargs='*',
diff --git a/tools/report-converter/codechecker_report_converter/report/parser/base.py b/tools/report-converter/codechecker_report_converter/report/parser/base.py
index fb1bf3bdbf..7a739066f5 100644
--- a/tools/report-converter/codechecker_report_converter/report/parser/base.py
+++ b/tools/report-converter/codechecker_report_converter/report/parser/base.py
@@ -10,10 +10,12 @@
 """

+import json
 import logging
+import os

 from abc import ABCMeta, abstractmethod
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple

+from codechecker_report_converter import __title__, __version__
 from codechecker_report_converter.report import File, Report
 from codechecker_report_converter.report.checker_labels import CheckerLabels
 from codechecker_report_converter.report.hash import HashType
@@ -44,6 +46,26 @@ def get_severity(self, checker_name: str) -> Optional[str]:
             return self._checker_labels.severity(checker_name)
         return None

+    def get_tool_info(self) -> Tuple[str, str]:
+        """ Get tool info.
+
+        If this was called through CodeChecker, this function will return
+        CodeChecker information, otherwise this tool (report-converter)
+        information.
+        """
+        data_files_dir_path = os.environ.get('CC_DATA_FILES_DIR')
+        if data_files_dir_path:
+            analyzer_version_file_path = os.path.join(
+                data_files_dir_path, 'config', 'analyzer_version.json')
+            if os.path.exists(analyzer_version_file_path):
+                try:
+                    with open(analyzer_version_file_path,
+                              encoding="utf-8") as f:
+                        data = json.load(f)
+                except (OSError, ValueError):
+                    data = {}
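+                # Assumed shape of analyzer_version.json, judging by the
+                # keys read below:
+                #   {"version": {"major": ..., "minor": ..., "revision": ...}}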
""" - tool_name, tool_version = self.__get_tool_info() + tool_name, tool_version = self.get_tool_info() data: Dict[str, Any] = { 'files': [], diff --git a/tools/report-converter/codechecker_report_converter/report/parser/sarif.py b/tools/report-converter/codechecker_report_converter/report/parser/sarif.py new file mode 100644 index 0000000000..8872b07b99 --- /dev/null +++ b/tools/report-converter/codechecker_report_converter/report/parser/sarif.py @@ -0,0 +1,368 @@ +# ------------------------------------------------------------------------- +# +# Part of the CodeChecker project, under the Apache License v2.0 with +# LLVM Exceptions. See LICENSE for license information. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# +# ------------------------------------------------------------------------- + +import json +import logging +import os +import sys + +from sarif import loader + +from typing import Dict, List, Optional + +from urllib.parse import urlparse +from typing import Any, Dict, List, NamedTuple, Optional, Tuple + +from codechecker_report_converter.report import BugPathEvent, \ + BugPathPosition, File, get_or_create_file, Range, Report +from codechecker_report_converter.report.hash import get_report_hash, HashType +from codechecker_report_converter.report.parser.base import AnalyzerInfo, \ + BaseParser + +EXTENSION = 'sarif' + +LOG = logging.getLogger('report-converter') + + +# $3.37 +class ThreadFlowInfo: + def __init__(self): + self.bug_path_events: List[BugPathEvent] = [] + self.bug_path_positions: List[BugPathPosition] = [] + self.notes: List[BugPathEvent] = [] + self.macro_expansions: List[BugPathEvent] = [] + + +class Parser(BaseParser): + def get_reports(self, result_file_path: str) -> List[Report]: + """ Get reports from the given analyzer result file. """ + + reports: List[Report] = [] + + data = loader.load_sarif_file(result_file_path) + + for run in data.runs: + rules = self._get_rules(run.run_data) + # $3.14.14 + if "originalUriBaseIds" in run.run_data: + self.original_uri_base_ids = run.run_data["originalUriBaseIds"] + + for result in run.get_results(): + rule_id = result["ruleId"] + message = self._process_message( + result["message"], rule_id, rules) # §3.11 + + # severity = self.get_severity(rule_id) + + thread_flow_info = self._process_code_flows( + result, rule_id, rules) + for location in result.get("locations", []): + # TODO: We don't really support non-local analyses, so we + # only parse physical locations here. 
+class Parser(BaseParser):
+    def get_reports(self, result_file_path: str) -> List[Report]:
+        """ Get reports from the given analyzer result file. """
+
+        reports: List[Report] = []
+
+        # Default, so URI resolution works for runs without
+        # originalUriBaseIds too.
+        self.original_uri_base_ids: Dict[str, Dict] = {}
+
+        data = loader.load_sarif_file(result_file_path)
+
+        for run in data.runs:
+            rules = self._get_rules(run.run_data)
+            # §3.14.14
+            if "originalUriBaseIds" in run.run_data:
+                self.original_uri_base_ids = run.run_data["originalUriBaseIds"]
+
+            for result in run.get_results():
+                rule_id = result["ruleId"]
+                message = self._process_message(
+                    result["message"], rule_id, rules)  # §3.11
+
+                # severity = self.get_severity(rule_id)
+
+                thread_flow_info = self._process_code_flows(
+                    result, rule_id, rules)
+                for location in result.get("locations", []):
+                    # TODO: We don't really support non-local analyses, so we
+                    # only parse physical locations here.
+                    file, rng = self._process_physical_location(location)
+                    if not (file and rng):
+                        continue
+
+                    bug_path_events = thread_flow_info.bug_path_events or None
+
+                    report = Report(
+                        file, rng.start_line, rng.start_col,
+                        message, rule_id,  # severity,
+                        analyzer_result_file_path=result_file_path,
+                        bug_path_events=bug_path_events,
+                        bug_path_positions=thread_flow_info.bug_path_positions,
+                        notes=thread_flow_info.notes,
+                        macro_expansions=thread_flow_info.macro_expansions)
+
+                    if report.report_hash is None:
+                        report.report_hash = get_report_hash(
+                            report, HashType.PATH_SENSITIVE)
+
+                    reports.append(report)
+
+        return reports
+
+    def _get_rules(self, data: Dict) -> Dict[str, Dict]:
+        """ Collect the rules of the run's tool driver, keyed by rule id. """
+        rules: Dict[str, Dict] = {}
+
+        driver = data["tool"]["driver"]
+        for rule in driver.get("rules", []):
+            rules[rule["id"]] = rule
+
+        return rules
+
+    def _process_code_flows(
+        self,
+        result: Dict,
+        rule_id: str,
+        rules: Dict[str, Dict]
+    ) -> ThreadFlowInfo:
+        """ Collect path information from the result's codeFlows. """
+
+        thread_flow_info = ThreadFlowInfo()
+
+        # TODO: Currently, we only collect bug path events.
+
+        for code_flow in result.get("codeFlows", []):
+            for thread_flow in code_flow.get("threadFlows", []):  # §3.36.3
+                for location_data in thread_flow["locations"]:
+                    # There is a lot of data stored alongside the location
+                    # worth parsing, but we only need the actual location now.
+                    location = location_data["location"]
+
+                    if "message" not in location:
+                        # TODO: this is a bug path position.
+                        continue
+
+                    message = self._process_message(
+                        location["message"], rule_id, rules)
+
+                    file, rng = self._process_physical_location(location)
+                    if not (file and rng):
+                        continue
+
+                    # TODO: use the importance field ('important',
+                    # 'essential' or 'unimportant') to distinguish events
+                    # from notes.
+                    thread_flow_info.bug_path_events.append(BugPathEvent(
+                        message, file, rng.start_line, rng.start_col, rng))
+
+        return thread_flow_info
+
+    def _process_physical_location(
+        self,
+        location: Dict,
+    ) -> Tuple[Optional[File], Optional[Range]]:
+        """ Get the file and region of a location's physicalLocation. """
+        physical_loc = location.get("physicalLocation")
+        if physical_loc:
+            file = self._get_file(physical_loc)
+            rng = self._get_range(physical_loc)
+            return file, rng
+
+        return None, None
+
+    def _get_range(self, physical_loc: Dict) -> Optional[Range]:
+        """ Get range from a physical location. """
+        region = physical_loc.get("region", {})
+        start_line = region.get("startLine")
+        if start_line is None:
+            return None
+
+        start_col = region.get("startColumn", 1)
+        end_line = region.get("endLine", start_line)
+        end_col = region.get("endColumn", start_col)
+
+        return Range(start_line, start_col, end_line, end_col)
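+
+    # Illustrative originalUriBaseIds entry (§3.14.14); a base id may chain
+    # through a further "uriBaseId", e.g.
+    #   {"PROJECT": {"uri": "file:///home/user/project/"},
+    #    "SRC": {"uri": "src/", "uriBaseId": "PROJECT"}}
+    # makes "SRC" resolve to "file:///home/user/project/src/".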
""" + artifact_loc = physical_loc.get("artifactLocation") + if not artifact_loc: + return None + + uri = artifact_loc.get("uri") + + if "uriBaseId" in artifact_loc: + uri = self._resolve_uri_base_id(artifact_loc.get("uriBaseId")) \ + + uri + + uri_parsed = urlparse(uri) + if uri_parsed is None: + LOG.warning(f"Failed to urlparse {uri}!") + return None + + + file_path = os.path.join(uri_parsed.netloc, uri_parsed.path) + + return get_or_create_file(file_path, self._file_cache) + + def _process_message( + self, + msg: Dict, + rule_id: str, + rules: Dict[str, Dict] + ) -> str: + """ Get message string. """ + if "text" in msg: + return msg["text"] + + args = msg.get("arguments", []) + + rule = rules[rule_id] + message_strings = rule.get("messageStrings", {}) + return message_strings[msg["id"]]["text"].format(*args) + + def convert( + self, + reports: List[Report], + analyzer_info: Optional[AnalyzerInfo] = None + ): + """ Converts the given reports to sarif format. """ + tool_name, tool_version = self.get_tool_info() + + rules = {} + results = [] + for report in reports: + if report.checker_name not in rules: + rules[report.checker_name] = { + "id": report.checker_name, + "fullDescription": { + "text": report.message + } + } + + results.append(self._create_result(report)) + + return { + "vesion": "2.1.0", + "$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json", + "runs": [{ + "tool": { + "driver": { + "name": tool_name, + "version": tool_version, + "rules": list(rules.values()) + } + }, + "results": results + }] + } + + def _create_result(self, report: Report) -> Dict: + """ Create result dictionary from the given report. """ + result = { + "ruleId": report.checker_name, + "message": { + "text": report.message + }, + "locations": [{ + "physicalLocation": { + "artifactLocation": { + "uri": f"file://{report.file.original_path}" + }, + "region": { + "startLine": report.line, + "startColumn": report.column + } + } + }] + } + + locations = [] + + if report.bug_path_events: + for event in report.bug_path_events: + locations.append(self._create_location_from_bug_path_event( + event, "important")) + + if report.notes: + for note in report.notes: + locations.append(self._create_location_from_bug_path_event( + note, "essential")) + + if report.macro_expansions: + for macro_expansion in report.macro_expansion: + locations.append(self._create_location_from_bug_path_event( + macro_expansion, "essential")) + + if report.bug_path_positions: + for bug_path_position in report.bug_path_positions: + locations.append(self._create_location(bug_path_position)) + + if locations: + result["codeFlows"] = [{ + "threadFlows": [{"locations": locations}] + }] + + return result + + def _create_location_from_bug_path_event( + self, + event: BugPathEvent, + importance: str + ) -> Dict[str, Any]: + """ Create location from bug path event. """ + location = self._create_location(event, event.line, event.column) + + location["importance"] = importance + location["location"]["message"] = {"text": event.message} + + return location + + def _create_location( + self, + pos: BugPathPosition, + line: Optional[int] = -1, + column: Optional[int] = -1 + ) -> Dict[str, Any]: + """ Create location from bug path position. 
""" + if pos.range: + rng = pos.range + region = { + "startLine": rng.start_line, + "startColumn": rng.start_col, + "endLine": rng.end_line, + "endColumn": rng.end_col, + } + else: + region = { + "startLine": line, + "startColumn": column, + } + + return { + "location": { + "physicalLocation": { + "artifactLocation": { + "uri": f"file://{pos.file.original_path}" + }, + "region": region + } + } + } + + def write(self, data: Any, output_file_path: str): + """ Creates an analyzer output file from the given data. """ + try: + with open(output_file_path, 'w', + encoding="utf-8", errors="ignore") as f: + json.dump(data, f) + except TypeError as err: + LOG.error('Failed to write sarif file: %s', output_file_path) + LOG.error(err) + import traceback + traceback.print_exc() + + def replace_report_hash( + self, + analyzer_result_file_path: str, + hash_type=HashType.CONTEXT_FREE + ): + """ + Override hash in the given file by using the given version hash. + """ + pass diff --git a/tools/report-converter/codechecker_report_converter/report/report_file.py b/tools/report-converter/codechecker_report_converter/report/report_file.py index 3bf6457bb0..d23394b4ce 100644 --- a/tools/report-converter/codechecker_report_converter/report/report_file.py +++ b/tools/report-converter/codechecker_report_converter/report/report_file.py @@ -14,23 +14,23 @@ from codechecker_report_converter.report import File, Report from codechecker_report_converter.report.checker_labels import CheckerLabels from codechecker_report_converter.report.hash import HashType -from codechecker_report_converter.report.parser import plist +from codechecker_report_converter.report.parser import plist, sarif from codechecker_report_converter.report.parser.base import AnalyzerInfo LOG = logging.getLogger('report-converter') -SUPPORTED_ANALYZER_EXTENSIONS = [plist.EXTENSION] +SUPPORTED_ANALYZER_FORMATS = [plist, sarif] -__SUPPORTED_ANALYZER_EXTENSIONS = tuple([ - f".{extension}" for extension in SUPPORTED_ANALYZER_EXTENSIONS]) +SUPPORTED_ANALYZER_EXTENSIONS = tuple([ + _format.EXTENSION for _format in SUPPORTED_ANALYZER_FORMATS]) def is_supported(analyzer_result_file_path: str) -> bool: """ True if the given report file can be parsed. """ - return analyzer_result_file_path.endswith(__SUPPORTED_ANALYZER_EXTENSIONS) + return analyzer_result_file_path.endswith(SUPPORTED_ANALYZER_EXTENSIONS) def get_parser( @@ -39,8 +39,9 @@ def get_parser( file_cache: Optional[Dict[str, File]] = None ): """ Returns a parser object for the given analyzer result file. """ - if analyzer_result_file_path.endswith(f".{plist.EXTENSION}"): - return plist.Parser(checker_labels, file_cache) + for _format in SUPPORTED_ANALYZER_FORMATS: + if analyzer_result_file_path.endswith(_format.EXTENSION): + return _format.Parser(checker_labels, file_cache) def get_reports( diff --git a/tools/report-converter/requirements_py/dev/requirements.txt b/tools/report-converter/requirements_py/dev/requirements.txt index bd20124f6c..50981f0c0f 100644 --- a/tools/report-converter/requirements_py/dev/requirements.txt +++ b/tools/report-converter/requirements_py/dev/requirements.txt @@ -1,4 +1,5 @@ pytest==7.3.1 +sarif-tools==1.0.0 pycodestyle==2.7.0 pylint==2.8.2 portalocker==2.2.1