diff --git a/codechecker_common/util.py b/codechecker_common/util.py
index a9e966393d..5827a99b10 100644
--- a/codechecker_common/util.py
+++ b/codechecker_common/util.py
@@ -13,8 +13,9 @@
 import itertools
 import json
 import os
+import pathlib
 import random
-from typing import TextIO
+from typing import TextIO, Union
 
 import portalocker
 
@@ -55,7 +56,10 @@ def chunks(iterator, n):
         yield itertools.chain([first], rest_of_chunk)
 
 
-def load_json(path: str, default=None, lock=False, display_warning=True):
+def load_json(path: Union[str, pathlib.Path],
+              default=None,
+              lock=False,
+              display_warning=True):
     """
     Load the contents of the given file as a JSON and return it's value,
     or default if the file can't be loaded.
@@ -133,3 +137,16 @@ def generate_random_token(num_bytes: int = 32) -> str:
                            for _ in range(0, -(num_bytes // -64))])
     idx = random.randrange(0, len(hash_value) - num_bytes + 1)
     return hash_value[idx:(idx + num_bytes)]
+
+
+def format_size(num: float, suffix: str = 'B') -> str:
+    """
+    Pretty print storage units.
+    Source: http://stackoverflow.com/questions/1094841/
+        reusable-library-to-get-human-readable-version-of-file-size
+    """
+    for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi', 'Ri']:
+        if abs(num) < 1024.0:
+            return f"{num:3.1f} {unit}{suffix}"
+        num /= 1024.0
+    return f"{num:.1f} Qi{suffix}"
diff --git a/web/api/codechecker_api_shared.thrift b/web/api/codechecker_api_shared.thrift
index 3e01ea5fbd..4a41889bbc 100644
--- a/web/api/codechecker_api_shared.thrift
+++ b/web/api/codechecker_api_shared.thrift
@@ -14,14 +14,35 @@ enum Ternary {
 }
 
 enum ErrorCode {
-  DATABASE,
-  IOERROR,
-  GENERAL,
-  AUTH_DENIED,    // Authentication denied. We do not allow access to the service.
-  UNAUTHORIZED,   // Authorization denied. User does not have right to perform an action.
-  API_MISMATCH,   // The client attempted to query an API version that is not supported by the server.
-  SOURCE_FILE,    // The client sent a source code which contains errors (e.g., source code comment errors).
-  REPORT_FORMAT,  // The client sent a report with wrong format (e.g., report annotation has bad type in a .plist).
+  // Any other sort of error encountered during RPC execution.
+  GENERAL = 2,
+
+  // Executing the request triggered a database-level fault or constraint violation.
+  DATABASE = 0,
+
+  // The request is malformed or an internal I/O operation failed.
+  IOERROR = 1,
+
+  // Authentication denied. We do not allow access to the service.
+  AUTH_DENIED = 3,
+
+  // User does not have the necessary rights to perform an action.
+  UNAUTHORIZED = 4,
+
+  // The client attempted to query an API version that is not supported by the
+  // server.
+  API_MISMATCH = 5,
+
+  // REMOVED IN API v6.59 (CodeChecker v6.25.0)!
+  // Previously sent by report_server.thrift/codeCheckerDBAccess::massStoreRun()
+  // when the client uploaded a source file which contained errors, such as
+  // review status source-code-comment errors.
+  /* SOURCE_FILE = 6, */  // Never reuse the value of the enum constant!
+
+  // REMOVED IN API v6.59 (CodeChecker v6.25.0)!
+  // Previously sent by report_server.thrift/codeCheckerDBAccess::massStoreRun()
+  // when the client uploaded a report with annotations that had invalid types.
+  /* REPORT_FORMAT = 7, */  // Never reuse the value of the enum constant!
} exception RequestFailed { diff --git a/web/api/js/codechecker-api-node/dist/codechecker-api-6.59.0.tgz b/web/api/js/codechecker-api-node/dist/codechecker-api-6.59.0.tgz index 270aaf28fc..3bfe9dd373 100644 Binary files a/web/api/js/codechecker-api-node/dist/codechecker-api-6.59.0.tgz and b/web/api/js/codechecker-api-node/dist/codechecker-api-6.59.0.tgz differ diff --git a/web/api/py/codechecker_api/dist/codechecker_api.tar.gz b/web/api/py/codechecker_api/dist/codechecker_api.tar.gz index 601a9c2e5c..0be636d9d6 100644 Binary files a/web/api/py/codechecker_api/dist/codechecker_api.tar.gz and b/web/api/py/codechecker_api/dist/codechecker_api.tar.gz differ diff --git a/web/api/py/codechecker_api_shared/dist/codechecker_api_shared.tar.gz b/web/api/py/codechecker_api_shared/dist/codechecker_api_shared.tar.gz index 1cbe790554..6a8bc206aa 100644 Binary files a/web/api/py/codechecker_api_shared/dist/codechecker_api_shared.tar.gz and b/web/api/py/codechecker_api_shared/dist/codechecker_api_shared.tar.gz differ diff --git a/web/client/codechecker_client/cmd/store.py b/web/client/codechecker_client/cmd/store.py index 58e7f307a9..6889b6bdd1 100644 --- a/web/client/codechecker_client/cmd/store.py +++ b/web/client/codechecker_client/cmd/store.py @@ -32,7 +32,6 @@ from typing import Dict, Iterable, List, Set, Tuple from codechecker_api.codeCheckerDBAccess_v6.ttypes import StoreLimitKind -from codechecker_api_shared.ttypes import RequestFailed, ErrorCode from codechecker_report_converter import twodim from codechecker_report_converter.report import Report, report_file, \ @@ -58,7 +57,7 @@ def assemble_blame_info(_, __) -> int: from codechecker_common.compatibility.multiprocessing import Pool from codechecker_common.source_code_comment_handler import \ SourceCodeCommentHandler -from codechecker_common.util import load_json +from codechecker_common.util import format_size, load_json from codechecker_web.shared import webserver_context, host_check from codechecker_web.shared.env import get_default_workspace @@ -66,7 +65,7 @@ def assemble_blame_info(_, __) -> int: LOG = logger.get_logger('system') -MAX_UPLOAD_SIZE = 1 * 1024 * 1024 * 1024 # 1GiB +MAX_UPLOAD_SIZE = 1024 ** 3 # 1024^3 = 1 GiB. AnalyzerResultFileReports = Dict[str, List[Report]] @@ -87,7 +86,7 @@ def assemble_blame_info(_, __) -> int: """Contains information about the report file after parsing. -store_it: True if every information is availabe and the +store_it: True if every information is available and the report can be stored main_report_positions: list of ReportLineInfo containing the main report positions @@ -135,19 +134,6 @@ def _write_summary(self, out=sys.stdout): out.write("\n----=================----\n") -def sizeof_fmt(num, suffix='B'): - """ - Pretty print storage units. - Source: https://stackoverflow.com/questions/1094841/ - reusable-library-to-get-human-readable-version-of-file-size - """ - for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: - if abs(num) < 1024.0: - return f"{num:3.1f}{unit}{suffix}" - num /= 1024.0 - return f"{num:.1f}Yi{suffix}" - - def get_file_content_hash(file_path): """ Return the file content hash for a file. @@ -649,7 +635,7 @@ def assemble_zip(inputs, compressed_zip_size = os.stat(zip_file).st_size LOG.info("Compressing report zip file done (%s / %s).", - sizeof_fmt(zip_size), sizeof_fmt(compressed_zip_size)) + format_size(zip_size), format_size(compressed_zip_size)) # We are responsible for deleting these. 
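# ---------------------------------------------------------------------------
# Editor's aside: a minimal, hedged usage sketch (not part of the diff) of the
# codechecker_common.util.format_size() helper added above, which replaces the
# client-side sizeof_fmt() removed in this change. It assumes a checkout
# containing this change is on PYTHONPATH; the expected return values follow
# directly from the code of format_size() shown earlier.

from codechecker_common.util import format_size

assert format_size(0) == "0.0 B"
assert format_size(4096) == "4.0 KiB"
assert format_size(1024 ** 3) == "1.0 GiB"        # The new MAX_UPLOAD_SIZE.
assert format_size(-2 * 1024 ** 2) == "-2.0 MiB"  # abs() handles negatives.
# ---------------------------------------------------------------------------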
shutil.rmtree(temp_dir) @@ -698,7 +684,7 @@ def get_analysis_statistics(inputs, limits): if os.stat(compilation_db).st_size > compilation_db_size: LOG.debug("Compilation database is too big (max: %s).", - sizeof_fmt(compilation_db_size)) + format_size(compilation_db_size)) else: LOG.debug("Copying file '%s' to analyzer statistics " "ZIP...", compilation_db) @@ -721,7 +707,7 @@ def get_analysis_statistics(inputs, limits): if failed_files_size > failure_zip_limit: LOG.debug("We reached the limit of maximum uploadable " "failure zip size (max: %s).", - sizeof_fmt(failure_zip_limit)) + format_size(failure_zip_limit)) break else: LOG.debug("Copying failure zip file '%s' to analyzer " @@ -932,7 +918,7 @@ def main(args): zip_size = os.stat(zip_file).st_size if zip_size > MAX_UPLOAD_SIZE: LOG.error("The result list to upload is too big (max: %s): %s.", - sizeof_fmt(MAX_UPLOAD_SIZE), sizeof_fmt(zip_size)) + format_size(MAX_UPLOAD_SIZE), format_size(zip_size)) sys.exit(1) b64zip = "" @@ -1005,15 +991,6 @@ def main(args): storing_analysis_statistics(client, args.input, args.name) LOG.info("Storage finished successfully.") - except RequestFailed as reqfail: - if reqfail.errorCode == ErrorCode.SOURCE_FILE: - header = ['File', 'Line', 'Checker name'] - table = twodim.to_str( - 'table', header, [c.split('|') for c in reqfail.extraInfo]) - LOG.warning("Setting the review statuses for some reports failed " - "because of non valid source code comments: " - "%s\n %s", reqfail.message, table) - sys.exit(1) except Exception as ex: import traceback traceback.print_exc() diff --git a/web/client/codechecker_client/thrift_call.py b/web/client/codechecker_client/thrift_call.py index 32e5b3dc18..d41f7d7187 100644 --- a/web/client/codechecker_client/thrift_call.py +++ b/web/client/codechecker_client/thrift_call.py @@ -81,7 +81,11 @@ def wrapper(self, *args, **kwargs): LOG.error( 'Client/server API mismatch\n %s', str(reqfailure.message)) else: - LOG.error('API call error: %s\n%s', func_name, str(reqfailure)) + LOG.error("Error during API call: %s", func_name) + LOG.debug("%s", str(reqfailure)) + LOG.error("%s", str(reqfailure.message)) + if reqfailure.extraInfo: + LOG.error("%s", '\n'.join(reqfailure.extraInfo)) sys.exit(1) except TApplicationException as ex: LOG.error("Internal server error: %s", str(ex.message)) diff --git a/web/server/codechecker_server/api/mass_store_run.py b/web/server/codechecker_server/api/mass_store_run.py index 87ab4e2a52..cc95d88f5f 100644 --- a/web/server/codechecker_server/api/mass_store_run.py +++ b/web/server/codechecker_server/api/mass_store_run.py @@ -5,31 +5,36 @@ # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception # # ------------------------------------------------------------------------- +""" +Implementation of the ``massStoreRunAsynchronous()`` API function that store +run data to a product's report database. +Called via `report_server`, but factored out here for readability. 
+""" import base64 +from collections import defaultdict +from datetime import datetime, timedelta +from hashlib import sha256 import json import os +from pathlib import Path import sqlalchemy import tempfile import time +from typing import Any, Dict, List, Optional, Set, Tuple, Union, cast import zipfile import zlib -from collections import defaultdict -from datetime import datetime, timedelta -from hashlib import sha256 -from tempfile import TemporaryDirectory -from typing import Any, Dict, List, Optional, Set, Tuple, Union, cast - -import codechecker_api_shared +from codechecker_api_shared.ttypes import DBStatus, ErrorCode, RequestFailed from codechecker_api.codeCheckerDBAccess_v6 import ttypes from codechecker_common import skiplist_handler from codechecker_common.logger import get_logger from codechecker_common.review_status_handler import ReviewStatusHandler, \ SourceReviewStatus -from codechecker_common.util import load_json, path_for_fake_root +from codechecker_common.util import format_size, load_json, path_for_fake_root +from codechecker_report_converter import twodim from codechecker_report_converter.util import trim_path_prefixes from codechecker_report_converter.report import \ FakeChecker, Report, UnknownChecker, report_file @@ -45,10 +50,12 @@ ExtendedReportData, \ File, FileContent, \ Report as DBReport, ReportAnnotations, ReviewStatus as ReviewStatusRule, \ - Run, RunLock, RunHistory + Run, RunLock as DBRunLock, RunHistory from ..metadata import checker_is_unavailable, MetadataInfoParser - -from .report_server import ThriftRequestHandler +from ..product import Product as ServerProduct +from ..session_manager import SessionManager +from ..task_executors.abstract_task import AbstractTask +from ..task_executors.task_manager import TaskManager from .thrift_enum_helper import report_extended_data_type_str @@ -56,32 +63,37 @@ STORE_TIME_LOG = get_logger('store_time') -class LogTask: +class StepLog: + """ + Simple context manager that logs an arbitrary step's comment and time + taken annotated with a run name. + """ + def __init__(self, run_name: str, message: str): - self.__run_name = run_name - self.__msg = message - self.__start_time = time.time() + self._run_name = run_name + self._msg = message + self._start_time = time.time() - def __enter__(self, *args): - LOG.info("[%s] %s...", self.__run_name, self.__msg) + def __enter__(self, *_args): + LOG.info("[%s] %s...", self._run_name, self._msg) - def __exit__(self, *args): - LOG.info("[%s] %s. Done. (Duration: %s sec)", self.__run_name, - self.__msg, round(time.time() - self.__start_time, 2)) + def __exit__(self, *_args): + LOG.info("[%s] %s. Done. (Duration: %.2f sec)", + self._run_name, self._msg, time.time() - self._start_time) -class RunLocking: +class RunLock: def __init__(self, session: DBSession, run_name: str): self.__session = session self.__run_name = run_name self.__run_lock = None - def __enter__(self, *args): + def __enter__(self, *_args): # Load the lock record for "FOR UPDATE" so that the transaction that # handles the run's store operations has a lock on the database row # itself. 
- self.__run_lock = self.__session.query(RunLock) \ - .filter(RunLock.name == self.__run_name) \ + self.__run_lock = self.__session.query(DBRunLock) \ + .filter(DBRunLock.name == self.__run_name) \ .with_for_update(nowait=True) \ .one() @@ -98,29 +110,151 @@ def __enter__(self, *args): self.__run_name, self.__run_lock.locked_at) return self - def __exit__(self, *args): + def __exit__(self, *_args): self.__run_lock = None self.__session = None + def store_run_lock_in_db(self, associated_user: str): + """ + Stores a `DBRunLock` record for the given run name into the database. + """ + try: + # If the run can be stored, we need to lock it first. If there is + # already a lock in the database for the given run name which is + # expired and multiple processes are trying to get this entry from + # the database for update we may get the following exception: + # could not obtain lock on row in relation "run_locks" + # This is the reason why we have to wrap this query to a try/except + # block. + run_lock: Optional[DBRunLock] = self.__session.query(DBRunLock) \ + .filter(DBRunLock.name == self.__run_name) \ + .with_for_update(nowait=True) \ + .one_or_none() + except (sqlalchemy.exc.OperationalError, + sqlalchemy.exc.ProgrammingError) as ex: + LOG.error("Failed to get run lock for '%s': %s", + self.__run_name, ex) + raise RequestFailed( + ErrorCode.DATABASE, + "Someone is already storing to the same run. Please wait " + "while the other storage is finished and try it again.") \ + from ex -def unzip(b64zip: str, output_dir: str) -> int: + if not run_lock: + # If there is no lock record for the given run name, the run + # is not locked -> create a new lock. + self.__session.add(DBRunLock(self.__run_name, associated_user)) + LOG.debug("Acquiring 'run_lock' for '%s' on run '%s' ...", + associated_user, self.__run_name) + elif run_lock.has_expired( + db_cleanup.RUN_LOCK_TIMEOUT_IN_DATABASE): + # There can be a lock in the database, which has already + # expired. In this case, we assume that the previous operation + # has failed, and thus, we can re-use the already present lock. + run_lock.touch() + run_lock.username = associated_user + LOG.debug("Reusing existing, stale 'run_lock' record on " + "run '%s' ...", + self.__run_name) + else: + # In case the lock exists and it has not expired, we must + # consider the run a locked one. + when = run_lock.when_expires( + db_cleanup.RUN_LOCK_TIMEOUT_IN_DATABASE) + username = run_lock.username or "another user" + LOG.info("Refusing to store into run '%s' as it is locked by " + "%s. Lock will expire at '%s'.", + self.__run_name, username, when) + raise RequestFailed( + ErrorCode.DATABASE, + f"The run named '{self.__run_name}' is being stored into by " + f"{username}. If the other store operation has failed, this " + f"lock will expire at '{when}'.") + + # At any rate, if the lock has been created or updated, commit it + # into the database. + try: + self.__session.commit() + except (sqlalchemy.exc.IntegrityError, + sqlalchemy.orm.exc.StaleDataError) as ex: + # The commit of this lock can fail. + # + # In case two store ops attempt to lock the same run name at the + # same time, committing the lock in the transaction that commits + # later will result in an IntegrityError due to the primary key + # constraint. + # + # In case two store ops attempt to lock the same run name with + # reuse and one of the operation hangs long enough before COMMIT + # so that the other operation commits and thus removes the lock + # record, StaleDataError is raised. 
In this case, also consider
+            # the run locked, as the data changed while the transaction was
+            # waiting, as another run wholly completed.
+
+            LOG.info("Run '%s' got locked while current transaction "
+                     "tried to acquire a lock. Considering run as locked.",
+                     self.__run_name)
+            raise RequestFailed(
+                ErrorCode.DATABASE,
+                f"The run named '{self.__run_name}' is being stored into by "
+                "another user.") from ex
+
+        LOG.debug("Successfully acquired 'run_lock' for '%s' on run '%s'.",
+                  associated_user, self.__run_name)
+
+    def drop_run_lock_from_db(self):
+        """Remove the run_lock row from the database for the current run."""
+        # Using with_for_update() here so the database (in case it supports
+        # this operation) locks the lock record's row from any other access.
+        LOG.debug("Releasing 'run_lock' from run '%s' ...", self.__run_name)
+        run_lock: Optional[DBRunLock] = self.__session.query(DBRunLock) \
+            .filter(DBRunLock.name == self.__run_name) \
+            .with_for_update(nowait=True).one()
+        if not run_lock:
+            raise KeyError(
+                f"No 'run_lock' in database for run '{self.__run_name}'")
+        locked_at = run_lock.locked_at
+        username = run_lock.username
+
+        self.__session.delete(run_lock)
+        self.__session.commit()
+
+        LOG.debug("Released 'run_lock' (originally acquired by '%s' on '%s') "
+                  "from run '%s'.",
+                  username, str(locked_at), self.__run_name)
+
+
+def unzip(run_name: str, b64zip: str, output_dir: Path) -> int:
     """
-    This function unzips the base64 encoded zip file. This zip is extracted
-    to a temporary directory and the ZIP is then deleted. The function returns
-    the size of the extracted decompressed zip file.
+    This function unzips a Base64 encoded and ZLib-compressed ZIP file.
+    The archive is extracted to `output_dir` via a temporary file, which is
+    then deleted. The function returns the size of the decompressed ZIP.
     """
-    if len(b64zip) == 0:
+    if not b64zip:
         return 0
 
-    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_file:
-        LOG.debug("Unzipping mass storage ZIP '%s' to '%s'...",
-                  zip_file.name, output_dir)
-
+    with tempfile.NamedTemporaryFile(
+            suffix=".zip", dir=output_dir) as zip_file:
+        LOG.debug("Decompressing input massStoreRun() ZIP to '%s' ...",
+                  zip_file.name)
+        start_time = time.time()
         zip_file.write(zlib.decompress(base64.b64decode(b64zip)))
-        with zipfile.ZipFile(zip_file, 'r', allowZip64=True) as zipf:
+        zip_file.flush()
+        end_time = time.time()
+
+        size = os.stat(zip_file.name).st_size
+        LOG.debug("Decompressed input massStoreRun() ZIP '%s' -> '%s' "
+                  "(compression ratio: %.2f%%) in '%s'.",
+                  format_size(len(b64zip)), format_size(size),
+                  (size / len(b64zip) * 100),
+                  timedelta(seconds=end_time - start_time))
+
+        with StepLog(run_name, "Extract massStoreRun() ZIP contents"), \
+                zipfile.ZipFile(zip_file, 'r', allowZip64=True) as zip_handle:
+            LOG.debug("Extracting massStoreRun() ZIP '%s' to '%s' ...",
+                      zip_file.name, output_dir)
             try:
-                zipf.extractall(output_dir)
-                return os.stat(zip_file.name).st_size
+                zip_handle.extractall(output_dir)
             except Exception:
                 LOG.error("Failed to extract received ZIP.")
                 import traceback
@@ -128,9 +262,11 @@ def unzip(b64zip: str, output_dir: str) -> int:
                 raise
     return 0
 
+    return size
+
 
 def get_file_content(file_path: str) -> bytes:
-    """Return the file content for the given filepath.
""" + """Return the file content for the given `file_path`.""" with open(file_path, 'rb') as f: return f.read() @@ -202,7 +338,7 @@ def add_file_record( def get_blame_file_data( - blame_file: str + blame_file: Path ) -> Tuple[Optional[str], Optional[str], Optional[str]]: """ Get blame information from the given file. @@ -214,7 +350,7 @@ def get_blame_file_data( remote_url = None tracking_branch = None - if os.path.isfile(blame_file): + if blame_file.is_file(): data = load_json(blame_file) if data: remote_url = data.get("remote_url") @@ -234,199 +370,303 @@ def checker_name_for_report(report: Report) -> Tuple[str, str]: report.checker_name or UnknownChecker[1]) -class MassStoreRun: - def __init__( - self, - report_server: ThriftRequestHandler, - name: str, - tag: Optional[str], - version: Optional[str], - b64zip: str, - force: bool, - trim_path_prefix_list: Optional[List[str]], - description: Optional[str] - ): - """ Initialize object. """ - self.__report_server = report_server +class MassStoreRunInputHandler: + """Prepares a `MassStoreRunTask` from an API input.""" + + # Note: The implementation of this class is executed in the "foreground", + # in the context of an API handler process! + # **DO NOT** put complex logic here that would take too much time to + # validate. + # Long-running actions of a storage process should be in + # MassStoreRunImplementation instead! + + def __init__(self, + session_manager: SessionManager, + config_db_sessionmaker, + product_db_sessionmaker, + task_manager: TaskManager, + package_context, + product_id: int, + run_name: str, + run_description: Optional[str], + store_tag: Optional[str], + client_version: str, + force_overwrite_of_run: bool, + path_prefixes_to_trim: Optional[List[str]], + zipfile_contents_base64: str, + user_name: str): + self._input_handling_start_time = time.time() + self._session_manager = session_manager + self._config_db = config_db_sessionmaker + self._product_db = product_db_sessionmaker + self._tm = task_manager + self._package_context = package_context + self._input_zip_blob = zipfile_contents_base64 + self.client_version = client_version + self.force_overwrite_of_run = force_overwrite_of_run + self.path_prefixes_to_trim = path_prefixes_to_trim + self.run_name = run_name + self.run_description = run_description + self.store_tag = store_tag + self.user_name = user_name + + with DBSession(self._config_db) as session: + product: Optional[Product] = session.query(Product) \ + .get(product_id) + if not product: + raise KeyError(f"No product with ID '{product_id}'") + + self._product = product + + def check_store_input_validity_at_face_value(self): + """ + Performs semantic checks of a ``massStoreRunAsynchronous()`` Thrift + call that can be done with trivial amounts of work (i.e., without + actually parsing the full input ZIP). + """ + self._check_run_limit() + self._store_run_lock() # Fails if the run can not be stored into. - self.__name = name - self.__tag = tag - self.__version = version - self.__b64zip = b64zip - self.__force = force - self.__trim_path_prefixes = trim_path_prefix_list - self.__description = description + def create_mass_store_task(self, + is_actually_asynchronous=False) \ + -> "MassStoreRunTask": + """ + Constructs the `MassStoreRunTask` for the handled and verified input. 
- self.__mips: Dict[str, MetadataInfoParser] = {} - self.__analysis_info: Dict[str, AnalysisInfo] = {} - self.__checker_row_cache: Dict[Tuple[str, str], Checker] = {} - self.__duration: int = 0 - self.__report_count: int = 0 - self.__report_limit: int = 0 - self.__wrong_src_code_comments: List[str] = [] - self.__already_added_report_hashes: Set[str] = set() - self.__new_report_hashes: Dict[str, Tuple] = {} - self.__all_report_checkers: Set[str] = set() - self.__added_reports: List[Tuple[DBReport, Report]] = [] - self.__reports_with_fake_checkers: Dict[ - # Either a DBReport *without* an ID, or the ID of a committed - # DBReport. - str, Tuple[Report, Union[DBReport, int]]] = {} + Calling this function results in observable changes outside the + process's memory, as it records the task into the database and + extracts things to the server's storage area. + """ + token = self._tm.allocate_task_record( + "report_server::massStoreRunAsynchronous()" + if is_actually_asynchronous + else "report_server::massStoreRun()", + ("Legacy s" if not is_actually_asynchronous else "S") + + f"tore of results to '{self._product.endpoint}' - " + f"'{self.run_name}'", + self.user_name, + self._product) + temp_dir = self._tm.create_task_data(token) + extract_dir = temp_dir / "store_zip" + os.makedirs(extract_dir, exist_ok=True) - self.__get_report_limit_for_product() + try: + with StepLog(self.run_name, + "Save massStoreRun() ZIP data to server storage"): + zip_size = unzip(self.run_name, + self._input_zip_blob, + extract_dir) + + if not zip_size: + raise RequestFailed(ErrorCode.GENERAL, + "The uploaded ZIP file is empty!") + except Exception: + LOG.error("Failed to extract massStoreRunAsynchronous() ZIP!") + import traceback + traceback.print_exc() + raise - @property - def __manager(self): - return self.__report_server._manager + self._input_handling_end_time = time.time() - @property - def __config_database(self): - return self.__report_server._config_database + try: + with open(temp_dir / "store_configuration.json", 'w', + encoding="utf-8") as cfg_f: + json.dump({ + "client_version": self.client_version, + "force_overwrite": self.force_overwrite_of_run, + "path_prefixes_to_trim": self.path_prefixes_to_trim, + "run_name": self.run_name, + "run_description": self.run_description, + "store_tag": self.store_tag, + "user_name": self.user_name, + }, cfg_f) + except Exception: + LOG.error("Failed to write massStoreRunAsynchronous() " + "configuration!") + import traceback + traceback.print_exc() + raise - @property - def __product(self): - return self.__report_server._product + task = MassStoreRunTask(token, temp_dir, + self._package_context, + self._product.id, + zip_size, + self._input_handling_end_time - + self._input_handling_start_time) - @property - def __context(self): - return self.__report_server._context + if not is_actually_asynchronous: + self._tm.add_comment( + task, + "WARNING!\nExecuting a legacy 'massStoreRun()' API call!", + "SYSTEM") - @property - def user_name(self): - return self.__report_server._get_username() + return task - def __check_run_limit(self): + def _check_run_limit(self): """ - Checks the maximum allowed of uploadable runs for the current product. + Checks the maximum allowed number of uploadable runs for the current + product. 
""" - max_run_count = self.__manager.get_max_run_count() - - with DBSession(self.__config_database) as session: - product = session.query(Product).get(self.__product.id) - if product.run_limit: - max_run_count = product.run_limit - - # Session that handles constraints on the run. - with DBSession(self.__report_server._Session) as session: - if not max_run_count: - return - - LOG.debug("Check the maximum number of allowed runs which is %d", - max_run_count) - - run = session.query(Run) \ - .filter(Run.name == self.__name) \ + run_limit: Optional[int] = self._session_manager.get_max_run_count() + if self._product.run_limit: + run_limit = self._product.run_limit + + if not run_limit: + # Allowing the user to upload an unlimited number of runs. + return + LOG.debug("Checking the maximum number of allowed runs in '%s', " + "which is %d.", + self._product.endpoint, run_limit) + + with DBSession(self._product_db) as session: + existing_run: Optional[Run] = session.query(Run) \ + .filter(Run.name == self.run_name) \ .one_or_none() - - # If max_run_count is not set in the config file, it will allow - # the user to upload unlimited runs. - run_count = session.query(Run.id).count() - # If we are not updating a run or the run count is reached the - # limit it will throw an exception. - if not run and run_count >= max_run_count: - remove_run_count = run_count - max_run_count + 1 - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes.ErrorCode.GENERAL, - f"You reached the maximum number of allowed runs " - f"({run_count}/{max_run_count})! Please remove at least " - f"{remove_run_count} run(s) before you try it again.") - - def __store_run_lock(self, session: DBSession): + if not existing_run and run_count >= run_limit: + raise RequestFailed( + ErrorCode.GENERAL, + "You reached the maximum number of allowed runs " + f"({run_count}/{run_limit})! " + f"Please remove at least {run_count - run_limit + 1} " + "run(s) before you try again!") + + def _store_run_lock(self): + """Commits a `DBRunLock` for the to-be-stored `Run`, if available.""" + with DBSession(self._product_db) as session: + RunLock(session, self.run_name) \ + .store_run_lock_in_db(self.user_name) + + +class MassStoreRunTask(AbstractTask): + """Executes `MassStoreRun` as a background job.""" + + def __init__(self, token: str, data_path: Path, + package_context, + product_id: int, + input_zip_size: int, + preparation_time_elapsed: float): """ - Store a RunLock record for the given run name into the database. + Creates the `AbstractTask` implementation for + ``massStoreRunAsynchronous()``. + + `preparation_time_elapsed` records how much time was spent by the + input handling that prepared the task. + This time will be added to the total time spent processing the results + in the background. + (The time spent in waiting between task enschedulement and task + execution is not part of the total time.) """ - try: - # If the run can be stored, we need to lock it first. If there is - # already a lock in the database for the given run name which is - # expired and multiple processes are trying to get this entry from - # the database for update we may get the following exception: - # could not obtain lock on row in relation "run_locks" - # This is the reason why we have to wrap this query to a try/except - # block. 
- run_lock = session.query(RunLock) \ - .filter(RunLock.name == self.__name) \ - .with_for_update(nowait=True).one_or_none() - except (sqlalchemy.exc.OperationalError, - sqlalchemy.exc.ProgrammingError) as ex: - LOG.error("Failed to get run lock for '%s': %s", self.__name, ex) - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes.ErrorCode.DATABASE, - "Someone is already storing to the same run. Please wait " - "while the other storage is finished and try it again.") + super().__init__(token, data_path) + self._package_context = package_context + self._product_id = product_id + self.input_zip_size = input_zip_size + self.time_spent_on_task_preparation = preparation_time_elapsed - if not run_lock: - # If there is no lock record for the given run name, the run - # is not locked -- create a new lock. - run_lock = RunLock(self.__name, self.user_name) - session.add(run_lock) - elif run_lock.has_expired( - db_cleanup.RUN_LOCK_TIMEOUT_IN_DATABASE): - # There can be a lock in the database, which has already - # expired. In this case, we assume that the previous operation - # has failed, and thus, we can re-use the already present lock. - run_lock.touch() - run_lock.username = self.user_name - else: - # In case the lock exists and it has not expired, we must - # consider the run a locked one. - when = run_lock.when_expires( - db_cleanup.RUN_LOCK_TIMEOUT_IN_DATABASE) + def _implementation(self, tm: TaskManager): + try: + with open(self.data_path / "store_configuration.json", 'r', + encoding="utf-8") as cfg_f: + self.store_configuration = json.load(cfg_f) + except Exception: + LOG.error("Invalid or unusable massStoreRunAsynchronous() " + "configuration!") + raise - username = run_lock.username if run_lock.username is not None \ - else "another user" + with DBSession(tm.configuration_database_session_factory) as session: + db_product: Optional[Product] = session.query(Product) \ + .get(self._product_id) + if not db_product: + raise KeyError(f"No product with ID '{self._product_id}'") + + self._product = ServerProduct(db_product.id, + db_product.endpoint, + db_product.display_name, + db_product.connection, + self._package_context, + tm.environment) + + self._product.connect() + if self._product.db_status != DBStatus.OK: + raise EnvironmentError("Database for product " + f"'{self._product.endpoint}' is in " + "a bad shape!") + + m = MassStoreRun(self.data_path / "store_zip", + self._package_context, + tm.configuration_database_session_factory, + self._product, + self.store_configuration["run_name"], + self.store_configuration["store_tag"], + self.store_configuration["client_version"], + self.store_configuration["force_overwrite"], + self.store_configuration["path_prefixes_to_trim"], + self.store_configuration["run_description"], + self.store_configuration["user_name"], + ) + m.store(self.input_zip_size, self.time_spent_on_task_preparation) - LOG.info("Refusing to store into run '%s' as it is locked by " - "%s. Lock will expire at '%s'.", self.__name, username, - when) - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes.ErrorCode.DATABASE, - f"The run named '{self.__name}' is being stored into by " - f"{username}. If the other store operation has failed, this " - f"lock will expire at '{when}'.") - # At any rate, if the lock has been created or updated, commit it - # into the database. - try: - session.commit() - except (sqlalchemy.exc.IntegrityError, - sqlalchemy.orm.exc.StaleDataError) as ex: - # The commit of this lock can fail. 
- # - # In case two store ops attempt to lock the same run name at the - # same time, committing the lock in the transaction that commits - # later will result in an IntegrityError due to the primary key - # constraint. - # - # In case two store ops attempt to lock the same run name with - # reuse and one of the operation hangs long enough before COMMIT - # so that the other operation commits and thus removes the lock - # record, StaleDataError is raised. In this case, also consider - # the run locked, as the data changed while the transaction was - # waiting, as another run wholly completed. +class MassStoreRun: + """Implementation for ``massStoreRunAsynchronous()``.""" + + # Note: The implementation of this class is called from MassStoreRunTask + # and it is executed in the background, in the context of a Task worker + # process. + # This is the place where complex implementation logic must go, but be + # careful, there is no way to communicate with the user's client anymore! + + # TODO: Poll the task manager at regular points for a cancel signal! + + def __init__(self, + zip_dir: Path, + package_context, + config_db, + product: ServerProduct, + name: str, + tag: Optional[str], + version: Optional[str], + force: bool, + trim_path_prefix_list: Optional[List[str]], + description: Optional[str], + user_name: str, + ): + self._zip_dir = zip_dir + self._name = name + self._tag = tag + self._version = version + self._force = force + self._trim_path_prefixes = trim_path_prefix_list + self._description = description + self._user_name = user_name + self.__config_db = config_db + self.__package_context = package_context + self.__product = product - LOG.info("Run '%s' got locked while current transaction " - "tried to acquire a lock. Considering run as locked.", - self.__name) - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes.ErrorCode.DATABASE, - f"The run named '{self.__name}' is being stored into by " - "another user.") from ex + self.__mips: Dict[str, MetadataInfoParser] = {} + self.__analysis_info: Dict[str, AnalysisInfo] = {} + self.__checker_row_cache: Dict[Tuple[str, str], Checker] = {} + self.__duration: int = 0 + self.__report_count: int = 0 + self.__report_limit: int = 0 + self.__wrong_src_code_comments: List[str] = [] + self.__already_added_report_hashes: Set[str] = set() + self.__new_report_hashes: Dict[str, Tuple] = {} + self.__all_report_checkers: Set[str] = set() + self.__added_reports: List[Tuple[DBReport, Report]] = [] + self.__reports_with_fake_checkers: Dict[ + # Either a DBReport *without* an ID, or the ID of a committed + # DBReport. + str, Tuple[Report, Union[DBReport, int]]] = {} - def __free_run_lock(self, session: DBSession): - """ Remove the lock from the database for the given run name. """ - # Using with_for_update() here so the database (in case it supports - # this operation) locks the lock record's row from any other access. - run_lock = session.query(RunLock) \ - .filter(RunLock.name == self.__name) \ - .with_for_update(nowait=True).one() - session.delete(run_lock) - session.commit() + with DBSession(config_db) as session: + product = session.query(Product).get(self.__product.id) + self.__report_limit = product.report_limit def __store_source_files( self, - source_root: str, + source_root: Path, filename_to_hash: Dict[str, str] ) -> Dict[str, int]: """ Storing file contents from plist. 
""" @@ -434,10 +674,10 @@ def __store_source_files( file_path_to_id = {} for file_name, file_hash in filename_to_hash.items(): - source_file_path = path_for_fake_root(file_name, source_root) + source_file_path = path_for_fake_root(file_name, str(source_root)) LOG.debug("Storing source file: %s", source_file_path) trimmed_file_path = trim_path_prefixes( - file_name, self.__trim_path_prefixes) + file_name, self._trim_path_prefixes) if not os.path.isfile(source_file_path): # The file was not in the ZIP file, because we already @@ -445,7 +685,7 @@ def __store_source_files( # record in the database or we need to add one. LOG.debug('%s not found or already stored.', trimmed_file_path) - with DBSession(self.__report_server._Session) as session: + with DBSession(self.__product.session_factory) as session: fid = add_file_record( session, trimmed_file_path, file_hash) @@ -458,7 +698,7 @@ def __store_source_files( source_file_path, file_hash) continue - with DBSession(self.__report_server._Session) as session: + with DBSession(self.__product.session_factory) as session: self.__add_file_content(session, source_file_path, file_hash) file_path_to_id[trimmed_file_path] = add_file_record( @@ -468,7 +708,7 @@ def __store_source_files( def __add_blame_info( self, - blame_root: str, + blame_root: Path, filename_to_hash: Dict[str, str] ): """ @@ -480,11 +720,11 @@ def __add_blame_info( .zip file. This function stores blame info even if the corresponding source file is not in the .zip file. """ - with DBSession(self.__report_server._Session) as session: + with DBSession(self.__product.session_factory) as session: for subdir, _, files in os.walk(blame_root): for f in files: - blame_file = os.path.join(subdir, f) - file_path = blame_file[len(blame_root.rstrip("/")):] + blame_file = Path(subdir) / f + file_path = f"/{str(blame_file.relative_to(blame_root))}" blame_info, remote_url, tracking_branch = \ get_blame_file_data(blame_file) @@ -599,8 +839,8 @@ def __store_checker_identifiers(self, checkers: Set[Tuple[str, str]]): while tries < max_tries: tries += 1 try: - LOG.debug("[%s] Begin attempt %d...", self.__name, tries) - with DBSession(self.__report_server._Session) as session: + LOG.debug("[%s] Begin attempt %d...", self._name, tries) + with DBSession(self.__product.session_factory) as session: known_checkers = {(r.analyzer_name, r.checker_name) for r in session .query(Checker.analyzer_name, @@ -608,7 +848,8 @@ def __store_checker_identifiers(self, checkers: Set[Tuple[str, str]]): .all()} for analyzer, checker in \ sorted(all_checkers - known_checkers): - s = self.__context.checker_labels.severity(checker) + s = self.__package_context.checker_labels \ + .severity(checker) s = ttypes.Severity._NAMES_TO_VALUES[s] session.add(Checker(analyzer, checker, s)) LOG.debug("Acquiring ID for checker '%s/%s' " @@ -620,7 +861,7 @@ def __store_checker_identifiers(self, checkers: Set[Tuple[str, str]]): sqlalchemy.exc.ProgrammingError) as ex: LOG.error("Storing checkers of run '%s' failed: %s.\n" "Waiting %d before trying again...", - self.__name, ex, wait_time) + self._name, ex, wait_time) time.sleep(wait_time.total_seconds()) wait_time *= 2 except Exception as ex: @@ -630,10 +871,9 @@ def __store_checker_identifiers(self, checkers: Set[Tuple[str, str]]): traceback.print_exc() raise - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes.ErrorCode.DATABASE, - "Storing the names of the checkers in the run failed due to " - "excessive contention!") + raise ConnectionRefusedError("Storing the names 
of the checkers in " + "the run failed due to excessive " + "contention!") def __store_analysis_statistics( self, @@ -661,7 +901,7 @@ def __store_analysis_statistics( stats[analyzer_type]["versions"].add(res["version"]) if "failed_sources" in res: - if self.__version == '6.9.0': + if self._version == '6.9.0': stats[analyzer_type]["failed_sources"].add( 'Unavailable in CodeChecker 6.9.0!') else: @@ -757,89 +997,82 @@ def __add_or_update_run( By default updates the results if name already exists. Using the force flag removes existing analysis results for a run. """ - try: - LOG.debug("Adding run '%s'...", self.__name) + LOG.debug("Adding run '%s'...", self._name) + + run = session.query(Run) \ + .filter(Run.name == self._name) \ + .one_or_none() + + update_run = True + if run and self._force: + # Clean already collected results. + if not run.can_delete: + # Deletion is already in progress. + msg = f"Can't delete {run.id}" + LOG.debug(msg) + raise EnvironmentError(msg) + + LOG.info('Removing previous analysis results...') + session.delete(run) + # Not flushing after delete leads to a constraint violation + # error later, when adding run entity with the same name as + # the old one. + session.flush() - run = session.query(Run) \ - .filter(Run.name == self.__name) \ - .one_or_none() + checker_run = Run(self._name, self._version) + session.add(checker_run) + session.flush() + run_id = checker_run.id - update_run = True - if run and self.__force: - # Clean already collected results. - if not run.can_delete: - # Deletion is already in progress. - msg = f"Can't delete {run.id}" - LOG.debug(msg) - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes.ErrorCode.DATABASE, - msg) - - LOG.info('Removing previous analysis results...') - session.delete(run) - # Not flushing after delete leads to a constraint violation - # error later, when adding run entity with the same name as - # the old one. - session.flush() - - checker_run = Run(self.__name, self.__version) - session.add(checker_run) - session.flush() - run_id = checker_run.id - - elif run: - # There is already a run, update the results. - run.date = datetime.now() - run.duration = -1 - session.flush() - run_id = run.id - else: - # There is no run create new. - checker_run = Run(self.__name, self.__version) - session.add(checker_run) - session.flush() - run_id = checker_run.id - update_run = False - - # Add run to the history. - LOG.debug("Adding run history.") - - if self.__tag is not None: - run_history = session.query(RunHistory) \ - .filter(RunHistory.run_id == run_id, - RunHistory.version_tag == self.__tag) \ - .one_or_none() - - if run_history: - run_history.version_tag = None - session.add(run_history) - - cc_versions = set() - for mip in self.__mips.values(): - if mip.cc_version: - cc_versions.add(mip.cc_version) - - cc_version = '; '.join(cc_versions) if cc_versions else None - run_history = RunHistory( - run_id, self.__tag, self.user_name, run_history_time, - cc_version, self.__description) - - session.add(run_history) + elif run: + # There is already a run, update the results. + run.date = datetime.now() + run.duration = -1 session.flush() + run_id = run.id + else: + # There is no run create new. + checker_run = Run(self._name, self._version) + session.add(checker_run) + session.flush() + run_id = checker_run.id + update_run = False + + # Add run to the history. 
+ LOG.debug("Adding run history.") - LOG.debug("Adding run done.") + if self._tag is not None: + run_history = session.query(RunHistory) \ + .filter(RunHistory.run_id == run_id, + RunHistory.version_tag == self._tag) \ + .one_or_none() - self.__store_analysis_statistics(session, run_history.id) - self.__store_analysis_info(session, run_history) + if run_history: + run_history.version_tag = None + session.add(run_history) - session.flush() - LOG.debug("Storing analysis statistics done.") + cc_versions = set() + for mip in self.__mips.values(): + if mip.cc_version: + cc_versions.add(mip.cc_version) - return run_id, update_run - except Exception as ex: - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes.ErrorCode.GENERAL, - str(ex)) + cc_version = '; '.join(cc_versions) if cc_versions else None + run_history = RunHistory( + run_id, self._tag, self._user_name, run_history_time, + cc_version, self._description) + + session.add(run_history) + session.flush() + + LOG.debug("Adding run done.") + + self.__store_analysis_statistics(session, run_history.id) + self.__store_analysis_info(session, run_history) + + session.flush() + LOG.debug("Storing analysis statistics done.") + + return run_id, update_run def __get_checker(self, session: DBSession, @@ -879,49 +1112,43 @@ def __add_report( fixed_at: Optional[datetime] = None ) -> int: """ Add report to the database. """ - try: - checker = self.__checker_for_report(session, report) + checker = self.__checker_for_report(session, report) + if not checker: + # It would be too easy to create a 'Checker' instance with the + # observed data right here, but __add_report() is called in + # the context of the *BIG* TRANSACTION which has all the + # reports of the entire store pending. Losing all that + # information on a potential UNIQUE CONSTRAINT violation due + # to multiple concurrent massStoreRun()s trying to store the + # same checker ID which was never seen in a 'metadata.json' is + # not worth it. + checker = self.__get_checker(session, + FakeChecker[0], FakeChecker[1]) if not checker: - # It would be too easy to create a 'Checker' instance with the - # observed data right here, but __add_report() is called in - # the context of the *BIG* TRANSACTION which has all the - # reports of the entire store pending. Losing all that - # information on a potential UNIQUE CONSTRAINT violation due - # to multiple concurrent massStoreRun()s trying to store the - # same checker ID which was never seen in a 'metadata.json' is - # not worth it. 
- checker = self.__get_checker(session, - FakeChecker[0], FakeChecker[1]) - if not checker: - LOG.fatal("Psuedo-checker '%s/%s' has no " - "identity in the database, even though " - "__store_checker_identifiers() should have " - "always preemptively created it!", - FakeChecker[0], FakeChecker[1]) - raise KeyError(FakeChecker[1]) - - db_report = DBReport( - file_path_to_id[report.file.path], run_id, report.report_hash, - checker, report.line, report.column, - len(report.bug_path_events), report.message, detection_status, - review_status.status, review_status.author, - review_status.message, run_history_time, - review_status.in_source, detection_time, fixed_at) - if analysis_info: - db_report.analysis_info.append(analysis_info) - - session.add(db_report) - self.__added_reports.append((db_report, report)) - if db_report.checker.checker_name == FakeChecker[1]: - self.__reports_with_fake_checkers[report_path_hash] = \ - (report, db_report) - - return db_report.id - - except Exception as ex: - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes.ErrorCode.GENERAL, - str(ex)) + LOG.fatal("Psuedo-checker '%s/%s' has no " + "identity in the database, even though " + "__store_checker_identifiers() should have " + "always preemptively created it!", + FakeChecker[0], FakeChecker[1]) + raise KeyError(FakeChecker[1]) + + db_report = DBReport( + file_path_to_id[report.file.path], run_id, report.report_hash, + checker, report.line, report.column, + len(report.bug_path_events), report.message, detection_status, + review_status.status, review_status.author, + review_status.message, run_history_time, + review_status.in_source, detection_time, fixed_at) + if analysis_info: + db_report.analysis_info.append(analysis_info) + + session.add(db_report) + self.__added_reports.append((db_report, report)) + if db_report.checker.checker_name == FakeChecker[1]: + self.__reports_with_fake_checkers[report_path_hash] = \ + (report, db_report) + + return db_report.id def __get_faked_checkers(self) \ -> Set[Tuple[str, str]]: @@ -966,78 +1193,67 @@ def __realise_fake_checkers(self, session): so all it does is upgrade the 'checker_id' FOREIGN KEY field to point at the real checker. 
""" - try: - grouped_by_checker: Dict[Tuple[str, str], List[int]] = \ - defaultdict(list) - for _, (report, db_id) in \ - self.__reports_with_fake_checkers.items(): - checker: Tuple[str, str] = checker_name_for_report(report) - grouped_by_checker[checker].append(cast(int, db_id)) - - for chk, report_ids in grouped_by_checker.items(): - analyzer_name, checker_name = chk - chk_obj = cast(Checker, self.__get_checker(session, - analyzer_name, - checker_name)) - session.query(DBReport) \ - .filter(DBReport.id.in_(report_ids)) \ - .update({"checker_id": chk_obj.id}, - synchronize_session=False) - except Exception as ex: - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes.ErrorCode.DATABASE, - str(ex)) + grouped_by_checker: Dict[Tuple[str, str], List[int]] = \ + defaultdict(list) + for _, (report, db_id) in \ + self.__reports_with_fake_checkers.items(): + checker: Tuple[str, str] = checker_name_for_report(report) + grouped_by_checker[checker].append(cast(int, db_id)) + + for chk, report_ids in grouped_by_checker.items(): + analyzer_name, checker_name = chk + chk_obj = cast(Checker, self.__get_checker(session, + analyzer_name, + checker_name)) + session.query(DBReport) \ + .filter(DBReport.id.in_(report_ids)) \ + .update({"checker_id": chk_obj.id}, + synchronize_session=False) def __add_report_context(self, session, file_path_to_id): - try: - for db_report, report in self.__added_reports: - LOG.debug("Storing bug path positions.") - for i, p in enumerate(report.bug_path_positions): - session.add(BugReportPoint( - p.range.start_line, p.range.start_col, - p.range.end_line, p.range.end_col, - i, file_path_to_id[p.file.path], db_report.id)) - - LOG.debug("Storing bug path events.") - for i, event in enumerate(report.bug_path_events): - session.add(BugPathEvent( - event.range.start_line, event.range.start_col, - event.range.end_line, event.range.end_col, - i, event.message, file_path_to_id[event.file.path], - db_report.id)) - - LOG.debug("Storing notes.") - for note in report.notes: - data_type = report_extended_data_type_str( - ttypes.ExtendedReportDataType.NOTE) - - session.add(ExtendedReportData( - note.range.start_line, note.range.start_col, - note.range.end_line, note.range.end_col, - note.message, file_path_to_id[note.file.path], - db_report.id, data_type)) - - LOG.debug("Storing macro expansions.") - for macro in report.macro_expansions: - data_type = report_extended_data_type_str( - ttypes.ExtendedReportDataType.MACRO) - - session.add(ExtendedReportData( - macro.range.start_line, macro.range.start_col, - macro.range.end_line, macro.range.end_col, - macro.message, file_path_to_id[macro.file.path], - db_report.id, data_type)) - - if report.annotations: - self.__validate_and_add_report_annotations( - session, db_report.id, report.annotations) + for db_report, report in self.__added_reports: + LOG.debug("Storing bug path positions.") + for i, p in enumerate(report.bug_path_positions): + session.add(BugReportPoint( + p.range.start_line, p.range.start_col, + p.range.end_line, p.range.end_col, + i, file_path_to_id[p.file.path], db_report.id)) + + LOG.debug("Storing bug path events.") + for i, event in enumerate(report.bug_path_events): + session.add(BugPathEvent( + event.range.start_line, event.range.start_col, + event.range.end_line, event.range.end_col, + i, event.message, file_path_to_id[event.file.path], + db_report.id)) + + LOG.debug("Storing notes.") + for note in report.notes: + data_type = report_extended_data_type_str( + ttypes.ExtendedReportDataType.NOTE) + + 
session.add(ExtendedReportData( + note.range.start_line, note.range.start_col, + note.range.end_line, note.range.end_col, + note.message, file_path_to_id[note.file.path], + db_report.id, data_type)) + + LOG.debug("Storing macro expansions.") + for macro in report.macro_expansions: + data_type = report_extended_data_type_str( + ttypes.ExtendedReportDataType.MACRO) + + session.add(ExtendedReportData( + macro.range.start_line, macro.range.start_col, + macro.range.end_line, macro.range.end_col, + macro.message, file_path_to_id[macro.file.path], + db_report.id, data_type)) + + if report.annotations: + self.__validate_and_add_report_annotations( + session, db_report.id, report.annotations) - session.flush() - - except Exception as ex: - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes.ErrorCode.GENERAL, - str(ex)) + session.flush() def __process_report_file( self, @@ -1074,7 +1290,7 @@ def get_missing_file_ids(report: Report) -> List[str]: for report in reports: self.__report_count += 1 - report.trim_path_prefixes(self.__trim_path_prefixes) + report.trim_path_prefixes(self._trim_path_prefixes) missing_ids_for_files = get_missing_file_ids(report) if missing_ids_for_files: @@ -1117,7 +1333,7 @@ def get_missing_file_ids(report: Report) -> List[str]: except ValueError as err: self.__wrong_src_code_comments.append(str(err)) - review_status.author = self.user_name + review_status.author = self._user_name review_status.date = run_history_time # False positive and intentional reports are considered as closed @@ -1181,24 +1397,17 @@ def __validate_and_add_report_annotations( try: allowed_annotations[key]["func"](value) session.add(ReportAnnotations(report_id, key, value)) - except KeyError: - # pylint: disable=raise-missing-from - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes.ErrorCode.REPORT_FORMAT, - f"'{key}' is not an allowed report annotation.", - allowed_annotations.keys()) - except ValueError: - # pylint: disable=raise-missing-from - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes.ErrorCode.REPORT_FORMAT, - f"'{value}' has wrong format. '{key}' annotations must be " - f"'{allowed_annotations[key]['display']}'.") - - def __get_report_limit_for_product(self): - with DBSession(self.__config_database) as session: - product = session.query(Product).get(self.__product.id) - if product.report_limit: - self.__report_limit = product.report_limit + except KeyError as ke: + raise TypeError(f"'{key}' is not an allowed report " + "annotation. " + "The allowed annotations are: " + f"{allowed_annotations.keys()}") \ + from ke + except ValueError as ve: + raise ValueError(f"'{value}' is in a wrong format! " + f"'{key}' annotations must be " + f"'{allowed_annotations[key]['display']}'.") \ + from ve def __check_report_count(self): """ @@ -1210,13 +1419,7 @@ def __check_report_count(self): LOG.error("The number of reports in the given report folder is " + "larger than the allowed." + f"The limit: {self.__report_limit}!") - extra_info = [ - "report_limit", - f"limit:{self.__report_limit}" - ] - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes. - ErrorCode.GENERAL, + raise OverflowError( "**Report Limit Exceeded** " + "This report folder cannot be stored because the number of " + "reports in the result folder is too high. Usually noisy " + @@ -1226,14 +1429,13 @@ def __check_report_count(self): "counts. 
Disable checkers that have generated an excessive " + "number of reports and then rerun the analysis to be able " + "to store the results on the server. " + - f"Limit: {self.__report_limit}", - extra_info) + f"Limit: {self.__report_limit}") def __store_reports( self, session: DBSession, - report_dir: str, - source_root: str, + report_dir: Path, + source_root: Path, run_id: int, file_path_to_id: Dict[str, int], run_history_time: datetime @@ -1241,11 +1443,11 @@ def __store_reports( """ Parse up and store the plist report files. """ def get_skip_handler( - report_dir: str + report_dir: Path ) -> skiplist_handler.SkipListHandler: """ Get a skip list handler based on the given report directory.""" - skip_file_path = os.path.join(report_dir, 'skip_file') - if not os.path.exists(skip_file_path): + skip_file_path = report_dir / "skip_file" + if not skip_file_path.exists(): return skiplist_handler.SkipListHandler() LOG.debug("Pocessing skip file %s", skip_file_path) @@ -1282,9 +1484,8 @@ def get_skip_handler( for root_dir_path, _, report_file_paths in os.walk(report_dir): LOG.debug("Get reports from '%s' directory", root_dir_path) - skip_handler = get_skip_handler(root_dir_path) - - review_status_handler = ReviewStatusHandler(source_root) + skip_handler = get_skip_handler(Path(root_dir_path)) + review_status_handler = ReviewStatusHandler(str(source_root)) review_status_cfg = \ os.path.join(root_dir_path, 'review_status.yaml') @@ -1346,7 +1547,7 @@ def get_skip_handler( session.flush() - LOG.info("[%s] Processed %d analyzer result file(s).", self.__name, + LOG.info("[%s] Processed %d analyzer result file(s).", self._name, processed_result_file_count) # If a checker was found in a plist file it can not be disabled so we @@ -1377,8 +1578,8 @@ def get_skip_handler( report.fixed_at = run_history_time if reports_to_delete: - self.__report_server._removeReports( - session, list(reports_to_delete)) + from .report_server import remove_reports + remove_reports(session, reports_to_delete) def finish_checker_run( self, @@ -1401,153 +1602,126 @@ def finish_checker_run( return False - def store(self) -> int: - """ Store run results to the server. """ + def store(self, + original_zip_size: int, + time_spent_on_task_preparation: float): + """Store run results to the server.""" start_time = time.time() - # Check constraints of the run. - self.__check_run_limit() - - with DBSession(self.__report_server._Session) as session: - self.__store_run_lock(session) - try: - with TemporaryDirectory( - dir=self.__context.codechecker_workspace - ) as zip_dir: - with LogTask(run_name=self.__name, - message="Unzip storage file"): - zip_size = unzip(self.__b64zip, zip_dir) - - if zip_size == 0: - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes. 
- ErrorCode.GENERAL, - "The received zip file content is empty!") - - LOG.debug("Using unzipped folder '%s'", zip_dir) - - source_root = os.path.join(zip_dir, 'root') - blame_root = os.path.join(zip_dir, 'blame') - report_dir = os.path.join(zip_dir, 'reports') - content_hash_file = os.path.join( - zip_dir, 'content_hashes.json') - - filename_to_hash = load_json(content_hash_file, {}) - - with LogTask(run_name=self.__name, - message="Store source files"): - LOG.info("[%s] Storing %d source file(s).", self.__name, - len(filename_to_hash.keys())) - file_path_to_id = self.__store_source_files( - source_root, filename_to_hash) - self.__add_blame_info(blame_root, filename_to_hash) - - run_history_time = datetime.now() - - # Parse all metadata information from the report directory. - with LogTask(run_name=self.__name, - message="Parse 'metadata.json's"): - for root_dir_path, _, _ in os.walk(report_dir): - metadata_file_path = os.path.join( - root_dir_path, 'metadata.json') - - self.__mips[root_dir_path] = \ - MetadataInfoParser(metadata_file_path) - - with LogTask(run_name=self.__name, - message="Store look-up ID for checkers in " - "'metadata.json'"): - checkers_in_metadata = { - (analyzer, checker) - for metadata in self.__mips.values() - for analyzer in metadata.analyzers - for checker - in metadata.checkers.get(analyzer, {}).keys()} - self.__store_checker_identifiers(checkers_in_metadata) - - try: - # This session's transaction buffer stores the actual - # run data into the database. - with DBSession(self.__report_server._Session) as session, \ - RunLocking(session, self.__name): - # Actual store operation begins here. - run_id, update_run = self.__add_or_update_run( - session, run_history_time) - - with LogTask(run_name=self.__name, - message="Store reports"): - self.__store_reports( - session, report_dir, source_root, run_id, - file_path_to_id, run_history_time) - - session.commit() - self.__load_report_ids_for_reports_with_fake_checkers( - session) + LOG.debug("Using unzipped folder '%s'", self._zip_dir) + + source_root = self._zip_dir / "root" + blame_root = self._zip_dir / "blame" + report_dir = self._zip_dir / "reports" + filename_to_hash = load_json( + self._zip_dir / "content_hashes.json", {}) + + # Store information that is "global" on the product database level. + with StepLog(self._name, "Store source files"): + LOG.info("[%s] Storing %d source file(s).", self._name, + len(filename_to_hash.keys())) + file_path_to_id = self.__store_source_files( + source_root, filename_to_hash) + self.__add_blame_info(blame_root, filename_to_hash) + + run_history_time = datetime.now() + + with StepLog(self._name, "Parse 'metadata.json's"): + for root_dir_path, _, _ in os.walk(report_dir): + metadata_file_path = os.path.join( + root_dir_path, 'metadata.json') + + self.__mips[root_dir_path] = \ + MetadataInfoParser(metadata_file_path) + + with StepLog(self._name, + "Store look-up ID for checkers in 'metadata.json'"): + checkers_in_metadata = { + (analyzer, checker) + for metadata in self.__mips.values() + for analyzer in metadata.analyzers + for checker + in metadata.checkers.get(analyzer, {}).keys()} + self.__store_checker_identifiers(checkers_in_metadata) + + try: + # This session's transaction buffer stores the actual run data + # into the database. 
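+ # The run-name lock (RunLock) acquired together with the session
+ # serialises concurrent store() calls that target the same run, so
+ # two uploads to the same run name cannot interleave their reports.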
+ with DBSession(self.__product.session_factory) as session, \ + RunLock(session, self._name): + run_id, update_run = self.__add_or_update_run( + session, run_history_time) + + with StepLog(self._name, "Store 'reports'"): + self.__store_reports( + session, report_dir, source_root, run_id, + file_path_to_id, run_history_time) + session.commit() + self.__load_report_ids_for_reports_with_fake_checkers( + session) + + if self.__reports_with_fake_checkers: + with StepLog( + self._name, + "Get look-up IDs for checkers not present in " + "'metadata.json'"): + additional_checkers = self.__get_faked_checkers() + # __store_checker_identifiers() has its own + # TRANSACTION! + self.__store_checker_identifiers( + additional_checkers) + + with DBSession(self.__product.session_factory) as session, \ + RunLock(session, self._name): + # The data of the run has been successfully committed + # into the database. Deal with post-processing issues + # that could only be done after-the-fact. if self.__reports_with_fake_checkers: - with LogTask(run_name=self.__name, - message="Get look-up ID for checkers " - "not present in 'metadata.json'"): - additional_checkers = self.__get_faked_checkers() - # __store_checker_identifiers() has its own - # TRANSACTION! - self.__store_checker_identifiers( - additional_checkers) - - with DBSession(self.__report_server._Session) as session, \ - RunLocking(session, self.__name): - # The data of the run has been successfully committed - # into the database. Deal with post-processing issues - # that could only be done after-the-fact. - if self.__reports_with_fake_checkers: - with LogTask(run_name=self.__name, - message="Fix-up report-to-checker " - "associations"): - self.__realise_fake_checkers(session) - - self.finish_checker_run(session, run_id) - session.commit() - - # If it's a run update, do not increment the number - # of runs of the current product. - inc_num_of_runs = 1 if not update_run else None - - self.__report_server._set_run_data_for_curr_product( - inc_num_of_runs, run_history_time) - - runtime = round(time.time() - start_time, 2) - zip_size_kb = round(zip_size / 1024) - - tag_desc = "" - if self.__tag: - tag_desc = f", under tag '{self.__tag}'" - - LOG.info("'%s' stored results (%s KB " - "/decompressed/) to run '%s' (id: %d) %s in " - "%s seconds.", self.user_name, - zip_size_kb, self.__name, run_id, tag_desc, - runtime) - - iso_start_time = datetime.fromtimestamp( - start_time).isoformat() - - log_msg = f"{iso_start_time}, " +\ - f"{runtime}s, " +\ - f'"{self.__product.name}", ' +\ - f'"{self.__name}", ' +\ - f"{zip_size_kb}KB, " +\ - f"{self.__report_count}, " +\ - f"{run_id}" - - STORE_TIME_LOG.info(log_msg) - - return run_id - except (sqlalchemy.exc.OperationalError, - sqlalchemy.exc.ProgrammingError) as ex: - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes.ErrorCode.DATABASE, - f"Storing reports to the database failed: {ex}") + with StepLog(self._name, + "Fix-up report-to-checker associations"): + self.__realise_fake_checkers(session) + + self.finish_checker_run(session, run_id) + session.commit() + + end_time = time.time() + + # If the current store() updated an existing run, do not + # increment the number of runs saved for the product. 
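+ # (set_cached_run_data() below only refreshes the summary columns
+ # cached in the configuration database; the run data itself has
+ # already been committed to the product database above.)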
+ self.__product.set_cached_run_data( + self.__config_db, + number_of_runs_change=(0 if update_run else 1), + last_store_date=run_history_time) + + run_time: float = (end_time - start_time) + \ + time_spent_on_task_preparation + zip_size_kib: float = original_zip_size / 1024 + + LOG.info("'%s' stored results (decompressed size: %.2f KiB) " + "to run '%s' (ID: %d%s) in %.2f seconds.", + self._user_name, zip_size_kib, self._name, run_id, + f", under tag '{self._tag}'" if self._tag else "", + run_time) + + iso_start_time = datetime.fromtimestamp(start_time) \ + .isoformat() + + log_msg = f"{iso_start_time}, " \ + f"{round(run_time, 2)}s, " \ + f'"{self.__product.name}", ' \ + f'"{self._name}", ' \ + f"{round(zip_size_kib)}KiB, " \ + f"{self.__report_count}, " \ + f"{run_id}" + + STORE_TIME_LOG.info(log_msg) + except (sqlalchemy.exc.OperationalError, + sqlalchemy.exc.ProgrammingError) as ex: + LOG.error("Database error! Storing reports to the " + "database failed: %s", ex) + raise except Exception as ex: LOG.error("Failed to store results: %s", ex) import traceback @@ -1560,10 +1734,17 @@ def store(self) -> int: # (If the failure is undetectable, the coded grace period expiry # of the lock will allow further store operations to the given # run name.) - with DBSession(self.__report_server._Session) as session: - self.__free_run_lock(session) + with DBSession(self.__product.session_factory) as session: + RunLock(session, self._name).drop_run_lock_from_db() if self.__wrong_src_code_comments: - raise codechecker_api_shared.ttypes.RequestFailed( - codechecker_api_shared.ttypes.ErrorCode.SOURCE_FILE, - self.__wrong_src_code_comments) + wrong_files_as_table = twodim.to_str( + "table", + ["File", "Line", "Checker name"], + [wrong_comment.split('|', 3) + for wrong_comment in self.__wrong_src_code_comments]) + + raise ValueError("One or more source files contained invalid " + "source code comments! " + "Failed to set review statuses.\n\n" + f"{wrong_files_as_table}") diff --git a/web/server/codechecker_server/api/report_server.py b/web/server/codechecker_server/api/report_server.py index 2348708b81..3248146761 100644 --- a/web/server/codechecker_server/api/report_server.py +++ b/web/server/codechecker_server/api/report_server.py @@ -21,8 +21,8 @@ from copy import deepcopy from collections import OrderedDict, defaultdict, namedtuple -from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional, Set, Tuple +from datetime import datetime +from typing import Any, Collection, Dict, List, Optional, Set, Tuple import sqlalchemy from sqlalchemy.sql.expression import or_, and_, not_, func, \ @@ -1340,13 +1340,26 @@ def get_is_opened_case(subquery): ) +def remove_reports(session: DBSession, + report_ids: Collection, + chunk_size: int = SQLITE_MAX_VARIABLE_NUMBER): + """ + Removes `Report`s in chunks. + """ + for r_ids in util.chunks(iter(report_ids), chunk_size): + session.query(Report) \ + .filter(Report.id.in_(r_ids)) \ + .delete(synchronize_session=False) + + class ThriftRequestHandler: """ Connect to database and handle thrift client requests. 
""" def __init__(self, - manager, + session_manager, + task_manager, Session, product, auth_session, @@ -1359,7 +1372,8 @@ def __init__(self, raise ValueError("Cannot initialize request handler without " "a product to serve.") - self._manager = manager + self._manager = session_manager + self._task_manager = task_manager self._product = product self._auth_session = auth_session self._config_database = config_database @@ -1377,34 +1391,6 @@ def _get_username(self): """ return self._auth_session.user if self._auth_session else "Anonymous" - def _set_run_data_for_curr_product( - self, - inc_num_of_runs: Optional[int], - latest_storage_date: Optional[datetime] = None - ): - """ - Increment the number of runs related to the current product with the - given value and set the latest storage date. - """ - values = {} - - if inc_num_of_runs is not None: - values["num_of_runs"] = Product.num_of_runs + inc_num_of_runs - # FIXME: This log is likely overkill. - LOG.info("Run counter in the config database was %s by %i.", - 'increased' if inc_num_of_runs >= 0 else 'decreased', - abs(inc_num_of_runs)) - - if latest_storage_date is not None: - values["latest_storage_date"] = latest_storage_date - - with DBSession(self._config_database) as session: - session.query(Product) \ - .filter(Product.id == self._product.id) \ - .update(values) - - session.commit() - def __require_permission(self, required): """ Helper method to raise an UNAUTHORIZED exception if the user does not @@ -3595,16 +3581,6 @@ def removeRunResults(self, run_ids): failed = True return not failed - def _removeReports(self, session, report_ids, - chunk_size=SQLITE_MAX_VARIABLE_NUMBER): - """ - Removing reports in chunks. - """ - for r_ids in util.chunks(iter(report_ids), chunk_size): - session.query(Report) \ - .filter(Report.id.in_(r_ids)) \ - .delete(synchronize_session=False) - @exc_to_thrift_reqfail @timeit def removeRunReports(self, run_ids, report_filter, cmp_data): @@ -3634,7 +3610,7 @@ def removeRunReports(self, run_ids, report_filter, cmp_data): reports_to_delete = [r[0] for r in q] if reports_to_delete: - self._removeReports(session, reports_to_delete) + remove_reports(session, reports_to_delete) session.commit() session.close() @@ -3706,9 +3682,9 @@ def removeRun(self, run_id, run_filter): LOG.info("Runs '%s' were removed by '%s'.", "', '".join(runs), self._get_username()) - # Decrement the number of runs but do not update the latest storage - # date. - self._set_run_data_for_curr_product(-1 * deleted_run_cnt) + self._product.set_cached_run_data( + self._config_database, + number_of_runs_change=-1 * deleted_run_cnt) # Remove unused comments and unused analysis info from the database. 
# Originally db_cleanup.remove_unused_data() was used here which @@ -3893,14 +3869,69 @@ def getMissingContentHashesForBlameInfo(self, file_hashes): @exc_to_thrift_reqfail @timeit - def massStoreRun(self, name, tag, version, b64zip, force, - trim_path_prefixes, description): + def massStoreRun(self, + name: str, + tag: Optional[str], + version: str, + b64zip: str, + force: bool, + trim_path_prefixes: Optional[List[str]], + description: Optional[str]) -> int: self.__require_store() - - from codechecker_server.api.mass_store_run import MassStoreRun - m = MassStoreRun(self, name, tag, version, b64zip, force, - trim_path_prefixes, description) - return m.store() + if not name: + raise ValueError("A run name is needed to know where to store!") + + from .mass_store_run import MassStoreRunInputHandler, MassStoreRunTask + ih = MassStoreRunInputHandler(self._manager, + self._config_database, + self._Session, + self._task_manager, + self._context, + self._product.id, + name, + description, + tag, + version, + force, + trim_path_prefixes, + b64zip, + self._get_username()) + ih.check_store_input_validity_at_face_value() + m: MassStoreRunTask = ih.create_mass_store_task(False) + self._task_manager.push_task(m) + + LOG.info("massStoreRun(): Running as '%s' ...", m.token) + + # To be compatible with older (<= 6.24, API <= 6.58) clients which + # may keep using the old API endpoint, simulate awaiting the + # background task in the API handler. + while True: + time.sleep(5) + t = self._task_manager.get_task_record(m.token) + if t.is_in_terminated_state: + if t.status == "failed": + raise codechecker_api_shared.ttypes.RequestFailed( + codechecker_api_shared.ttypes.ErrorCode.GENERAL, + "massStoreRun()'s processing failed. Here follow " + f"the details:\n\n{t.comments}") + if t.status == "cancelled": + raise codechecker_api_shared.ttypes.RequestFailed( + codechecker_api_shared.ttypes.ErrorCode.GENERAL, + "Server administrators cancelled the processing of " + "the massStoreRun() request!") + break + + # Prior to CodeChecker 6.25.0 (API v6.59), massStoreRun() was + # completely synchronous and blocking, and the implementation of the + # storage logic returned the ID of the run that was stored by the + # call. + # massStoreRun() was implemented in + # commit 2b29d787599da0318cd23dbe816377b9bce7236c (September 2017), + # replacing the previously used (and then completely removed!) + # addCheckerRun() function, which also returned the run's ID. + # The official client implementation stopped using this returned value + # from the moment of massStoreRun()'s implementation. + return -1 @exc_to_thrift_reqfail @timeit diff --git a/web/server/codechecker_server/product.py b/web/server/codechecker_server/product.py new file mode 100644 index 0000000000..3c18de4339 --- /dev/null +++ b/web/server/codechecker_server/product.py @@ -0,0 +1,236 @@ +# ------------------------------------------------------------------------- +# +# Part of the CodeChecker project, under the Apache License v2.0 with +# LLVM Exceptions. See LICENSE for license information. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# +# ------------------------------------------------------------------------- +""" +The in-memory representation and access methods for querying and mutating a +"Product": a separate and self-contained database and entity containing +analysis results and associated information, which a CodeChecker server can +connect to. 
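+
+ (The `Product` class below previously lived in `server.py`; it is moved
+ here essentially unchanged and extended with the `set_cached_run_data()`
+ helper used by the report storage and run removal code.)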
+""" +from datetime import datetime +from typing import Optional + +from sqlalchemy.orm import sessionmaker + +from codechecker_api_shared.ttypes import DBStatus + +from codechecker_common.logger import get_logger + +from .database import database, db_cleanup +from .database.config_db_model import Product as DBProduct +from .database.database import DBSession +from .database.run_db_model import \ + IDENTIFIER as RUN_META, \ + Run, RunLock + + +LOG = get_logger("server") + + +class Product: + """ + Represents a product, which is a distinct storage of analysis reports in + a separate database (and database connection) with its own access control. + """ + + # The amount of SECONDS that need to pass after the last unsuccessful + # connect() call so the next could be made. + CONNECT_RETRY_TIMEOUT = 300 + + def __init__(self, id_: int, endpoint: str, display_name: str, + connection_string: str, context, check_env): + """ + Set up a new managed product object for the configuration given. + """ + self.__id = id_ + self.__endpoint = endpoint + self.__display_name = display_name + self.__connection_string = connection_string + self.__driver_name = None + self.__context = context + self.__check_env = check_env + self.__engine = None + self.__session = None + self.__db_status = DBStatus.MISSING + + self.__last_connect_attempt = None + + @property + def id(self): + return self.__id + + @property + def endpoint(self): + """ + Returns the accessible URL endpoint of the product. + """ + return self.__endpoint + + @property + def name(self): + """ + Returns the display name of the product. + """ + return self.__display_name + + @property + def session_factory(self): + """ + Returns the session maker on this product's database engine which + can be used to initiate transactional connections. + """ + return self.__session + + @property + def driver_name(self): + """ + Returns the name of the sql driver (sqlite, postgres). + """ + return self.__driver_name + + @property + def db_status(self): + """ + Returns the status of the database which belongs to this product. + Call connect to update it. + """ + return self.__db_status + + @property + def last_connection_failure(self): + """ + Returns the reason behind the last executed connection attempt's + failure. + """ + return self.__last_connect_attempt[1] if self.__last_connect_attempt \ + else None + + def connect(self, init_db=False): + """ + Initiates the actual connection to the database configured for the + product. + + Each time the connect is called the db_status is updated. + """ + LOG.debug("Checking '%s' database.", self.endpoint) + + sql_server = database.SQLServer.from_connection_string( + self.__connection_string, + self.__endpoint, + RUN_META, + self.__context.run_migration_root, + interactive=False, + env=self.__check_env) + + if isinstance(sql_server, database.PostgreSQLServer): + self.__driver_name = 'postgresql' + elif isinstance(sql_server, database.SQLiteDatabase): + self.__driver_name = 'sqlite' + + try: + LOG.debug("Trying to connect to the database") + + # Create the SQLAlchemy engine. 
+ self.__engine = sql_server.create_engine() + LOG.debug(self.__engine) + + self.__session = sessionmaker(bind=self.__engine) + + self.__engine.execute('SELECT 1') + self.__db_status = sql_server.check_schema() + self.__last_connect_attempt = None + + if self.__db_status == DBStatus.SCHEMA_MISSING and init_db: + LOG.debug("Initializing new database schema.") + self.__db_status = sql_server.connect(init_db) + + except Exception as ex: + LOG.exception("The database for product '%s' cannot be" + " connected to.", self.endpoint) + self.__db_status = DBStatus.FAILED_TO_CONNECT + self.__last_connect_attempt = (datetime.now(), str(ex)) + + def get_details(self): + """ + Get details for a product from the database. + + It may throw different error messages depending on the used SQL driver + adapter in case of connection error. + """ + with DBSession(self.session_factory) as run_db_session: + run_locks = run_db_session.query(RunLock.name) \ + .filter(RunLock.locked_at.isnot(None)) \ + .all() + + runs_in_progress = set(run_lock[0] for run_lock in run_locks) + + num_of_runs = run_db_session.query(Run).count() + + latest_store_to_product = "" + if num_of_runs: + last_updated_run = run_db_session.query(Run) \ + .order_by(Run.date.desc()) \ + .limit(1) \ + .one_or_none() + + latest_store_to_product = last_updated_run.date + + return num_of_runs, runs_in_progress, latest_store_to_product + + def teardown(self): + """ + Disposes the database connection to the product's backend. + """ + if self.__db_status == DBStatus.FAILED_TO_CONNECT: + return + + self.__engine.dispose() + + self.__session = None + self.__engine = None + + def cleanup_run_db(self): + """ + Cleanup the run database which belongs to this product. + """ + LOG.info("[%s] Garbage collection started...", self.endpoint) + + db_cleanup.remove_expired_data(self) + db_cleanup.remove_unused_data(self) + db_cleanup.update_contextual_data(self, self.__context) + + LOG.info("[%s] Garbage collection finished.", self.endpoint) + return True + + def set_cached_run_data(self, + config_db_session_factory, + number_of_runs_change: Optional[int] = None, + last_store_date: Optional[datetime] = None): + """ + Update the configuration database row for the current `Product` + for the keys that contain cached summaries of what would otherwise + be queriable from the product's database. 
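+
+ Only the columns for which a value is actually supplied are written;
+ when neither argument is given, no configuration database transaction
+ is opened at all.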
+ """ + updates = {} + + if number_of_runs_change: + updates["num_of_runs"] = DBProduct.num_of_runs \ + + number_of_runs_change + LOG.info("%s: Changing 'num_of_runs' in CONFIG database by %s%i.", + self.__endpoint, + '+' if number_of_runs_change > 0 else '-', + abs(number_of_runs_change)) + + if last_store_date: + updates["latest_storage_date"] = last_store_date + + if updates: + with DBSession(config_db_session_factory) as session: + session.query(DBProduct) \ + .filter(DBProduct.id == self.__id) \ + .update(updates) + session.commit() diff --git a/web/server/codechecker_server/server.py b/web/server/codechecker_server/server.py index f10f28291e..247c66c3ee 100644 --- a/web/server/codechecker_server/server.py +++ b/web/server/codechecker_server/server.py @@ -13,7 +13,6 @@ import atexit from collections import Counter -import datetime from functools import partial from hashlib import sha256 from http.server import HTTPServer, SimpleHTTPRequestHandler @@ -68,14 +67,15 @@ from .api.server_info_handler import \ ThriftServerInfoHandler as ServerInfoHandler_v6 from .api.tasks import ThriftTaskHandler as TaskHandler_v6 -from .database import database, db_cleanup from .database.config_db_model import Product as ORMProduct, \ Configuration as ORMConfiguration from .database.database import DBSession -from .database.run_db_model import IDENTIFIER as RUN_META, Run, RunLock +from .database.run_db_model import Run +from .product import Product +from .session_manager import SessionManager, SESSION_COOKIE_NAME from .task_executors.main import executor as background_task_executor from .task_executors.task_manager import \ - TaskManager as BackgroundTaskManager, drop_all_incomplete_tasks + TaskManager as BackgroundTaskManager LOG = get_logger('server') @@ -421,6 +421,7 @@ def do_POST(self): acc_handler = ReportHandler_v6( self.server.manager, + self.server.task_manager, product.session_factory, product, self.auth_session, @@ -450,7 +451,7 @@ def do_POST(self): self.send_response(200) self.send_header("content-type", "application/x-thrift") - self.send_header("Content-Length", len(result)) + self.send_header("Content-Length", str(len(result))) self.end_headers() self.wfile.write(result) return @@ -497,182 +498,6 @@ def translate_path(self, path): return path -class Product: - """ - Represents a product, which is a distinct storage of analysis reports in - a separate database (and database connection) with its own access control. - """ - - # The amount of SECONDS that need to pass after the last unsuccessful - # connect() call so the next could be made. - CONNECT_RETRY_TIMEOUT = 300 - - def __init__(self, id_: int, endpoint: str, display_name: str, - connection_string: str, context, check_env): - """ - Set up a new managed product object for the configuration given. - """ - self.__id = id_ - self.__endpoint = endpoint - self.__display_name = display_name - self.__connection_string = connection_string - self.__driver_name = None - self.__context = context - self.__check_env = check_env - self.__engine = None - self.__session = None - self.__db_status = DBStatus.MISSING - - self.__last_connect_attempt = None - - @property - def id(self): - return self.__id - - @property - def endpoint(self): - """ - Returns the accessible URL endpoint of the product. - """ - return self.__endpoint - - @property - def name(self): - """ - Returns the display name of the product. 
- """ - return self.__display_name - - @property - def session_factory(self): - """ - Returns the session maker on this product's database engine which - can be used to initiate transactional connections. - """ - return self.__session - - @property - def driver_name(self): - """ - Returns the name of the sql driver (sqlite, postgres). - """ - return self.__driver_name - - @property - def db_status(self): - """ - Returns the status of the database which belongs to this product. - Call connect to update it. - """ - return self.__db_status - - @property - def last_connection_failure(self): - """ - Returns the reason behind the last executed connection attempt's - failure. - """ - return self.__last_connect_attempt[1] if self.__last_connect_attempt \ - else None - - def connect(self, init_db=False): - """ - Initiates the actual connection to the database configured for the - product. - - Each time the connect is called the db_status is updated. - """ - LOG.debug("Checking '%s' database.", self.endpoint) - - sql_server = database.SQLServer.from_connection_string( - self.__connection_string, - self.__endpoint, - RUN_META, - self.__context.run_migration_root, - interactive=False, - env=self.__check_env) - - if isinstance(sql_server, database.PostgreSQLServer): - self.__driver_name = 'postgresql' - elif isinstance(sql_server, database.SQLiteDatabase): - self.__driver_name = 'sqlite' - - try: - LOG.debug("Trying to connect to the database") - - # Create the SQLAlchemy engine. - self.__engine = sql_server.create_engine() - LOG.debug(self.__engine) - - self.__session = sessionmaker(bind=self.__engine) - - self.__engine.execute('SELECT 1') - self.__db_status = sql_server.check_schema() - self.__last_connect_attempt = None - - if self.__db_status == DBStatus.SCHEMA_MISSING and init_db: - LOG.debug("Initializing new database schema.") - self.__db_status = sql_server.connect(init_db) - - except Exception as ex: - LOG.exception("The database for product '%s' cannot be" - " connected to.", self.endpoint) - self.__db_status = DBStatus.FAILED_TO_CONNECT - self.__last_connect_attempt = (datetime.datetime.now(), str(ex)) - - def get_details(self): - """ - Get details for a product from the database. - - It may throw different error messages depending on the used SQL driver - adapter in case of connection error. - """ - with DBSession(self.session_factory) as run_db_session: - run_locks = run_db_session.query(RunLock.name) \ - .filter(RunLock.locked_at.isnot(None)) \ - .all() - - runs_in_progress = set(run_lock[0] for run_lock in run_locks) - - num_of_runs = run_db_session.query(Run).count() - - latest_store_to_product = "" - if num_of_runs: - last_updated_run = run_db_session.query(Run) \ - .order_by(Run.date.desc()) \ - .limit(1) \ - .one_or_none() - - latest_store_to_product = last_updated_run.date - - return num_of_runs, runs_in_progress, latest_store_to_product - - def teardown(self): - """ - Disposes the database connection to the product's backend. - """ - if self.__db_status == DBStatus.FAILED_TO_CONNECT: - return - - self.__engine.dispose() - - self.__session = None - self.__engine = None - - def cleanup_run_db(self): - """ - Cleanup the run database which belongs to this product. 
- """ - LOG.info("[%s] Garbage collection started...", self.endpoint) - - db_cleanup.remove_expired_data(self) - db_cleanup.remove_unused_data(self) - db_cleanup.update_contextual_data(self, self.__context) - - LOG.info("[%s] Garbage collection finished.", self.endpoint) - return True - - def _do_db_cleanup(context, check_env, id_: int, endpoint: str, display_name: str, connection_str: str) -> Tuple[Optional[bool], str]: @@ -782,10 +607,10 @@ def __init__(self, self.manager.set_database_connection(self.config_session) self.__task_queue = task_queue - self.task_manager = BackgroundTaskManager(task_queue, - self.config_session, - server_shutdown_flag, - machine_id) + self.task_manager = BackgroundTaskManager( + task_queue, self.config_session, self.check_env, + server_shutdown_flag, machine_id, + pathlib.Path(self.context.codechecker_workspace)) # Load the initial list of products and set up the server. cfg_sess = self.config_session() @@ -1163,23 +988,6 @@ def start_server(config_directory: str, package_data, port: int, else: LOG.debug("Skipping db_cleanup, as requested.") - def _cleanup_incomplete_tasks(action: str) -> int: - config_session_factory = config_sql_server.create_engine() - try: - return drop_all_incomplete_tasks( - sessionmaker(bind=config_session_factory), - machine_id, action) - finally: - config_session_factory.dispose() - - dropped_tasks = _cleanup_incomplete_tasks( - "New server started with the same machine_id, assuming the old " - "server is dead and won't be able to finish the task.") - if dropped_tasks: - LOG.info("At server startup, dropped %d background tasks left behind " - "by a previous server instance matching machine ID '%s'.", - dropped_tasks, machine_id) - api_processes: Dict[int, Process] = {} requested_api_threads = cast(int, manager.worker_processes) \ or cpu_count() @@ -1194,6 +1002,34 @@ def _cleanup_incomplete_tasks(action: str) -> int: bg_task_queue: Queue = Queue() is_server_shutting_down = Value('B', False) + def _cleanup_incomplete_tasks(action: str) -> int: + config_db = config_sql_server.create_engine() + config_session_factory = sessionmaker(bind=config_db) + tm = BackgroundTaskManager( + bg_task_queue, config_session_factory, check_env, + is_server_shutting_down, machine_id, + pathlib.Path(context.codechecker_workspace)) + + try: + tm.destroy_all_temporary_data() + except OSError: + LOG.warning("Clearing task-temporary storage space failed!") + import traceback + traceback.print_exc() + + try: + return tm.drop_all_incomplete_tasks(action) + finally: + config_db.dispose() + + dropped_tasks = _cleanup_incomplete_tasks( + "New server started with the same machine_id, assuming the old " + "server is dead and won't be able to finish the task.") + if dropped_tasks: + LOG.info("At server startup, dropped %d background tasks left behind " + "by a previous server instance matching machine ID '%s'.", + dropped_tasks, machine_id) + server_clazz = CCSimpleHttpServer if ':' in listen_address: # IPv6 address specified for listening. 
@@ -1295,6 +1131,7 @@ def spawn_bg_process(): target=background_task_executor, args=(bg_task_queue, config_sql_server, + check_env, is_server_shutting_down, machine_id, ), diff --git a/web/server/codechecker_server/task_executors/abstract_task.py b/web/server/codechecker_server/task_executors/abstract_task.py index f38830ad33..bb665f034e 100644 --- a/web/server/codechecker_server/task_executors/abstract_task.py +++ b/web/server/codechecker_server/task_executors/abstract_task.py @@ -9,6 +9,7 @@ Contains the base class to be inherited and implemented by all background task types. """ +import logging import os import pathlib import shutil @@ -82,6 +83,8 @@ def destroy_data(self): try: shutil.rmtree(self._data_path) + LOG.debug("Wiping temporary data of task '%s' at '%s' ...", + self._token, self._data_path) except Exception as ex: LOG.warning("Failed to remove background task's data_dir at " "'%s':\n%s", self.data_path, str(ex)) @@ -94,7 +97,7 @@ def _implementation(self, _task_manager: "TaskManager") -> None: context of the executed subprocess, to query and mutate service-level information about the current task. """ - raise NotImplementedError() + raise NotImplementedError(f"No implementation for task class {self}!") def execute(self, task_manager: "TaskManager") -> None: """ @@ -183,6 +186,10 @@ def _log_exception_and_fail(db_task: DBTask): db_task.add_comment( f"FAILED!\nException during execution:\n{str(ex)}", "SYSTEM[AbstractTask::execute()]") + if LOG.isEnabledFor(logging.DEBUG): + db_task.add_comment("Debug exception information:\n" + f"{traceback.format_exc()}", + "SYSTEM[AbstractTask::execute()]") db_task.set_finished(successfully=False) task_manager._mutate_task_record(self, _log_exception_and_fail) diff --git a/web/server/codechecker_server/task_executors/main.py b/web/server/codechecker_server/task_executors/main.py index 580a27c4b9..320de737c5 100644 --- a/web/server/codechecker_server/task_executors/main.py +++ b/web/server/codechecker_server/task_executors/main.py @@ -31,6 +31,7 @@ def executor(queue: Queue, config_db_sql_server, + server_environment, server_shutdown_flag: "Value", machine_id: str): """ @@ -66,8 +67,8 @@ def executor_hangup_handler(signum: int, _frame): signal.signal(signal.SIGHUP, executor_hangup_handler) config_db_engine = config_db_sql_server.create_engine() - tm = TaskManager(queue, sessionmaker(bind=config_db_engine), kill_flag, - machine_id) + tm = TaskManager(queue, sessionmaker(bind=config_db_engine), + server_environment, kill_flag, machine_id) while not kill_flag.value: try: diff --git a/web/server/codechecker_server/task_executors/task_manager.py b/web/server/codechecker_server/task_executors/task_manager.py index ddd3b31053..6b6929109a 100644 --- a/web/server/codechecker_server/task_executors/task_manager.py +++ b/web/server/codechecker_server/task_executors/task_manager.py @@ -11,6 +11,8 @@ """ import os from pathlib import Path +import re +import shutil import tempfile from typing import Callable, Optional @@ -24,6 +26,7 @@ from ..database.database import DBSession MAX_TOKEN_RANDOM_RETRIES = 10 +CHARS_INVALID_IN_PATH = re.compile(r"[\'\"<>:\\/\|\*\?\. 
]") LOG = get_logger("server") @@ -47,11 +50,33 @@ class TaskManager: """ def __init__(self, q: Queue, config_db_session_factory, - executor_kill_flag: Value, machine_id: str): + server_environment, + executor_kill_flag: Value, + machine_id: str, + temp_dir: Optional[Path] = None): self._queue = q self._database_factory = config_db_session_factory + self._server_environment = server_environment self._is_shutting_down = executor_kill_flag self._machine_id = machine_id + self._temp_dir_root = (temp_dir or Path(tempfile.gettempdir())) \ + / "codechecker_tasks" \ + / CHARS_INVALID_IN_PATH.sub('_', machine_id) + + os.makedirs(self._temp_dir_root, exist_ok=True) + + @property + def configuration_database_session_factory(self): + """ + Returns a `sqlalchemy.orm.sessionmaker` instance for the server + configuration database. + """ + return self._database_factory + + @property + def environment(self): + """Returns the ``check_env`` injected into the task manager.""" + return self._server_environment @property def machine_id(self) -> str: @@ -95,31 +120,78 @@ def allocate_task_record(self, kind: str, summary: str, def create_task_data(self, token: str) -> Path: """ Creates a temporary directory which is **NOT** cleaned up - automatically, and suitable for putting arbitrary files underneath - to communicate large inputs (that should not be put in the `Queue`) - to the `execute` method of an `AbstractTask`. + automatically by the current context, and which is suitable for + putting arbitrary files underneath to communicate large inputs + (that should not be put in the `Queue`) to the `execute` method of + an `AbstractTask`. + + The larger business logic of the Server implementation may still clean + up the temporary directories, e.g., if the pending tasks are being + dropped during a shutdown, making retention of this "temporary data" + useless. + See `destroy_temporary_data`. + """ + task_temp_dir = tempfile.mkdtemp(prefix=f"{token}-", + dir=self._temp_dir_root) + return Path(task_temp_dir) + + def destroy_all_temporary_data(self): """ - task_tmp_root = Path(tempfile.gettempdir()) / "codechecker_tasks" \ - / self.machine_id - os.makedirs(task_tmp_root, exist_ok=True) + Removes the contents of task-temporary directories under the + `TaskManager`'s initial `temp_dir` and current "machine ID". + """ + try: + shutil.rmtree(self._temp_dir_root) + except Exception as ex: + LOG.warning("Failed to remove background tasks' data_dirs at " + "'%s':\n%s", self._temp_dir_root, str(ex)) - task_tmp_dir = tempfile.mkdtemp(prefix=f"{token}-") - return Path(task_tmp_dir) + def drop_all_incomplete_tasks(self, action: str) -> int: + """ + Sets all tasks in the database that were associated with the given + `machine_id` to ``"dropped"`` status, indicating that the status was + changed during the `action`. - def _get_task_record(self, task_obj: "AbstractTask") -> DBTask: + Returns the number of `DBTask`s actually changed. + """ + count: int = 0 + with DBSession(self._database_factory) as session: + for t in session.query(DBTask) \ + .filter(DBTask.machine_id == self.machine_id, + DBTask.status.in_(["allocated", + "enqueued", + "running"])) \ + .all(): + count += 1 + t.add_comment(f"DROPPED!\n{action}", + "SYSTEM") + t.set_abandoned(force_dropped_status=True) + + session.commit() + return count + + def get_task_record(self, token: str) -> DBTask: """ Retrieves the `DBTask` for the task identified by `task_obj`. This class should not be mutated, only the fields queried. 
""" with DBSession(self._database_factory) as session: - try: - db_task = session.query(DBTask).get(task_obj.token) - session.expunge(db_task) - return db_task - except sqlalchemy.exc.SQLAlchemyError as sql_err: - raise KeyError(f"No task record for token '{task_obj.token}' " - "in the database") from sql_err + db_task: Optional[DBTask] = \ + session.query(DBTask).get(token) + if not db_task: + raise KeyError(f"No task record for token '{token}' " + "in the database") + session.expunge(db_task) + return db_task + + def _get_task_record(self, task_obj: "AbstractTask") -> DBTask: + """ + Retrieves the `DBTask` for the task identified by `task_obj`. + + This class should not be mutated, only the fields queried. + """ + return self.get_task_record(task_obj.token) def _mutate_task_record(self, task_obj: "AbstractTask", mutator: Callable[[DBTask], None]): @@ -128,14 +200,14 @@ def _mutate_task_record(self, task_obj: "AbstractTask", corresponding to the `task_obj` description available in memory. """ with DBSession(self._database_factory) as session: - try: - db_record = session.query(DBTask).get(task_obj.token) - except sqlalchemy.exc.SQLAlchemyError as sql_err: + db_task: Optional[DBTask] = \ + session.query(DBTask).get(task_obj.token) + if not db_task: raise KeyError(f"No task record for token '{task_obj.token}' " - "in the database") from sql_err + "in the database") try: - mutator(db_record) + mutator(db_task) except Exception: session.rollback() @@ -195,35 +267,18 @@ def should_cancel(self, task_obj: "AbstractTask") -> bool: (db_task.status in ["enqueued", "running"] and db_task.cancel_flag) + def add_comment(self, task_obj: "AbstractTask", comment: str, + actor: Optional[str] = None): + """ + Adds `comment` in the name of `actor` to the task record corresponding + to `task_obj`. + """ + self._mutate_task_record(task_obj, + lambda dbt: dbt.add_comment(comment, actor)) + def heartbeat(self, task_obj: "AbstractTask"): """ Triggers ``heartbeat()`` timestamp update in the database for `task_obj`. """ self._mutate_task_record(task_obj, lambda dbt: dbt.heartbeat()) - - -def drop_all_incomplete_tasks(config_db_session_factory, machine_id: str, - action: str) -> int: - """ - Sets all tasks in the database (reachable via `config_db_session_factory`) - that were associated with the given `machine_id` to ``"dropped"`` status, - indicating that the status was changed during the `action`. - - Returns the number of `DBTask`s actually changed. 
- """ - count: int = 0 - with DBSession(config_db_session_factory) as session: - for t in session.query(DBTask) \ - .filter(DBTask.machine_id == machine_id, - DBTask.status.in_(["allocated", - "enqueued", - "running"])) \ - .all(): - count += 1 - t.add_comment(f"DROPPED!\n{action}", - "SYSTEM") - t.set_abandoned(force_dropped_status=True) - - session.commit() - return count diff --git a/web/server/vue-cli/package-lock.json b/web/server/vue-cli/package-lock.json index d0943fd772..07b56dce13 100644 --- a/web/server/vue-cli/package-lock.json +++ b/web/server/vue-cli/package-lock.json @@ -5115,7 +5115,7 @@ "node_modules/codechecker-api": { "version": "6.59.0", "resolved": "file:../../api/js/codechecker-api-node/dist/codechecker-api-6.59.0.tgz", - "integrity": "sha512-BZCBDRjVFS5UerrXsoPNioQppTfrCdDgToHqfFfaQtk6FPVrER42LchfU+cZl254PgWh58H5bLfqdLyFfqntCg==", + "integrity": "sha512-DN1vQkV3P/5jwI62Sd+JzvNALe/i7km2iDd8GKfK6vQYdYPnHg3ZpwK1vyRcF0dsegZhjfgoMzOAclH+nwk+Yg==", "license": "SEE LICENSE IN LICENSE", "dependencies": { "thrift": "0.13.0-hotfix.1" @@ -21146,7 +21146,7 @@ }, "codechecker-api": { "version": "file:../../api/js/codechecker-api-node/dist/codechecker-api-6.59.0.tgz", - "integrity": "sha512-BZCBDRjVFS5UerrXsoPNioQppTfrCdDgToHqfFfaQtk6FPVrER42LchfU+cZl254PgWh58H5bLfqdLyFfqntCg==", + "integrity": "sha512-DN1vQkV3P/5jwI62Sd+JzvNALe/i7km2iDd8GKfK6vQYdYPnHg3ZpwK1vyRcF0dsegZhjfgoMzOAclH+nwk+Yg==", "requires": { "thrift": "0.13.0-hotfix.1" }