diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 00e8ffca07..290d56bb5c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -20,8 +20,8 @@ jobs: - name: Install dependencies run: | pip install $(grep -iE "pylint|pycodestyle" analyzer/requirements_py/dev/requirements.txt) - - name: Run tests - run: make pylint pycodestyle + - name: Run pycodestyle & pylint + run: make -k pycodestyle pylint tools: name: Tools (report-converter, etc.) diff --git a/analyzer/codechecker_analyzer/analysis_manager.py b/analyzer/codechecker_analyzer/analysis_manager.py index 6b22ca4231..6f7e8a22dd 100644 --- a/analyzer/codechecker_analyzer/analysis_manager.py +++ b/analyzer/codechecker_analyzer/analysis_manager.py @@ -13,16 +13,15 @@ import shutil import signal import sys -import time import traceback import zipfile from threading import Timer import multiprocess -import psutil from codechecker_common.logger import get_logger +from codechecker_common.process import kill_process_tree from codechecker_common.review_status_handler import ReviewStatusHandler from codechecker_statistics_collector.collectors.special_return_value import \ @@ -341,42 +340,6 @@ def handle_failure( os.remove(plist_file) -def kill_process_tree(parent_pid, recursive=False): - """Stop the process tree try it gracefully first. - - Try to stop the parent and child processes gracefuly - first if they do not stop in time send a kill signal - to every member of the process tree. - - There is a similar function in the web part please - consider to update that in case of changing this. - """ - proc = psutil.Process(parent_pid) - children = proc.children(recursive) - - # Send a SIGTERM (Ctrl-C) to the main process - proc.terminate() - - # If children processes don't stop gracefully in time, - # slaughter them by force. - _, still_alive = psutil.wait_procs(children, timeout=5) - for p in still_alive: - p.kill() - - # Wait until this process is running. 
def signal_handler(signum, _frame):
    """
    Forwards the received signal to the CodeChecker subprocess started by
    this `main` script.

    Nothing is sent if no subprocess PID has been stored in `PROC_PID` yet,
    or if the platform is "win32".

    * signum - Number of the signal that was delivered to this process; the
               very same signal number is sent to the subprocess.
    * _frame - Unused. Present only because handlers installed through
               `signal.signal` are called with the current stack frame.
    """
    # NOTE(review): besides SIGINT, SIGTERM and SIGHUP, this handler is also
    # installed for SIGCHLD, which means a SIGCHLD received here is sent on
    # to the PID stored in PROC_PID as well - confirm that this is intended.
    global PROC_PID
    if PROC_PID and sys.platform != "win32":
        try:
            os.kill(PROC_PID, signum)
        except ProcessLookupError:
            # No process with the stored PID exists any more, so there is
            # nothing to forward the signal to.
            pass
def raw_sprint_log(logger: logging.Logger, level: str, message: str) \
        -> Optional[str]:
    """
    Renders `message` as a log line without going through the logging
    infrastructure.

    The timestamp uses the date format of the formatter attached to the
    first handler of `logger`; if there is no handler, no formatter, or no
    date format, the default string form of the current time is used.

    Returns None if `level` is not enabled for `logger`, otherwise the
    rendered line (without a trailing newline).
    """
    if not logger.isEnabledFor(_Levels[level]):
        return None

    # Find the date format, if any, of the first handler's formatter.
    date_format = None
    if logger.handlers:
        first_formatter = logger.handlers[0].formatter
        if first_formatter:
            date_format = first_formatter.datefmt

    now = datetime.datetime.now()
    timestamp = now.strftime(date_format) if date_format else str(now)

    return f"[{validate_loglvl(level)} {timestamp}] - {message}"


def signal_log(logger: logging.Logger, level: str, message: str):
    """
    Emits a log-like line from within a signal handler.

    The line is written directly to the file descriptor of the standard
    error stream, which avoids the `RuntimeError` that reentrant use of
    `print`-like calls can trigger. Nothing is written if `level` is not
    enabled for `logger`.
    """
    line = raw_sprint_log(logger, level, message)
    if line:
        os.write(sys.stderr.fileno(), f"{line}\n".encode())
def generate_random_token(num_bytes: int = 32) -> str:
    """
    Returns a randomly generated string usable as a token, consisting of
    `num_bytes` lowercase hexadecimal characters.

    Despite its name, `num_bytes` is the length of the returned string in
    characters, not the number of random bytes behind it. A non-positive
    `num_bytes` results in an empty string.

    The token is produced by the standard `secrets` module, which is meant
    for security-sensitive values such as session tokens, instead of a
    hand-rolled combination of hashing and the non-cryptographic `random`
    module.
    """
    # Imported locally so that this function does not depend on changes to
    # the import block at the top of the module.
    import secrets

    if num_bytes <= 0:
        return ""

    # One random byte renders as two hexadecimal digits: request enough
    # bytes for an odd length as well, then cut to the exact size.
    return secrets.token_hex((num_bytes + 1) // 2)[:num_bytes]
@@ -25,6 +25,14 @@ will be started on the server to process API requests. The server needs to be restarted if the value is changed in the config file. +### Number of task worker processes +The `background_worker_processes` section of the config file controls how many +processes will be started on the server to process background jobs. + +*Default value*: Fallback to same amount as `worker_processes`. + +The server needs to be restarted if the value is changed in the config file. + ## Run limitation The `max_run_count` section of the config file controls how many runs can be stored on the server for a product. diff --git a/docs/web/user_guide.md b/docs/web/user_guide.md index 846599b76a..bca75ee459 100644 --- a/docs/web/user_guide.md +++ b/docs/web/user_guide.md @@ -145,8 +145,9 @@ or via the `CodeChecker cmd` command-line client. ``` usage: CodeChecker server [-h] [-w WORKSPACE] [-f CONFIG_DIRECTORY] - [--host LISTEN_ADDRESS] [-v PORT] [--not-host-only] - [--skip-db-cleanup] [--config CONFIG_FILE] + [--machine-id MACHINE_ID] [--host LISTEN_ADDRESS] + [-v PORT] [--not-host-only] [--skip-db-cleanup] + [--config CONFIG_FILE] [--sqlite SQLITE_FILE | --postgresql] [--dbaddress DBADDRESS] [--dbport DBPORT] [--dbusername DBUSERNAME] [--dbname DBNAME] @@ -172,6 +173,20 @@ optional arguments: specific configuration (such as authentication settings, and TLS/SSL certificates) from. (default: /home//.codechecker) + --machine-id MACHINE_ID + A unique identifier to be used to identify the machine + running subsequent instances of the "same" server + process. This value is only used internally to + maintain normal function and bookkeeping of executed + tasks following an unclean server shutdown, e.g., + after a crash or system-level interference. If + unspecified, defaults to a reasonable default value + that is generated from the computer's hostname, as + reported by the operating system. 
In most scenarios, + there is no need to fine-tune this, except if + subsequent executions of the "same" server is achieved + in distinct environments, e.g., if the server + otherwise is running in a container. --host LISTEN_ADDRESS The IP address or hostname of the server on which it should listen for connections. For IPv6 listening, diff --git a/web/api/Makefile b/web/api/Makefile index e145755563..d322ab77b0 100644 --- a/web/api/Makefile +++ b/web/api/Makefile @@ -37,10 +37,11 @@ build: clean target_dirs thrift:$(THRIFT_VERSION) \ /bin/bash -c " \ thrift $(THRIFT_OPTS) $(TARGET_PY) $(TARGET_JS) /data/authentication.thrift && \ - thrift $(THRIFT_OPTS) $(TARGET_PY) $(TARGET_JS) /data/products.thrift && \ - thrift $(THRIFT_OPTS) $(TARGET_PY) $(TARGET_JS) /data/report_server.thrift && \ - thrift $(THRIFT_OPTS) $(TARGET_PY) $(TARGET_JS) /data/configuration.thrift && \ - thrift $(THRIFT_OPTS) $(TARGET_PY) $(TARGET_JS) /data/server_info.thrift" + thrift $(THRIFT_OPTS) $(TARGET_PY) $(TARGET_JS) /data/configuration.thrift && \ + thrift $(THRIFT_OPTS) $(TARGET_PY) $(TARGET_JS) /data/products.thrift && \ + thrift $(THRIFT_OPTS) $(TARGET_PY) $(TARGET_JS) /data/report_server.thrift && \ + thrift $(THRIFT_OPTS) $(TARGET_PY) $(TARGET_JS) /data/server_info.thrift && \ + thrift $(THRIFT_OPTS) $(TARGET_PY) $(TARGET_JS) /data/tasks.thrift" # Create tarball from the API JavaScript part which will be commited in the # repository and installed as a dependency. diff --git a/web/api/codechecker_api_shared.thrift b/web/api/codechecker_api_shared.thrift index 167f8ab40b..3e01ea5fbd 100644 --- a/web/api/codechecker_api_shared.thrift +++ b/web/api/codechecker_api_shared.thrift @@ -4,15 +4,24 @@ // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception // ------------------------------------------------------------------------- +/** + * Helper enum for expressing a three-way boolean in a filter. 
+ */ +enum Ternary { + BOTH = 0, // Indicates a query where both set and unset booleans are matched. + OFF = 1, // Indicates a query where the filter matches an UNSET boolean. + ON = 2, // Indicates a query where the filter matches a SET boolean. +} + enum ErrorCode { DATABASE, IOERROR, GENERAL, - AUTH_DENIED, // Authentication denied. We do not allow access to the service. - UNAUTHORIZED, // Authorization denied. User does not have right to perform an action. - API_MISMATCH, // The client attempted to query an API version that is not supported by the server. - SOURCE_FILE, // The client sent a source code which contains errors (e.g.: source code comment errors). - REPORT_FORMAT, // The client sent a report with wrong format (e.g. report annotation has bad type in a .plist) + AUTH_DENIED, // Authentication denied. We do not allow access to the service. + UNAUTHORIZED, // Authorization denied. User does not have right to perform an action. + API_MISMATCH, // The client attempted to query an API version that is not supported by the server. + SOURCE_FILE, // The client sent a source code which contains errors (e.g., source code comment errors). + REPORT_FORMAT, // The client sent a report with wrong format (e.g., report annotation has bad type in a .plist). } exception RequestFailed { @@ -30,7 +39,7 @@ exception RequestFailed { * PRODUCT: These permissions are configured per-product. * The extra data field looks like the following object: * { i64 productID } -*/ + */ enum Permission { SUPERUSER = 1, // scope: SYSTEM PERMISSION_VIEW = 2, // scope: SYSTEM @@ -42,8 +51,8 @@ enum Permission { } /** -* Status information about the database backend. -*/ + * Status information about the database backend. + */ enum DBStatus { OK, // Everything is ok with the database. MISSING, // The database is missing. @@ -54,3 +63,9 @@ enum DBStatus { SCHEMA_INIT_ERROR, // Failed to create initial database schema. SCHEMA_UPGRADE_FAILED // Failed to upgrade schema. 
} + +/** + * Common token type identifying a background task. + * (Main implementation for task management API is in tasks.thrift.) + */ +typedef string TaskToken; diff --git a/web/api/js/codechecker-api-node/dist/codechecker-api-6.58.0.tgz b/web/api/js/codechecker-api-node/dist/codechecker-api-6.58.0.tgz deleted file mode 100644 index a8cf8ab10b..0000000000 Binary files a/web/api/js/codechecker-api-node/dist/codechecker-api-6.58.0.tgz and /dev/null differ diff --git a/web/api/js/codechecker-api-node/dist/codechecker-api-6.59.0.tgz b/web/api/js/codechecker-api-node/dist/codechecker-api-6.59.0.tgz new file mode 100644 index 0000000000..270aaf28fc Binary files /dev/null and b/web/api/js/codechecker-api-node/dist/codechecker-api-6.59.0.tgz differ diff --git a/web/api/js/codechecker-api-node/package.json b/web/api/js/codechecker-api-node/package.json index 0bfd792add..86e4a596e9 100644 --- a/web/api/js/codechecker-api-node/package.json +++ b/web/api/js/codechecker-api-node/package.json @@ -1,6 +1,6 @@ { "name": "codechecker-api", - "version": "6.58.0", + "version": "6.59.0", "description": "Generated node.js compatible API stubs for CodeChecker server.", "main": "lib", "homepage": "https://github.com/Ericsson/codechecker", diff --git a/web/api/py/codechecker_api/dist/codechecker_api.tar.gz b/web/api/py/codechecker_api/dist/codechecker_api.tar.gz index 3875d3ef7f..601a9c2e5c 100644 Binary files a/web/api/py/codechecker_api/dist/codechecker_api.tar.gz and b/web/api/py/codechecker_api/dist/codechecker_api.tar.gz differ diff --git a/web/api/py/codechecker_api/setup.py b/web/api/py/codechecker_api/setup.py index b369453448..fc9d400def 100644 --- a/web/api/py/codechecker_api/setup.py +++ b/web/api/py/codechecker_api/setup.py @@ -8,7 +8,7 @@ with open('README.md', encoding='utf-8', errors="ignore") as f: long_description = f.read() -api_version = '6.58.0' +api_version = '6.59.0' setup( name='codechecker_api', diff --git 
a/web/api/py/codechecker_api_shared/dist/codechecker_api_shared.tar.gz b/web/api/py/codechecker_api_shared/dist/codechecker_api_shared.tar.gz index 4d607e2b2f..1cbe790554 100644 Binary files a/web/api/py/codechecker_api_shared/dist/codechecker_api_shared.tar.gz and b/web/api/py/codechecker_api_shared/dist/codechecker_api_shared.tar.gz differ diff --git a/web/api/py/codechecker_api_shared/setup.py b/web/api/py/codechecker_api_shared/setup.py index a4c2e70d02..90f09bf34e 100644 --- a/web/api/py/codechecker_api_shared/setup.py +++ b/web/api/py/codechecker_api_shared/setup.py @@ -8,7 +8,7 @@ with open('README.md', encoding='utf-8', errors="ignore") as f: long_description = f.read() -api_version = '6.58.0' +api_version = '6.59.0' setup( name='codechecker_api_shared', diff --git a/web/api/report_server.thrift b/web/api/report_server.thrift index 359372e28a..962a1c49d8 100644 --- a/web/api/report_server.thrift +++ b/web/api/report_server.thrift @@ -201,6 +201,17 @@ struct RunData { } typedef list RunDataList +struct SubmittedRunOptions { + 1: string runName, + 2: string tag, + 3: string version, // The version of CodeChecker with + // which the analysis was done. + 4: bool force, // If set, existing results in + // the run are removed first. + 5: list trimPathPrefixes, + 6: optional string description, +} + struct RunHistoryData { 1: i64 runId, // Unique id of the run. 2: string runName, // Name of the run. @@ -208,8 +219,7 @@ struct RunHistoryData { 4: string user, // User name who analysed the run. 5: string time, // Date time when the run was analysed. 6: i64 id, // Id of the run history tag. - // !!!DEPRECATED!!! This field will be empty so use the getCheckCommand() API function to get the check command for a run. - 7: string checkCommand, + 7: string checkCommand, // Check command. !!!DEPRECATED!!! This field will be empty so use the getCheckCommand API function to get the check command for a run. 
8: string codeCheckerVersion, // CodeChecker client version of the latest analysis. 9: AnalyzerStatisticsData analyzerStatistics, // Statistics for analyzers. Only number of failed and successfully analyzed // files field will be set. To get full analyzer statistics please use the @@ -943,32 +953,47 @@ service codeCheckerDBAccess { //============================================ // The client can ask the server whether a file is already stored in the - // database. If it is, then it is not necessary to send it in the ZIP file - // with massStoreRun() function. This function requires a list of file hashes - // (sha256) and returns the ones which are not stored yet. + // database. + // If it is present, then it is not necessary to send the file in the ZIP + // to the massStoreRunAsynchronous() function. + // This function requires a list of file hashes (sha256) and returns the + // ones which are not stored yet. + // // PERMISSION: PRODUCT_STORE list getMissingContentHashes(1: list fileHashes) throws (1: codechecker_api_shared.RequestFailed requestError), // The client can ask the server whether a blame info is already stored in the - // database. If it is, then it is not necessary to send it in the ZIP file - // with massStoreRun() function. This function requires a list of file hashes - // (sha256) and returns the ones to which no blame info is stored yet. + // database. + // If it is, then it is not necessary to send the info in the ZIP file + // to the massStoreRunAsynchronous() function. + // This function requires a list of file hashes (sha256) and returns the + // ones to which no blame info is stored yet. + // // PERMISSION: PRODUCT_STORE list getMissingContentHashesForBlameInfo(1: list fileHashes) throws (1: codechecker_api_shared.RequestFailed requestError), // This function stores an entire run encapsulated and sent in a ZIP file. - // The ZIP file has to be compressed and sent as a base64 encoded string. 
The - // ZIP file must contain a "reports" and an optional "root" sub-folder. - // The former one is the output of 'CodeChecker analyze' command and the - // latter one contains the source files on absolute paths starting as if - // "root" was the "/" directory. The source files are not necessary to be - // wrapped in the ZIP file (see getMissingContentHashes() function). + // The ZIP file has to be compressed by ZLib and the compressed buffer + // sent as a Base64-encoded string. The ZIP file must contain a "reports" and + // an optional "root" sub-directory. The former one is the output of the + // 'CodeChecker analyze' command and the latter one contains the source files + // on absolute paths starting as if "root" was the "/" directory. The source + // files are not necessary to be wrapped in the ZIP file + // (see getMissingContentHashes() function). // // The "version" parameter is the used CodeChecker version which checked this // run. // The "force" parameter removes existing analysis results for a run. + // + // !DEPRECATED!: Use of this function is deprecated as the storing client + // process is prone to infinite hangs while waiting for the return value of + // the Thrift call if the network communication terminates during the time + // the server is processing the sent data, which might take a very long time. + // Appropriately modern clients are expected to use the + // massStoreRunAsynchronous() function and the Task API instead! + // // PERMISSION: PRODUCT_STORE i64 massStoreRun(1: string runName, 2: string tag, @@ -979,6 +1004,35 @@ service codeCheckerDBAccess { 7: optional string description) throws (1: codechecker_api_shared.RequestFailed requestError), + // This function stores an entire analysis run encapsulated and sent as a + // ZIP file. The ZIP file must be compressed by ZLib and sent as a + // Base64-encoded string. It must contain a "reports" and an optional "root" + // sub-directory. 
"reports" contains the output of the `CodeChecker analyze` + // command, while "root", if present, contains the source code of the project + // with their full paths, with the logical "root" replacing the original + // "/" directory. + // + // The source files are not necessary to be present in the ZIP, see + // getMissingContentHashes() for details. + // + // After performing an initial validation of the well-formedness of the + // submitted structure (ill-formedness is reported as an exception), the + // potentially lengthy processing of the data and the database operations are + // done asynchronously. + // + // This function returns a TaskToken, which SHOULD be used as the argument to + // the tasks::getTaskInfo() function such that clients retrieve the + // processing's state. Clients MAY decide to "detach", i.e., not to wait + // for the processing of the submitted data, and ignore the returned handle. + // Even if the client detached, the processing of the stored reports will + // likely eventually conclude. + // + // PERMISSION: PRODUCT_STORE + codechecker_api_shared.TaskToken massStoreRunAsynchronous( + 1: string zipfileBlob, // Base64-encoded string. + 2: SubmittedRunOptions storeOpts) + throws (1: codechecker_api_shared.RequestFailed requestError), + // Returns true if analysis statistics information can be sent to the server, // otherwise it returns false. // PERMISSION: PRODUCT_STORE diff --git a/web/api/tasks.thrift b/web/api/tasks.thrift new file mode 100644 index 0000000000..e380d59ac3 --- /dev/null +++ b/web/api/tasks.thrift @@ -0,0 +1,162 @@ +// ------------------------------------------------------------------------- +// Part of the CodeChecker project, under the Apache License v2.0 with +// LLVM Exceptions. See LICENSE for license information. 
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// ------------------------------------------------------------------------- + +include "codechecker_api_shared.thrift" + +namespace py codeCheckerServersideTasks_v6 +namespace js codeCheckerServersideTasks_v6 + +enum TaskStatus { + ALLOCATED, // Non-terminated state. Token registered but the job hasn't queued yet: the input is still processing. + ENQUEUED, // Non-terminated state. Job in the queue, and all inputs are meaningfully available. + RUNNING, // Non-terminated state. + COMPLETED, // Terminated state. Successfully ran to completion. + FAILED, // Terminated state. Job was running, but the execution failed. + CANCELLED, // Terminated state. Job was cancelled by an administrator, and the cancellation succeeded. + DROPPED, // Terminated state. Job was cancelled due to system reasons (server shutdown, crash, other interference). +} + +struct TaskInfo { + 1: codechecker_api_shared.TaskToken token, + 2: string taskKind, + 3: TaskStatus status, + // If the task is associated with a product, this ID can be used to query + // product information, see products.thrift service. + // The 'productId' is set to 0 if there is no product associated, meaning + // that the task is "global to the server". + 4: i64 productId, + 5: string actorUsername, + 6: string summary, + // Additional, human-readable comments, history, and log output from the + // task's processing. + 7: string comments, + 8: i64 enqueuedAtEpoch, + 9: i64 startedAtEpoch, + 10: i64 completedAtEpoch, + 11: i64 lastHeartbeatEpoch, + // Whether the administrator set this job for a co-operative cancellation. + 12: bool cancelFlagSet, +} + +/** + * TaskInfo with additional fields that is sent to administrators only. + */ +struct AdministratorTaskInfo { + 1: TaskInfo normalInfo, + 2: string machineId, // The hopefully unique identifier of the server + // that is/was processing the task. 
+ 3: bool statusConsumed, // Whether the main actor of the task + // (see normalInfo.actorUsername) consumed the + // termination status of the job. +} + +/** + * Metastructure that holds the filters for getTasks(). + * The individual fields of the struct are in "AND" relation with each other. + * For list<> fields, elements of the list filter the same "column" of the + * task information table, and are considered in an "OR" relation. + */ +struct TaskFilter { + 1: list tokens, + 2: list machineIDs, + 3: list kinds, + 4: list statuses, + // If empty, it means "all", including those of no username. + 5: list usernames, + // If True, it means filter for **only** "no username". + // Can not be set together with a non-empty "usernames". + 6: bool filterForNoUsername, + // If empty, it means "all", including those of no product ID. + 7: list productIDs, + // If True, it means filter for **only** "no product ID". + // Can not be set together with a non-empty "productIDs". + 8: bool filterForNoProductID, + 9: i64 enqueuedBeforeEpoch, + 10: i64 enqueuedAfterEpoch, + 11: i64 startedBeforeEpoch, + 12: i64 startedAfterEpoch, + 13: i64 completedBeforeEpoch, + 14: i64 completedAfterEpoch, + 15: i64 heartbeatBeforeEpoch, + 16: i64 heartbeatAfterEpoch, + 17: codechecker_api_shared.Ternary cancelFlag, + 18: codechecker_api_shared.Ternary consumedFlag, +} + +service codeCheckerServersideTaskService { + // Retrieves the status of a task registered on the server, based on its + // identifying "token". + // + // Following this query, if the task is in any terminating states and the + // query was requested by the main actor, the status will be considered + // "consumed", and might be garbage collected by the server at a later + // point in time. + // + // If the server has authentication enabled, this query is only allowed to + // the following users: + // * The user who originally submitted the request that resulted in the + // creation of this job. 
+ // * If the job is associated with a specific product, anyone with + // PRODUCT_ADMIN privileges for that product. + // * Users with SUPERUSER rights. + // + // PERMISSION: . + TaskInfo getTaskInfo( + 1: codechecker_api_shared.TaskToken token) + throws (1: codechecker_api_shared.RequestFailed requestError), + + // Returns privileged information about the tasks stored in the servers' + // databases, based on the given filter. + // + // This query does not set the "consumed" flag on the results, even if the + // querying user was a task's main actor. + // + // If the querying user only has PRODUCT_ADMIN rights, they are only allowed + // to query the tasks corresponding to a product they are PRODUCT_ADMIN of. + // + // PERMISSION: SUPERUSER, PRODUCT_ADMIN + list getTasks( + 1: TaskFilter filters) + throws (1: codechecker_api_shared.RequestFailed requestError), + + // Sets the specified task's "cancel" flag to TRUE, resulting in a request to + // the task's execution to co-operatively terminate itself. + // Returns whether the current RPC call was the one which set the flag. + // + // Tasks will generally terminate themselves at a safe point during their + // processing, but there are no guarantees that a specific task at any given + // point can reach such a safe point. + // There are no guarantees that a specific task is implemented in a way that + // it can ever be terminated via a "cancel" action. + // + // This method does not result in a communication via operating system + // primitives to the running server, and it is not capable of either + // completely shutting down a running server, or, conversely, to resurrect a + // hung server. + // + // Setting the "cancel" flag of an already cancelled task does nothing, and + // it is not possible to un-cancel a task. + // Setting the "cancel" flag of already terminated tasks does nothing. + // In both such cases, the RPC call will return "bool False". 
+ // + // PERMISSION: SUPERUSER + bool cancelTask( + 1: codechecker_api_shared.TaskToken token) + throws (1: codechecker_api_shared.RequestFailed requestError), + + // Used for testing purposes only. + // This function will **ALWAYS** throw an exception when ran outside of a + // testing environment. + // + // The dummy task will increment a temporary counter in the background, with + // intermittent sleeping, up to approximately "timeout" number of seconds, + // after which point it will gracefully terminate. + // The result of the execution is unsuccessful if "shouldFail" is a true. + codechecker_api_shared.TaskToken createDummyTask( + 1: i32 timeout, + 2: bool shouldFail) + throws (1: codechecker_api_shared.RequestFailed requestError), +} diff --git a/web/client/codechecker_client/client.py b/web/client/codechecker_client/client.py index 730a83446b..0bbe2f69fe 100644 --- a/web/client/codechecker_client/client.py +++ b/web/client/codechecker_client/client.py @@ -205,7 +205,7 @@ def setup_product_client(protocol, host, port, auth_client=None, # Attach to the server-wide product service. product_client = ThriftProductHelper( protocol, host, port, - '/v' + CLIENT_API + '/Products', + f"/v{CLIENT_API}/Products", session_token, lambda: get_new_token(protocol, host, port, cred_manager)) else: @@ -260,6 +260,6 @@ def setup_client(product_url) -> ThriftResultsHelper: return ThriftResultsHelper( protocol, host, port, - '/' + product_name + '/v' + CLIENT_API + '/CodeCheckerService', + f"/{product_name}/v{CLIENT_API}/CodeCheckerService", session_token, lambda: get_new_token(protocol, host, port, cred_manager)) diff --git a/web/client/codechecker_client/helpers/results.py b/web/client/codechecker_client/helpers/results.py index c558cfe040..399623019e 100644 --- a/web/client/codechecker_client/helpers/results.py +++ b/web/client/codechecker_client/helpers/results.py @@ -9,7 +9,7 @@ Helper functions for Thrift api calls. 
""" -from codechecker_api.codeCheckerDBAccess_v6 import codeCheckerDBAccess +from codechecker_api.codeCheckerDBAccess_v6 import codeCheckerDBAccess, ttypes from codechecker_client.thrift_call import thrift_client_call from .base import BaseClientHelper @@ -181,6 +181,12 @@ def massStoreRun(self, name, tag, version, zipdir, force, trim_path_prefixes, description): pass + @thrift_client_call + def massStoreRunAsynchronous(self, zipfile_blob: str, + store_opts: ttypes.SubmittedRunOptions) \ + -> str: + pass + @thrift_client_call def allowsStoringAnalysisStatistics(self): pass diff --git a/web/codechecker_web/shared/version.py b/web/codechecker_web/shared/version.py index e5d544a750..2ac2d84ae7 100644 --- a/web/codechecker_web/shared/version.py +++ b/web/codechecker_web/shared/version.py @@ -18,7 +18,7 @@ # The newest supported minor version (value) for each supported major version # (key) in this particular build. SUPPORTED_VERSIONS = { - 6: 58 + 6: 59 } # Used by the client to automatically identify the latest major and minor diff --git a/web/server/codechecker_server/api/authentication.py b/web/server/codechecker_server/api/authentication.py index 1430ad9fd6..9e73923e45 100644 --- a/web/server/codechecker_server/api/authentication.py +++ b/web/server/codechecker_server/api/authentication.py @@ -19,6 +19,7 @@ AuthorisationList, HandshakeInformation, Permissions, SessionTokenData from codechecker_common.logger import get_logger +from codechecker_common.util import generate_random_token from codechecker_server.profiler import timeit @@ -28,7 +29,6 @@ from ..permissions import handler_from_scope_params as make_handler, \ require_manager, require_permission from ..server import permissions -from ..session_manager import generate_session_token LOG = get_logger('server') @@ -363,7 +363,7 @@ def newToken(self, description): """ self.__require_privilaged_access() with DBSession(self.__config_db) as session: - token = generate_session_token() + token = 
def exc_to_thrift_reqfail(function):
    """
    Convert internal exceptions to a `RequestFailed` Thrift exception, which
    can be sent back to the RPC client.

    The returned wrapper calls `function` with the given arguments and
    returns its result unchanged. Exceptions are translated as follows:

      * `sqlalchemy.exc.SQLAlchemyError` is re-raised as
        `RequestFailed(ErrorCode.DATABASE, ...)`.
      * `RequestFailed` is logged and re-raised as-is.
      * Any other `Exception` is re-raised as
        `RequestFailed(ErrorCode.GENERAL, ...)`.

    The wrapper keeps the name, docstring and other metadata of `function`,
    so that stacked decorators and log messages see the real handler name
    instead of "wrapper".
    """
    # Imported locally so that this decorator does not depend on the
    # module's top-level import list.
    import functools
    import traceback

    func_name = function.__name__

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except sqlalchemy.exc.SQLAlchemyError as alchemy_ex:
            # Convert SQLAlchemy exceptions.
            msg = str(alchemy_ex)
            traceback.print_exc()
            # Log through the server logger too, like the other branches do.
            LOG.warning("%s:\n%s", func_name, msg)

            # pylint: disable=raise-missing-from
            raise RequestFailed(ErrorCode.DATABASE, msg)
        except RequestFailed as rf:
            # Already in the shape the client expects: log and pass through.
            LOG.warning("%s:\n%s", func_name, rf.message)
            raise
        except Exception as ex:
            traceback.print_exc()
            msg = str(ex)
            LOG.warning("%s:\n%s", func_name, msg)

            # pylint: disable=raise-missing-from
            raise RequestFailed(ErrorCode.GENERAL, msg)

    return wrapper
@exc_to_thrift_reqfail
@timeit
def massStoreRunAsynchronous(self, zipfile_blob: str,
                             store_opts: SubmittedRunOptions) -> str:
    """
    Placeholder for the asynchronous store entry point.

    Logs the size of the received blob and a pretty-printed dump of the
    submitted options, then always raises `RequestFailed` with
    `ErrorCode.GENERAL`, as the operation is not implemented here.
    """
    import pprint

    api_types = codechecker_api_shared.ttypes
    blob_size = len(zipfile_blob)
    options_dump = pprint.pformat(store_opts.__dict__, indent=2, depth=8)

    LOG.info("massStoreRunAsynchronous() called with:\n\t - %d bytes "
             "input\n\t - Options:\n\n%s",
             blob_size, options_dump)

    raise api_types.RequestFailed(
        api_types.ErrorCode.GENERAL,
        "massStoreRunAsynchronous() not implemented in this server build!")
class TestingDummyTask(AbstractTask):
    """
    Task object created by ``createDummyTask()``.

    Ticks up to ``timeout`` times, sleeping one second after each tick, and
    then raises on purpose if ``should_fail`` was requested.
    """

    def __init__(self, token: str, timeout: int, should_fail: bool):
        super().__init__(token, None)
        self.timeout = timeout
        self.should_fail = should_fail

    def _implementation(self, tm: TaskManager) -> None:
        tick: int = 0
        while tick < self.timeout:
            # Report liveness before doing this tick's "work".
            tm.heartbeat(self)

            tick += 1
            LOG.debug("Dummy task ticking... [%d / %d]",
                      tick, self.timeout)

            if not tm.should_cancel(self):
                time.sleep(1)
                continue

            # Cancellation was requested: report why, then honour it.
            if tm.is_shutting_down:
                reason = "KILLED BY SHUTDOWN"
            else:
                reason = "CANCELLED BY ADMIN"
            LOG.info("Dummy task '%s' was %s at tick [%d / %d]!",
                     self.token, reason, tick, self.timeout)
            raise TaskCancelHonoured(self)

        if self.should_fail:
            raise ValueError("Task self-failure as per the user's request.")
@exc_to_thrift_reqfail
@timeit
def getTaskInfo(self, token: str) -> TaskInfo:
    """
    Returns the `TaskInfo` for the task identified by `token`.

    Access is granted if any of the following holds, checked in this order:
      * the task's stored username equals the currently logged in user;
      * the task has a product ID, that product exists, and the caller
        passes the PRODUCT_ADMIN permission check for it;
      * the caller passes the SUPERUSER permission check.

    Only in the first case, and only if the task is already in a terminated
    state, the task's ``consumed`` flag is set and committed.

    Raises `RequestFailed` with `ErrorCode.GENERAL` if no task record
    exists for `token`, and with `ErrorCode.UNAUTHORIZED` if none of the
    access checks above succeeds.
    """
    with DBSession(self._config_db) as session:
        # Look the record up by its key; a missing record is reported
        # before any permission check is made.
        db_task: Optional[DBTask] = session.query(DBTask).get(token)
        if not db_task:
            raise RequestFailed(ErrorCode.GENERAL,
                                f"Task '{token}' does not exist!")

        has_right_to_query_status: bool = False
        should_set_consumed_flag: bool = False

        if db_task.username == self._get_username():
            # The caller is the task's main actor: always allowed, and the
            # only case in which the status may become "consumed".
            has_right_to_query_status = True
            should_set_consumed_flag = db_task.is_in_terminated_state
        elif db_task.product_id is not None:
            associated_product: Optional[Product] = \
                session.query(Product).get(db_task.product_id)
            if not associated_product:
                # Dangling product reference: only logged here, the
                # SUPERUSER check below can still grant access.
                LOG.error("No product with ID '%d', but a task is "
                          "associated with it.",
                          db_task.product_id)
            else:
                has_right_to_query_status = \
                    permissions.require_permission(
                        permissions.PRODUCT_ADMIN,
                        {"config_db_session": session,
                         "productID": associated_product.id},
                        self._auth_session)

        # Last resort: server-level SUPERUSER rights.
        if not has_right_to_query_status:
            has_right_to_query_status = permissions.require_permission(
                permissions.SUPERUSER,
                {"config_db_session": session},
                self._auth_session)

        if not has_right_to_query_status:
            raise RequestFailed(
                ErrorCode.UNAUTHORIZED,
                "Only the task's submitter, a PRODUCT_ADMIN (of the "
                "product the task is associated with), or a SUPERUSER "
                "can getTaskInfo()!")

        # The API object is built before the record is modified below.
        info = _make_task_info(db_task)

        if should_set_consumed_flag:
            db_task.consumed = True
            session.commit()

        return info
@exc_to_thrift_reqfail
@timeit
def getTasks(self, filters: TaskFilter) -> List[AdministratorTaskInfo]:
    """
    Obtain tasks matching the `filters` for administrators.

    The individual fields of `filters` are combined with AND; the elements
    of a list-valued field are combined with OR. The "consumed" flag of the
    returned tasks is not modified by this query.

    Visibility rules applied to the result:
      * tasks without a product ID are returned only to callers passing
        the SUPERUSER permission check;
      * tasks with a product ID are returned only to callers passing the
        PRODUCT_ADMIN permission check for that product.

    Raises `RequestFailed` with `ErrorCode.GENERAL` if a "no product ID" or
    "no username" filter is combined with explicit product IDs or
    usernames, and with `ErrorCode.UNAUTHORIZED` if the caller explicitly
    filters for tasks they are not allowed to see.
    """
    if filters.filterForNoProductID and filters.productIDs:
        raise RequestFailed(ErrorCode.GENERAL,
                            "Invalid request, do not set "
                            "\"no product ID\" and some product IDs in "
                            "the same filter!")
    if filters.filterForNoUsername and filters.usernames:
        raise RequestFailed(ErrorCode.GENERAL,
                            "Invalid request, do not set "
                            "\"no username\" and some usernames in the "
                            "same filter!")

    with DBSession(self._config_db) as session:
        if filters.filterForNoProductID:
            if not permissions.require_permission(
                    permissions.SUPERUSER,
                    {"config_db_session": session},
                    self._auth_session):
                raise RequestFailed(
                    ErrorCode.UNAUTHORIZED,
                    "Querying service tasks (not associated with a "
                    "product) requires SUPERUSER privileges!")
        if filters.productIDs:
            no_admin_products = [
                prod_id for prod_id in filters.productIDs
                if not permissions.require_permission(
                    permissions.PRODUCT_ADMIN,
                    {"config_db_session": session, "productID": prod_id},
                    self._auth_session)]
            if no_admin_products:
                # A requested ID might not belong to any product at all.
                # Fall back to the raw ID instead of dereferencing None, so
                # the caller still gets the intended UNAUTHORIZED error.
                product_names = []
                for product_id in no_admin_products:
                    product = session.query(Product).get(product_id)
                    product_names.append(
                        product.endpoint if product
                        else f"<unknown product ID {product_id}>")
                joined_names = "', '".join(product_names)
                raise RequestFailed(ErrorCode.UNAUTHORIZED,
                                    "Querying product tasks requires "
                                    "PRODUCT_ADMIN rights, but it is "
                                    "missing from product(s): "
                                    f"'{joined_names}'!")

        AND = []
        if filters.tokens:
            AND.append(or_(*(DBTask.token.ilike(conv(token))
                             for token in filters.tokens)))

        if filters.machineIDs:
            AND.append(or_(*(DBTask.machine_id.ilike(conv(machine_id))
                             for machine_id in filters.machineIDs)))

        if filters.kinds:
            AND.append(or_(*(DBTask.kind.ilike(conv(kind))
                             for kind in filters.kinds)))

        if filters.statuses:
            AND.append(DBTask.status.in_([
                TaskStatus._VALUES_TO_NAMES[status].lower()
                for status in filters.statuses]))

        if filters.usernames:
            AND.append(or_(*(DBTask.username.ilike(conv(username))
                             for username in filters.usernames)))
        elif filters.filterForNoUsername:
            AND.append(DBTask.username.is_(None))

        if filters.productIDs:
            AND.append(DBTask.product_id.in_(filters.productIDs))
        elif filters.filterForNoProductID:
            AND.append(DBTask.product_id.is_(None))

        # (column, "before" bound, "after" bound); unset bounds are skipped.
        epoch_bounds = (
            (DBTask.enqueued_at,
             filters.enqueuedBeforeEpoch, filters.enqueuedAfterEpoch),
            (DBTask.started_at,
             filters.startedBeforeEpoch, filters.startedAfterEpoch),
            (DBTask.finished_at,
             filters.completedBeforeEpoch, filters.completedAfterEpoch),
            (DBTask.last_seen_at,
             filters.heartbeatBeforeEpoch, filters.heartbeatAfterEpoch),
        )
        for column, before_epoch, after_epoch in epoch_bounds:
            if before_epoch:
                AND.append(
                    column <= _posix_epoch_to_db_timestamp(before_epoch))
            if after_epoch:
                AND.append(
                    column >= _posix_epoch_to_db_timestamp(after_epoch))

        # Ternary filters: only an explicit OFF or ON restricts the query.
        ternary_off = Ternary._NAMES_TO_VALUES["OFF"]
        ternary_on = Ternary._NAMES_TO_VALUES["ON"]
        flag_filters = (
            (DBTask.cancel_flag, filters.cancelFlag),
            (DBTask.consumed, filters.consumedFlag),
        )
        for column, flag in flag_filters:
            if not flag:
                continue
            if flag == ternary_off:
                AND.append(column.is_(False))
            elif flag == ternary_on:
                AND.append(column.is_(True))

        ret: List[AdministratorTaskInfo] = []
        # Permission check results are cached for the duration of the call.
        has_superuser: Optional[bool] = None
        product_admin_rights: Dict[int, bool] = {}
        for db_task in session.query(DBTask).filter(and_(*AND)).all():
            if not db_task.product_id:
                # Tasks associated with the server, and not a specific
                # product, should only be visible to SUPERUSERs.
                if has_superuser is None:
                    has_superuser = permissions.require_permission(
                        permissions.SUPERUSER,
                        {"config_db_session": session},
                        self._auth_session)
                if not has_superuser:
                    continue
            else:
                # Tasks associated with a product should only be visible
                # to PRODUCT_ADMINs of that product.
                if db_task.product_id not in product_admin_rights:
                    product_admin_rights[db_task.product_id] = \
                        permissions.require_permission(
                            permissions.PRODUCT_ADMIN,
                            {"config_db_session": session,
                             "productID": db_task.product_id},
                            self._auth_session)
                if not product_admin_rights[db_task.product_id]:
                    continue

            ret.append(_make_admin_task_info(db_task))

        return ret
@exc_to_thrift_reqfail
@timeit
def createDummyTask(self, timeout: int, should_fail: bool) -> str:
    """
    Used for testing purposes only.

    Allocates a task record, pushes a `TestingDummyTask` built from
    `timeout` and `should_fail` to the task manager, and returns the
    task's token.

    This function will **ALWAYS** throw an exception when ran outside of a
    testing environment, i.e., when "TEST_WORKSPACE" is not set in the
    process environment.
    """
    in_test_environment = "TEST_WORKSPACE" in os.environ
    if not in_test_environment:
        raise RequestFailed(ErrorCode.GENERAL,
                            "createDummyTask() is only available in "
                            "testing environments!")

    manager = self._task_manager
    new_token = manager.allocate_task_record(
        "TaskService::DummyTask",
        "Dummy task for testing purposes",
        self._get_username(),
        None)

    manager.push_task(TestingDummyTask(new_token, timeout, should_fail))

    return new_token
codechecker_common import arg, cmd_config, logger, process, util from codechecker_common.compatibility.multiprocessing import Pool, cpu_count from codechecker_server import instance_manager, server @@ -101,6 +99,25 @@ def add_arguments_to_parser(parser): "authentication settings, TLS certificate" " (cert.pem) and key (key.pem)) from.") + parser.add_argument("--machine-id", + type=str, + dest="machine_id", + default=argparse.SUPPRESS, + required=False, + help=""" +A unique identifier to be used to identify the machine running subsequent +instances of the "same" server process. +This value is only used internally to maintain normal function and bookkeeping +of executed tasks following an unclean server shutdown, e.g., after a crash or +system-level interference. + +If unspecified, defaults to a reasonable default value that is generated from +the computer's hostname, as reported by the operating system. +In most scenarios, there is no need to fine-tune this, except if subsequent +executions of the "same" server is achieved in distinct environments, e.g., +if the server otherwise is running in a container. +""") + parser.add_argument('--host', type=str, dest="listen_address", @@ -424,7 +441,7 @@ def arg_match(options): setattr(args, "instance_manager", True) # If everything is fine, do call the handler for the subcommand. - main(args) + return main(args) parser.set_defaults( func=__handle, func_process_config_file=cmd_config.process_config_file) @@ -762,42 +779,6 @@ def _get_migration_decisions() -> List[Tuple[str, str, bool]]: return 0 -def kill_process_tree(parent_pid, recursive=False): - """Stop the process tree try it gracefully first. - - Try to stop the parent and child processes gracefuly - first if they do not stop in time send a kill signal - to every member of the process tree. - - There is a similar function in the analyzer part please - consider to update that in case of changing this. 
- """ - proc = psutil.Process(parent_pid) - children = proc.children(recursive) - - # Send a SIGTERM (Ctrl-C) to the main process - proc.terminate() - - # If children processes don't stop gracefully in time, - # slaughter them by force. - _, still_alive = psutil.wait_procs(children, timeout=5) - for p in still_alive: - p.kill() - - # Wait until this process is running. - n = 0 - timeout = 10 - while proc.is_running(): - if n > timeout: - LOG.warning("Waiting for process %s to stop has been timed out" - "(timeout = %s)! Process is still running!", - parent_pid, timeout) - break - - time.sleep(1) - n += 1 - - def __instance_management(args): """Handles the instance-manager commands --list/--stop/--stop-all.""" @@ -842,7 +823,7 @@ def __instance_management(args): continue try: - kill_process_tree(i['pid']) + process.kill_process_tree(i['pid']) LOG.info("Stopped CodeChecker server running on port %s " "in workspace %s (PID: %s)", i['port'], i['workspace'], i['pid']) @@ -1106,16 +1087,21 @@ def server_init_start(args): 'doc_root': context.doc_root, 'version': context.package_git_tag} + # Create a machine ID if the user did not specify one. 
+ machine_id = getattr(args, "machine_id", + f"{socket.gethostname()}:{args.view_port}") + try: - server.start_server(args.config_directory, - package_data, - args.view_port, - cfg_sql_server, - args.listen_address, - 'force_auth' in args, - args.skip_db_cleanup, - context, - environ) + return server.start_server(args.config_directory, + package_data, + args.view_port, + cfg_sql_server, + args.listen_address, + 'force_auth' in args, + args.skip_db_cleanup, + context, + environ, + machine_id) except socket.error as err: if err.errno == errno.EADDRINUSE: LOG.error("Server can't be started, maybe port number (%s) is " @@ -1152,4 +1138,4 @@ def main(args): except FileNotFoundError as fnerr: LOG.error(fnerr) sys.exit(1) - server_init_start(args) + return server_init_start(args) diff --git a/web/server/codechecker_server/database/config_db_model.py b/web/server/codechecker_server/database/config_db_model.py index 00f0c4948e..e2ee5a550b 100644 --- a/web/server/codechecker_server/database/config_db_model.py +++ b/web/server/codechecker_server/database/config_db_model.py @@ -8,8 +8,9 @@ """ SQLAlchemy ORM model for the product configuration database. """ -from datetime import datetime +from datetime import datetime, timezone import sys +from typing import Optional from sqlalchemy import Boolean, CHAR, Column, DateTime, Enum, ForeignKey, \ Integer, MetaData, String, Text @@ -158,6 +159,200 @@ def __init__(self, config_key, config_value): self.config_value = config_value +class BackgroundTask(Base): + """ + Information about background tasks executed on a CodeChecker service, + potentially as part of a cluster, stored in the database. + These entities store the metadata for the task objects, but no information + about the actual "input" of the task exists in the database! 
+ """ + __tablename__ = "background_tasks" + + _token_length = 64 + + machine_id = Column(String, index=True) + """ + A unique, implementation-specific identifier of the actual CodeChecker + server instance that knows how to execute the task. + """ + + token = Column(CHAR(length=_token_length), primary_key=True) + kind = Column(String, nullable=False, index=True) + status = Column(Enum( + # A job token (and thus a BackgroundTask record) was allocated, but + # the job is still under preparation. + "allocated", + + # The job is pending on the server, but the server has all the data + # available to eventually perform the job. + "enqueued", + + # The server is actually performing the job. + "running", + + # The server successfully finished completing the job. + "completed", + + # The execution of the job failed. + # In this stage, the "comments" field likely contains more information + # that is not machine-readable. + "failed", + + # The job never started, or its execution was terminated at the + # request of the administrators. + "cancelled", + + # The job never started, or its execution was terminated due to a + # system-level reason (such as the server's foced shutdown). + "dropped", + ), + nullable=False, + default="enqueued", + index=True) + + product_id = Column(Integer, + ForeignKey("products.id", + deferrable=False, + initially="IMMEDIATE", + ondelete="CASCADE"), + nullable=True, + index=True) + """ + If the job is tightly associated with a product, the ID of the `Product` + entity with which it is associated. + """ + + username = Column(String, nullable=True) + """ + The main actor who was responsible for the creation of the job task. 
+ """ + + summary = Column(String, nullable=False) + comments = Column(Text, nullable=True) + + enqueued_at = Column(DateTime, nullable=True) + started_at = Column(DateTime, nullable=True) + finished_at = Column(DateTime, nullable=True) + + last_seen_at = Column(DateTime, nullable=True) + """ + Contains the timestamp, only when the job is not yet "finished", when the + job last synchronised against the database, e.g., when it last checked the + "cancel_flag" field. + + This is used for health checking whether the background worker is actually + doing something, as a second line of defence to uncover "dropped" jobs, + e.g., when the servers have failed and the new server can not identify + jobs from its "previous life". + """ + + consumed = Column(Boolean, nullable=False, + default=False, server_default=false()) + """ + Whether the status of the job was checked **BY THE MAIN ACTOR** (username). + """ + + cancel_flag = Column(Boolean, nullable=False, + default=False, server_default=false()) + """ + Whether a SUPERUSER has signalled that the job should be cancelled. + + Note, that cancelling is a co-operative action: jobs are never actually + "killed" on the O.S. level from the outside; rather, each job is expected + to be implemented in a way that they regularly query this bit, and if set, + act accordingly. 
+ """ + + def __init__(self, + token: str, + kind: str, + summary: str, + machine_id: str, + user_name: Optional[str], + product: Optional[Product] = None, + ): + self.machine_id = machine_id + self.token = token + self.kind = kind + self.status = "allocated" + self.summary = summary + self.username = user_name + self.last_seen_at = datetime.now(timezone.utc) + + if product: + self.product_id = product.id + + def add_comment(self, comment: str, actor: Optional[str] = None): + if not self.comments: + self.comments = "" + elif self.comments: + self.comments += "\n----------\n" + + self.comments += f"{actor if actor else ''} " \ + f"at {str(datetime.now(timezone.utc))}:\n{comment}" + + def heartbeat(self): + """Update `last_seen_at`.""" + if self.status in ["enqueued", "running"]: + self.last_seen_at = datetime.now(timezone.utc) + + def set_enqueued(self): + """Marks the job as successfully enqueued.""" + if self.status != "allocated": + raise ValueError( + f"Invalid transition '{str(self.status)}' -> 'enqueued'") + + self.status = "enqueued" + self.enqueued_at = datetime.now(timezone.utc) + + def set_running(self): + """Marks the job as currently executing.""" + if self.status != "enqueued": + raise ValueError( + f"Invalid transition '{str(self.status)}' -> 'running'") + + self.status = "running" + self.started_at = datetime.now(timezone.utc) + + def set_finished(self, successfully: bool = True): + """Marks the job as successfully completed or failed.""" + new_status = "completed" if successfully else "failed" + if self.status != "running": + raise ValueError( + f"Invalid transition '{str(self.status)}' -> '{new_status}'") + + self.status = new_status + self.finished_at = datetime.now(timezone.utc) + + def set_abandoned(self, force_dropped_status: bool = False): + """ + Marks the job as cancelled or dropped based on whether the + cancel flag is set. 
+ """ + new_status = "cancelled" \ + if not force_dropped_status and self.cancel_flag \ + else "dropped" + + self.status = new_status + self.finished_at = datetime.now(timezone.utc) + + @property + def is_in_terminated_state(self) -> bool: + """ + Returns whether the current task has finished execution in some way, + for some reason. + """ + return self.status not in ["allocated", "enqueued", "running"] + + @property + def can_be_cancelled(self) -> bool: + """ + Returns whether the task is in a state where setting `cancel_flag` + is meaningful. + """ + return not self.is_in_terminated_state and not self.cancel_flag + + IDENTIFIER = { 'identifier': "ConfigDatabase", 'orm_meta': CC_META diff --git a/web/server/codechecker_server/migrations/config/versions/73b04c41885b_implemented_keeping_track_of_background_tasks.py b/web/server/codechecker_server/migrations/config/versions/73b04c41885b_implemented_keeping_track_of_background_tasks.py new file mode 100644 index 0000000000..45b3ab7bc1 --- /dev/null +++ b/web/server/codechecker_server/migrations/config/versions/73b04c41885b_implemented_keeping_track_of_background_tasks.py @@ -0,0 +1,79 @@ +""" +Implemented keeping track of background tasks through corresponding records +in the server-wide configuration database. + +Revision ID: 73b04c41885b +Revises: 00099e8bc212 +Create Date: 2023-09-21 14:24:27.395597 +""" + +from alembic import op +import sqlalchemy as sa + + +# Revision identifiers, used by Alembic. 
+revision = '73b04c41885b' +down_revision = '00099e8bc212' +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + "background_tasks", + sa.Column("machine_id", sa.String(), nullable=True), + sa.Column("token", sa.CHAR(length=64), nullable=False), + sa.Column("kind", sa.String(), nullable=False), + sa.Column("status", sa.Enum("allocated", + "enqueued", + "running", + "completed", + "failed", + "cancelled", + "dropped", + name="background_task_statuses"), + nullable=False), + sa.Column("product_id", sa.Integer(), nullable=True), + sa.Column("summary", sa.String(), nullable=False), + sa.Column("comments", sa.Text(), nullable=True), + sa.Column("username", sa.String(), nullable=True), + sa.Column("enqueued_at", sa.DateTime(), nullable=True), + sa.Column("started_at", sa.DateTime(), nullable=True), + sa.Column("finished_at", sa.DateTime(), nullable=True), + sa.Column("last_seen_at", sa.DateTime(), nullable=True), + sa.Column("consumed", sa.Boolean(), nullable=False, + server_default=sa.false()), + sa.Column("cancel_flag", sa.Boolean(), nullable=False, + server_default=sa.false()), + + sa.ForeignKeyConstraint( + ["product_id"], ["products.id"], + name=op.f("fk_background_tasks_product_id_products"), + deferrable=False, + ondelete="CASCADE", + initially="IMMEDIATE"), + sa.PrimaryKeyConstraint("token", name=op.f("pk_background_tasks")) + ) + op.create_index(op.f("ix_background_tasks_kind"), "background_tasks", + ["kind"], unique=False) + op.create_index(op.f("ix_background_tasks_machine_id"), "background_tasks", + ["machine_id"], unique=False) + op.create_index(op.f("ix_background_tasks_product_id"), "background_tasks", + ["product_id"], unique=False) + op.create_index(op.f("ix_background_tasks_status"), "background_tasks", + ["status"], unique=False) + + +def downgrade(): + ctx = op.get_context() + dialect = ctx.dialect.name + + op.drop_index(op.f("ix_background_tasks_status"), "background_tasks") + 
op.drop_index(op.f("ix_background_tasks_product_id"), "background_tasks") + op.drop_index(op.f("ix_background_tasks_machine_id"), "background_tasks") + op.drop_index(op.f("ix_background_tasks_kind"), "background_tasks") + + op.drop_table("background_tasks") + + if dialect == "postgresql": + op.execute("DROP TYPE background_task_statuses;") diff --git a/web/server/codechecker_server/routing.py b/web/server/codechecker_server/routing.py index 79ac8d0686..34fbb82f87 100644 --- a/web/server/codechecker_server/routing.py +++ b/web/server/codechecker_server/routing.py @@ -15,25 +15,28 @@ from codechecker_web.shared.version import SUPPORTED_VERSIONS -# A list of top-level path elements under the webserver root -# which should not be considered as a product route. -NON_PRODUCT_ENDPOINTS = ['index.html', - 'images', - 'docs', - 'live', - 'ready'] +# A list of top-level path elements under the webserver root which should not +# be considered as a product route. +NON_PRODUCT_ENDPOINTS = ["index.html", + "images", + "docs", + "live", + "ready", + ] # A list of top-level path elements in requests (such as Thrift endpoints) # which should not be considered as a product route. -NON_PRODUCT_ENDPOINTS += ['Authentication', - 'Products', - 'CodeCheckerService'] +NON_PRODUCT_ENDPOINTS += ["Authentication", + "Products", + "CodeCheckerService", + "Tasks", + ] # A list of top-level path elements under the webserver root which should -# be protected by authentication requirement when accessing the server. +# be protected by authentication requirements when accessing the server. PROTECTED_ENTRY_POINTS = ['', # Empty string in a request is 'index.html'. - 'index.html'] + "index.html"] def is_valid_product_endpoint(uripart): @@ -68,9 +71,8 @@ def is_supported_version(version): If supported, returns the major and minor version as a tuple. 
""" - version = version.lstrip('v') - version_parts = version.split('.') + version_parts = version.split('.', 2) # We don't care if accidentally the version tag contains a revision number. major, minor = int(version_parts[0]), int(version_parts[1]) @@ -113,9 +115,8 @@ def split_client_POST_request(path): Returns the product endpoint, the API version and the API service endpoint as a tuple of 3. """ - # A standard POST request from an API client looks like: - # http://localhost:8001/[product-name]// + # http://localhost:8001/[product-name]/v/ # where specifying the product name is optional. split_path = urlparse(path).path.split('/', 3) diff --git a/web/server/codechecker_server/server.py b/web/server/codechecker_server/server.py index 40bdf6db4d..f10f28291e 100644 --- a/web/server/codechecker_server/server.py +++ b/web/server/codechecker_server/server.py @@ -12,6 +12,7 @@ import atexit +from collections import Counter import datetime from functools import partial from hashlib import sha256 @@ -25,10 +26,10 @@ import ssl import sys import stat -from typing import List, Optional, Tuple +import time +from typing import Dict, List, Optional, Tuple, cast import urllib -import multiprocess from sqlalchemy.orm import sessionmaker from sqlalchemy.sql.expression import func from thrift.protocol import TJSONProtocol @@ -47,11 +48,14 @@ codeCheckerProductService as ProductAPI_v6 from codechecker_api.ServerInfo_v6 import \ serverInfoService as ServerInfoAPI_v6 +from codechecker_api.codeCheckerServersideTasks_v6 import \ + codeCheckerServersideTaskService as TaskAPI_v6 from codechecker_common import util -from codechecker_common.logger import get_logger from codechecker_common.compatibility.multiprocessing import \ - Pool, cpu_count + Pool, Process, Queue, Value, cpu_count +from codechecker_common.logger import get_logger, signal_log +from codechecker_common.util import generate_random_token from codechecker_web.shared import database_status from codechecker_web.shared.version 
import get_version_str @@ -63,12 +67,15 @@ from .api.report_server import ThriftRequestHandler as ReportHandler_v6 from .api.server_info_handler import \ ThriftServerInfoHandler as ServerInfoHandler_v6 +from .api.tasks import ThriftTaskHandler as TaskHandler_v6 from .database import database, db_cleanup from .database.config_db_model import Product as ORMProduct, \ Configuration as ORMConfiguration from .database.database import DBSession from .database.run_db_model import IDENTIFIER as RUN_META, Run, RunLock -from .tmp import get_tmp_dir_hash +from .task_executors.main import executor as background_task_executor +from .task_executors.task_manager import \ + TaskManager as BackgroundTaskManager, drop_all_incomplete_tasks LOG = get_logger('server') @@ -85,8 +92,8 @@ def __init__(self, request, client_address, server): self.path = None super().__init__(request, client_address, server) - def log_message(self, *args): - """ Silencing http server. """ + def log_message(self, *_args): + """Silencing HTTP server.""" return def send_thrift_exception(self, error_msg, iprot, oprot, otrans): @@ -104,7 +111,7 @@ def send_thrift_exception(self, error_msg, iprot, oprot, otrans): result = otrans.getvalue() self.send_response(200) self.send_header("content-type", "application/x-thrift") - self.send_header("Content-Length", len(result)) + self.send_header("Content-Length", str(len(result))) self.end_headers() self.wfile.write(result) @@ -369,22 +376,22 @@ def do_POST(self): major_version, _ = version_supported if major_version == 6: - if request_endpoint == 'Authentication': + if request_endpoint == "Authentication": auth_handler = AuthHandler_v6( self.server.manager, self.auth_session, self.server.config_session) processor = AuthAPI_v6.Processor(auth_handler) - elif request_endpoint == 'Configuration': + elif request_endpoint == "Configuration": conf_handler = ConfigHandler_v6( self.auth_session, self.server.config_session) processor = ConfigAPI_v6.Processor(conf_handler) - elif 
request_endpoint == 'ServerInfo': + elif request_endpoint == "ServerInfo": server_info_handler = ServerInfoHandler_v6(version) processor = ServerInfoAPI_v6.Processor( server_info_handler) - elif request_endpoint == 'Products': + elif request_endpoint == "Products": prod_handler = ProductHandler_v6( self.server, self.auth_session, @@ -392,7 +399,13 @@ def do_POST(self): product, version) processor = ProductAPI_v6.Processor(prod_handler) - elif request_endpoint == 'CodeCheckerService': + elif request_endpoint == "Tasks": + task_handler = TaskHandler_v6( + self.server.config_session, + self.server.task_manager, + self.auth_session) + processor = TaskAPI_v6.Processor(task_handler) + elif request_endpoint == "CodeCheckerService": # This endpoint is a product's report_server. if not product: error_msg = \ @@ -745,7 +758,10 @@ def __init__(self, pckg_data, context, check_env, - manager): + manager: session_manager.SessionManager, + machine_id: str, + task_queue: Queue, + server_shutdown_flag: Value): LOG.debug("Initializing HTTP server...") @@ -756,6 +772,7 @@ def __init__(self, self.context = context self.check_env = check_env self.manager = manager + self.address, self.port = server_address self.__products = {} # Create a database engine for the configuration database. @@ -764,6 +781,12 @@ def __init__(self, self.config_session = sessionmaker(bind=self.__engine) self.manager.set_database_connection(self.config_session) + self.__task_queue = task_queue + self.task_manager = BackgroundTaskManager(task_queue, + self.config_session, + server_shutdown_flag, + machine_id) + # Load the initial list of products and set up the server. 
cfg_sess = self.config_session() permissions.initialise_defaults('SYSTEM', { @@ -780,7 +803,7 @@ def __init__(self, cfg_sess.close() try: - HTTPServer.__init__(self, server_address, + HTTPServer.__init__(self, (self.address, self.port), RequestHandlerClass, bind_and_activate=True) ssl_key_file = os.path.join(config_directory, "key.pem") @@ -806,13 +829,23 @@ def __init__(self, else: LOG.info("Searching for SSL key at %s, cert at %s, " - "not found...", ssl_key_file, ssl_cert_file) + "not found!", ssl_key_file, ssl_cert_file) LOG.info("Falling back to simple, insecure HTTP.") except Exception as e: LOG.error("Couldn't start the server: %s", e.__str__()) raise + # If the server was started with the port 0, the OS will pick an + # available port. + # For this reason, we will update the port variable after server + # initialisation. + self.port = self.socket.getsockname()[1] + + @property + def formatted_address(self) -> str: + return f"{str(self.address)}:{self.port}" + def configure_keepalive(self): """ Enable keepalive on the socket and some TCP keepalive configuration @@ -855,17 +888,40 @@ def configure_keepalive(self): LOG.error('Failed to set TCP max keepalive probe: %s', ret) def terminate(self): - """ - Terminating the server. - """ + """Terminates the server and releases associated resources.""" try: self.server_close() + self.__task_queue.close() + self.__task_queue.join_thread() self.__engine.dispose() + + sys.exit(128 + signal.SIGINT) except Exception as ex: LOG.error("Failed to shut down the WEB server!") LOG.error(str(ex)) sys.exit(1) + def serve_forever_with_shutdown_handler(self): + """ + Calls `HTTPServer.serve_forever` but handles SIGINT (2) signals + gracefully such that the open resources are properly cleaned up. 
+ """ + def _handler(signum: int, _frame): + if signum not in [signal.SIGINT]: + signal_log(LOG, "ERROR", "Signal " + f"<{signal.Signals(signum).name} ({signum})> " + "handling attempted by " + "'serve_forever_with_shutdown_handler'!") + return + + signal_log(LOG, "DEBUG", f"{os.getpid()}: Received " + f"{signal.Signals(signum).name} ({signum}), " + "performing shutdown ...") + self.terminate() + + signal.signal(signal.SIGINT, _handler) + return self.serve_forever() + def add_product(self, orm_product, init_db=False): """ Adds a product to the list of product databases connected to @@ -990,6 +1046,10 @@ class CCSimpleHttpServerIPv6(CCSimpleHttpServer): address_family = socket.AF_INET6 + @property + def formatted_address(self) -> str: + return f"[{str(self.address)}]:{self.port}" + def __make_root_file(root_file): """ @@ -1000,7 +1060,7 @@ def __make_root_file(root_file): LOG.debug("Generating initial superuser (root) credentials...") username = ''.join(sample("ABCDEFGHIJKLMNOPQRSTUVWXYZ", 6)) - password = get_tmp_dir_hash()[:8] + password = generate_random_token(8) LOG.info("A NEW superuser credential was generated for the server. " "This information IS SAVED, thus subsequent server starts " @@ -1028,16 +1088,16 @@ def __make_root_file(root_file): return secret -def start_server(config_directory, package_data, port, config_sql_server, - listen_address, force_auth, skip_db_cleanup: bool, - context, check_env): +def start_server(config_directory: str, package_data, port: int, + config_sql_server, listen_address: str, + force_auth: bool, skip_db_cleanup: bool, + context, check_env, machine_id: str) -> int: """ - Start http server to handle web client and thrift requests. + Starts the HTTP server to handle Web client and Thrift requests, execute + background jobs. 
""" LOG.debug("Starting CodeChecker server...") - server_addr = (listen_address, port) - root_file = os.path.join(config_directory, 'root.user') if not os.path.exists(root_file): LOG.warning("Server started without 'root.user' present in " @@ -1103,92 +1163,445 @@ def start_server(config_directory, package_data, port, config_sql_server, else: LOG.debug("Skipping db_cleanup, as requested.") + def _cleanup_incomplete_tasks(action: str) -> int: + config_session_factory = config_sql_server.create_engine() + try: + return drop_all_incomplete_tasks( + sessionmaker(bind=config_session_factory), + machine_id, action) + finally: + config_session_factory.dispose() + + dropped_tasks = _cleanup_incomplete_tasks( + "New server started with the same machine_id, assuming the old " + "server is dead and won't be able to finish the task.") + if dropped_tasks: + LOG.info("At server startup, dropped %d background tasks left behind " + "by a previous server instance matching machine ID '%s'.", + dropped_tasks, machine_id) + + api_processes: Dict[int, Process] = {} + requested_api_threads = cast(int, manager.worker_processes) \ + or cpu_count() + + bg_processes: Dict[int, Process] = {} + requested_bg_threads = cast(int, + manager.background_worker_processes) \ + or requested_api_threads + # Note that Queue under the hood uses OS-level primitives such as a socket + # or a pipe, where the read-write buffers have a **LIMITED** capacity, and + # are usually **NOT** backed by the full amount of available system memory. + bg_task_queue: Queue = Queue() + is_server_shutting_down = Value('B', False) + server_clazz = CCSimpleHttpServer - if ':' in server_addr[0]: + if ':' in listen_address: # IPv6 address specified for listening. # FIXME: Python>=3.8 automatically handles IPv6 if ':' is in the bind # address, see https://bugs.python.org/issue24209. 
server_clazz = CCSimpleHttpServerIPv6 - http_server = server_clazz(server_addr, + http_server = server_clazz((listen_address, port), RequestHandler, config_directory, config_sql_server, package_data, context, check_env, - manager) + manager, + machine_id, + bg_task_queue, + is_server_shutting_down) + + try: + instance_manager.register(os.getpid(), + os.path.abspath( + context.codechecker_workspace), + port) + except IOError as ex: + LOG.debug(ex.strerror) - # If the server was started with the port 0, the OS will pick an available - # port. For this reason we will update the port variable after server - # initialization. - port = http_server.socket.getsockname()[1] + def unregister_handler(pid): + # Handle errors during instance unregistration. + # The workspace might be removed so updating the config content might + # fail. + try: + instance_manager.unregister(pid) + except IOError as ex: + LOG.debug(ex.strerror) - processes = [] + atexit.register(unregister_handler, os.getpid()) - def signal_handler(signum, _): + def _start_process_with_no_signal_handling(**kwargs): """ - Handle SIGTERM to stop the server running. + Starts a `multiprocessing.Process` in a context where the signal + handling is temporarily disabled, such that the child process does not + inherit any signal handling from the parent. + + Child processes spawned after the main process set up its signals + MUST NOT inherit the signal handling because that would result in + multiple children firing on the SIGTERM handler, for example. + + For this reason, we temporarily disable the signal handling here by + returning to the initial defaults, and then restore the main process's + signal handling to be the usual one. 
""" - LOG.info("Shutting down the WEB server on [%s:%d]", - '[' + listen_address + ']' - if server_clazz is CCSimpleHttpServerIPv6 else listen_address, - port) - http_server.terminate() + signals_to_disable = [signal.SIGINT, signal.SIGTERM] + if sys.platform != "win32": + signals_to_disable += [signal.SIGCHLD, signal.SIGHUP] - # Terminate child processes. - for pp in processes: - pp.terminate() + existing_signal_handlers = {} + for signum in signals_to_disable: + existing_signal_handlers[signum] = signal.signal( + signum, signal.SIG_DFL) + + p = Process(**kwargs) + p.start() + + for signum in signals_to_disable: + signal.signal(signum, existing_signal_handlers[signum]) + + return p + + # Save a process-wide but not shared counter in the main process for how + # many subprocesses of each kind had been spawned, as this will be used in + # the internal naming of the workers. + spawned_api_proc_count: int = 0 + spawned_bg_proc_count: int = 0 + + def spawn_api_process(): + """Starts a single HTTP API worker process for CodeChecker server.""" + nonlocal spawned_api_proc_count + spawned_api_proc_count += 1 + + p = _start_process_with_no_signal_handling( + target=http_server.serve_forever_with_shutdown_handler, + name=f"CodeChecker-API-{spawned_api_proc_count}") + api_processes[cast(int, p.pid)] = p + signal_log(LOG, "DEBUG", f"API handler child process {p.pid} started!") + return p + + LOG.info("Using %d API request handler processes ...", + requested_api_threads) + for _ in range(requested_api_threads): + spawn_api_process() + + def spawn_bg_process(): + """Starts a single Task worker process for CodeChecker server.""" + nonlocal spawned_bg_proc_count + spawned_bg_proc_count += 1 + + p = _start_process_with_no_signal_handling( + target=background_task_executor, + args=(bg_task_queue, + config_sql_server, + is_server_shutting_down, + machine_id, + ), + name=f"CodeChecker-Task-{spawned_bg_proc_count}") + bg_processes[cast(int, p.pid)] = p + signal_log(LOG, "DEBUG", f"Task 
child process {p.pid} started!") + return p + + LOG.info("Using %d Task handler processes ...", requested_bg_threads) + for _ in range(requested_bg_threads): + spawn_bg_process() + + termination_signal_timestamp = Value('d', 0) + + def forced_termination_signal_handler(signum: int, _frame): + """ + Handle SIGINT (2) and SIGTERM (15) received a second time to stop the + server ungracefully. + """ + if signum not in [signal.SIGINT, signal.SIGTERM]: + signal_log(LOG, "ERROR", "Signal " + f"<{signal.Signals(signum).name} ({signum})> " + "handling attempted by " + "'forced_termination_signal_handler'!") + return + if not is_server_shutting_down.value or \ + abs(termination_signal_timestamp.value) <= \ + sys.float_info.epsilon: + return + if time.time() - termination_signal_timestamp.value <= 2.0: + # Allow some time to pass between the handling of the normal + # termination vs. doing something in the "forced" handler, because + # a human's ^C keypress in a terminal can generate multiple SIGINTs + # in a quick succession. + return + + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + + signal_log(LOG, "WARNING", "Termination signal " + f"<{signal.Signals(signum).name} ({signum})> " + "received a second time, **FORCE** killing the WEB server " + f"on [{http_server.formatted_address}] ...") + + for p in list(api_processes.values()) + list(bg_processes.values()): + try: + p.kill() + except (OSError, ValueError): + pass + # No mercy this time. sys.exit(128 + signum) - def reload_signal_handler(*_args, **_kwargs): + exit_code = Value('B', 0) + + def termination_signal_handler(signum: int, _frame): """ - Reloads server configuration file. + Handle SIGINT (2) and SIGTERM (15) to stop the server gracefully. """ + # Debounce termination signals at this point. 
+ signal.signal(signal.SIGINT, forced_termination_signal_handler) + signal.signal(signal.SIGTERM, forced_termination_signal_handler) + + if is_server_shutting_down.value: + return + if signum not in [signal.SIGINT, signal.SIGTERM]: + signal_log(LOG, "ERROR", "Signal " + f"<{signal.Signals(signum).name} ({signum})> " + "handling attempted by 'termination_signal_handler'!") + return + + is_server_shutting_down.value = True + termination_signal_timestamp.value = time.time() + + exit_code.value = 128 + signum + signal_log(LOG, "INFO", "Shutting down the WEB server on " + f"[{http_server.formatted_address}] ... " + "Please allow some time for graceful clean-up!") + + # Terminate child processes. + # For these subprocesses, let the processes properly clean up after + # themselves in a graceful shutdown scenario. + # For this reason, we signal each group of children first (SIGINT for + # API workers, SIGHUP for task workers), indicating that the main + # server process wants to exit, and then wait for that group to die. + for pid in api_processes: + try: + signal_log(LOG, "DEBUG", f"SIGINT! API child PID: {pid} ...") + os.kill(pid, signal.SIGINT) + except (OSError, ValueError): + pass + for pid in list(api_processes.keys()): + p = api_processes[pid] + try: + signal_log(LOG, "DEBUG", f"join() API child PID: {pid} ...") + p.join() + p.close() + except (OSError, ValueError): + pass + finally: + del api_processes[pid] + + bg_task_queue.close() + bg_task_queue.join_thread() + for pid in bg_processes: + try: + signal_log(LOG, "DEBUG", f"SIGHUP! Task child PID: {pid} ...") + os.kill(pid, signal.SIGHUP) + except (OSError, ValueError): + pass + for pid in list(bg_processes.keys()): + p = bg_processes[pid] + try: + signal_log(LOG, "DEBUG", f"join() Task child PID: {pid} ...") + p.join() + p.close() + except (OSError, ValueError): + pass + finally: + del bg_processes[pid] + + def reload_signal_handler(signum: int, _frame): + """ + Handle SIGHUP (1) to reload the server's configuration file to memory. 
+ """ + if signum not in [signal.SIGHUP]: + signal_log(LOG, "ERROR", "Signal " + f"<{signal.Signals(signum).name} ({signum})> " + "handling attempted by 'reload_signal_handler'!") + return + + signal_log(LOG, "INFO", + "Received signal to reload server configuration ...") + manager.reload_config() - try: - instance_manager.register(os.getpid(), - os.path.abspath( - context.codechecker_workspace), - port) - except IOError as ex: - LOG.debug(ex.strerror) + signal_log(LOG, "INFO", "Server configuration reload: Done.") - LOG.info("Server waiting for client requests on [%s:%d]", - '[' + listen_address + ']' - if server_clazz is CCSimpleHttpServerIPv6 else listen_address, - port) + sigchild_event_counter = Value('I', 0) + is_already_handling_sigchild = Value('B', False) - def unregister_handler(pid): + def child_signal_handler(signum: int, _frame): """ - Handle errors during instance unregistration. - The workspace might be removed so updating the - config content might fail. + Handle SIGCHLD (17) that signals a child process's interruption or + death by creating a new child to ensure that the requested number of + workers are always alive. """ - try: - instance_manager.unregister(pid) - except IOError as ex: - LOG.debug(ex.strerror) + if is_already_handling_sigchild.value: + # Do not perform this handler recursively to prevent spawning too + # many children. + return + if is_server_shutting_down.value: + # Do not handle SIGCHLD events during normal shutdown, because + # our own subprocess termination calls would fire this handler. 
+ return + if signum not in [signal.SIGCHLD]: + signal_log(LOG, "ERROR", "Signal " + f"<{signal.Signals(signum).name} ({signum})> " + "handling attempted by 'child_signal_handler'!") + return - atexit.register(unregister_handler, os.getpid()) + is_already_handling_sigchild.value = True - for _ in range(manager.worker_processes - 1): - p = multiprocess.Process(target=http_server.serve_forever) - processes.append(p) - p.start() + force_slow_path: bool = False + event_counter: int = sigchild_event_counter.value + if event_counter >= \ + min(requested_api_threads, requested_bg_threads) // 2: + force_slow_path = True + else: + sigchild_event_counter.value = event_counter + 1 - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) + # How many new processes need to be spawned for each type of worker + # process? + spawn_needs: Counter = Counter() + def _check_process_one(kind: str, proclist: Dict[int, Process], + pid: int): + try: + p = proclist[pid] + except KeyError: + return + + # Unfortunately, "Process.is_alive()" cannot be used here, because + # during the handling of SIGCHLD during a child's death, according + # to the state of Python's data structures, the child is still + # alive. + # We run a low-level non-blocking wait again, which will + # immediately return, but properly reap the child process if it has + # terminated. + try: + _, status_signal = os.waitpid(pid, os.WNOHANG) + if status_signal == 0: + # The process is still alive. 
+ return + except ChildProcessError: + pass + + signal_log(LOG, "WARNING", + f"'{kind}' child process (PID {pid}, \"{p.name}\") " + "is not alive anymore!") + spawn_needs[kind] += 1 + + try: + del proclist[pid] + except KeyError: + # Due to the bunching up of signals and that Python runs the + # C-level signals with a custom logic inside the interpreter, + # coupled with the fact that PIDs can be reused, the same PID + # can be reported dead in a quick succession of signals, + # resulting in a KeyError here. + pass + + def _check_processes_many(kind: str, proclist: Dict[int, Process]): + for pid in sorted(proclist.keys()): + _check_process_one(kind, proclist, pid) + + # Try to find the type of the interrupted/dead process based on signal + # information first. + # This should be quicker and more deterministic. + try: + child_pid, child_signal = os.waitpid(-1, os.WNOHANG) + if child_signal == 0: + # Go to the slow path and check the children manually, we did + # not receive a reply from waitpid() with an actual dead child. + raise ChildProcessError() + + _check_process_one("api", api_processes, child_pid) + _check_process_one("background", bg_processes, child_pid) + except ChildProcessError: + # We have not gotten a PID, or it was not found, so we do not know + # who died; in this case, it is better to go on the slow path and + # query all our children individually. + spawn_needs.clear() # Forces the Counter to be empty. + + if force_slow_path: + # A clever sequence of child killings in variously sized batches + # can easily result in missing a few signals here and there, and + # missing a few dead children because 'os.waitpid()' allows us to + # fall into a false "fast path" situation. + # To remedy this, we every so often force a slow path to ensure + # the number of worker processes is as close to the requested + # amount of possible. + + # Forces the Counter to be empty, even if the fast path put an + # entry in there. 
+ spawn_needs.clear() + + if not spawn_needs: + _check_processes_many("api", api_processes) + _check_processes_many("background", bg_processes) + + if force_slow_path: + sigchild_event_counter.value = 0 + signal_log(LOG, "WARNING", + "Too many children died since last full status " + "check, performing one ...") + + # If we came into the handler with a "forced slow path" situation, + # ensure that we spawn enough new processes to backfill the + # missing amount, even if due to the flakyness of signal handling, + # we might not have actually gotten "N" times SIGCHLD firings for + # the death of N children, if they happened in a bundle situation, + # e.g., kill N/4, then kill N/2, then kill 1 or 2, then kill the + # remaining. + spawn_needs["api"] = \ + util.clamp(0, requested_api_threads - len(api_processes), + requested_api_threads) + spawn_needs["background"] = \ + util.clamp(0, requested_bg_threads - len(bg_processes), + requested_bg_threads) + + for kind, num in spawn_needs.items(): + signal_log(LOG, "INFO", + f"(Re-)starting {num} '{kind}' child process(es) ...") + + if kind == "api": + for _ in range(num): + spawn_api_process() + elif kind == "background": + for _ in range(num): + spawn_bg_process() + + is_already_handling_sigchild.value = False + + signal.signal(signal.SIGINT, termination_signal_handler) + signal.signal(signal.SIGTERM, termination_signal_handler) if sys.platform != "win32": + signal.signal(signal.SIGCHLD, child_signal_handler) signal.signal(signal.SIGHUP, reload_signal_handler) - # Main process also acts as a worker. - http_server.serve_forever() + LOG.info("Server waiting for client requests on [%s]", + http_server.formatted_address) + + # We can not use a multiprocessing.Event here because that would result in + # a deadlock, as the process waiting on the event is the one receiving the + # shutdown signal. 
+ while not is_server_shutting_down.value: + time.sleep(5) + + dropped_tasks = _cleanup_incomplete_tasks("Server shut down, task will " + "never be completed.") + if dropped_tasks: + LOG.info("At server shutdown, dropped %d background tasks that will " + "never be completed.", dropped_tasks) - LOG.info("Webserver quit.") + LOG.info("CodeChecker server quit (main process).") + return exit_code.value def add_initial_run_database(config_sql_server, product_connection): diff --git a/web/server/codechecker_server/session_manager.py b/web/server/codechecker_server/session_manager.py index 276af909cd..662eaa62b0 100644 --- a/web/server/codechecker_server/session_manager.py +++ b/web/server/codechecker_server/session_manager.py @@ -11,16 +11,14 @@ import hashlib import json -import os import re -import uuid from datetime import datetime from typing import Optional from codechecker_common.compatibility.multiprocessing import cpu_count from codechecker_common.logger import get_logger -from codechecker_common.util import load_json +from codechecker_common.util import generate_random_token, load_json from codechecker_web.shared.env import check_file_owner_rw from codechecker_web.shared.version import SESSION_COOKIE_NAME as _SCN @@ -47,29 +45,29 @@ SESSION_COOKIE_NAME = _SCN -def generate_session_token(): - """ - Returns a random session token. - """ - return uuid.UUID(bytes=os.urandom(16)).hex - - def get_worker_processes(scfg_dict): """ Return number of worker processes from the config dictionary. - Return 'worker_processes' field from the config dictionary or returns the - default value if this field is not set or the value is negative. + Return 'worker_processes' and 'background_worker_processes' fields from + the config dictionary or returns the default value if this field is not + set or the value is negative. 
""" default = cpu_count() - worker_processes = scfg_dict.get('worker_processes', default) + worker_processes = scfg_dict.get("worker_processes", default) + background_worker_processes = scfg_dict.get("background_worker_processes", + default) - if worker_processes < 0: + if not worker_processes or worker_processes < 0: LOG.warning("Number of worker processes can not be negative! Default " "value will be used: %s", default) worker_processes = default + if not background_worker_processes or background_worker_processes < 0: + LOG.warning("Number of task worker processes can not be negative! " + "Default value will be used: %s", worker_processes) + background_worker_processes = worker_processes - return worker_processes + return worker_processes, background_worker_processes class _Session: @@ -182,7 +180,8 @@ def __init__(self, configuration_file, root_sha, force_auth=False): # so it should NOT be handled by session_manager. A separate config # handler for the server's stuff should be created, that can properly # instantiate SessionManager with the found configuration. - self.__worker_processes = get_worker_processes(scfg_dict) + self.__worker_processes, self.__background_worker_processes = \ + get_worker_processes(scfg_dict) self.__max_run_count = scfg_dict.get('max_run_count', None) self.__store_config = scfg_dict.get('store', {}) self.__keepalive_config = scfg_dict.get('keepalive', {}) @@ -328,6 +327,10 @@ def is_enabled(self): def worker_processes(self): return self.__worker_processes + @property + def background_worker_processes(self) -> int: + return self.__background_worker_processes + def get_realm(self): return { "realm": self.__auth_config.get('realm_name'), @@ -622,7 +625,7 @@ def create_session(self, auth_string): return False # Generate a new token and create a local session. 
- token = generate_session_token() + token = generate_random_token(32) user_name = validation.get('username') groups = validation.get('groups', []) is_root = validation.get('root', False) diff --git a/web/server/codechecker_server/task_executors/__init__.py b/web/server/codechecker_server/task_executors/__init__.py new file mode 100644 index 0000000000..4259749345 --- /dev/null +++ b/web/server/codechecker_server/task_executors/__init__.py @@ -0,0 +1,7 @@ +# ------------------------------------------------------------------------- +# +# Part of the CodeChecker project, under the Apache License v2.0 with +# LLVM Exceptions. See LICENSE for license information. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# +# ------------------------------------------------------------------------- diff --git a/web/server/codechecker_server/task_executors/abstract_task.py b/web/server/codechecker_server/task_executors/abstract_task.py new file mode 100644 index 0000000000..34e1cd4d7a --- /dev/null +++ b/web/server/codechecker_server/task_executors/abstract_task.py @@ -0,0 +1,183 @@ +# ------------------------------------------------------------------------- +# +# Part of the CodeChecker project, under the Apache License v2.0 with +# LLVM Exceptions. See LICENSE for license information. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# +# ------------------------------------------------------------------------- +""" +Contains the base class to be inherited and implemented by all background task +types. 
+""" +import os +import pathlib +import shutil +from typing import Optional + +from codechecker_common.logger import get_logger + +from ..database.config_db_model import BackgroundTask as DBTask + + +LOG = get_logger("server") + + +class TaskCancelHonoured(Exception): + """ + Specialised tag exception raised by `AbstractTask` implementations in a + checkpoint after having checked that their ``cancel_flag`` was set, in + order to terminate task-specific execution and to register the + cancellation's success by the `AbstractTask.execute` method. + + This exception should **NOT** be caught by user code. + """ + + def __init__(self, task_obj: "AbstractTask"): + super().__init__(f"Task '{task_obj.token}' honoured CANCEL request.") + self.task_obj = task_obj + + +class AbstractTask: + """ + Base class implementing common execution and bookkeeping methods to + facilitate the dispatch of tasks to background worker processes. + + Instances of this class **MUST** be marshallable by ``pickle``, as they + are transported over an IPC `Queue`. + It is important that instances do not grow too large, as the underlying + OS-level primitives of a `Queue` can get full, which can result in a + deadlock situation. + + The run-time contents of the instance should only contain the bare minimum + metadata required for the implementation to execute in the background. + + Implementors of subclasses **MAY REASONABLY ASSUME** that an + `AbstractTask` scheduled in the API handler process of a server will be + actually executed by a background worker in the same process group, on the + same machine instance. + """ + + def __init__(self, token: str, data_path: Optional[pathlib.Path]): + self._token = token + self._data_path = data_path + + @property + def token(self) -> str: + """Returns the task's identifying token, its primary ID.""" + return self._token + + @property + def data_path(self) -> Optional[pathlib.Path]: + """ + Returns the filesystem path where the task's input data is prepared. 
+ """ + return self._data_path + + def destroy_data(self): + """ + Deletes the contents of `data_path`. + """ + if not self._data_path: + return + + try: + shutil.rmtree(self._data_path) + except Exception as ex: + LOG.warning("Failed to remove background task's data_dir at " + "'%s':\n%s", self.data_path, str(ex)) + + def _implementation(self, _task_manager: "TaskManager") -> None: + """ + Implemented by subclasses to perform the logic specific to the task. + + Subclasses should use the `task_manager` object, injected from the + context of the executed subprocess, to query and mutate service-level + information about the current task. + """ + raise NotImplementedError() + + def execute(self, task_manager: "TaskManager") -> None: + """ + Executes the `_implementation` of the task, overridden by subclasses, + to perform a task-specific business logic. + + This high-level wrapper deals with capturing `Exception`s, setting + appropriate status information in the database (through the + injected `task_manager`) and logging failures accordingly. + """ + if task_manager.should_cancel(self): + return + + try: + task_manager._mutate_task_record( + self, lambda dbt: dbt.set_running()) + except KeyError: + # KeyError is thrown if a task without a corresponding database + # record is attempted to be executed. + LOG.error("Failed to execute task '%s' due to database exception", + self.token) + except Exception as ex: + LOG.error("Failed to execute task '%s' due to database exception" + "\n%s", + self.token, str(ex)) + # For any other record, try to set the task abandoned due to an + # exception. 
+ try: + task_manager._mutate_task_record( + self, lambda dbt: + dbt.set_abandoned(force_dropped_status=True)) + except Exception: + return + + LOG.debug("Task '%s' running on machine '%s' executor #%d", + self.token, task_manager.machine_id, os.getpid()) + + try: + self._implementation(task_manager) + LOG.debug("Task '%s' finished on machine '%s' executor #%d", + self.token, + task_manager.machine_id, + os.getpid()) + + try: + task_manager._mutate_task_record( + self, lambda dbt: dbt.set_finished(successfully=True)) + except Exception as ex: + LOG.error("Failed to set task '%s' finished due to " + "database exception:\n%s", + self.token, str(ex)) + except TaskCancelHonoured: + def _log_cancel_and_abandon(db_task: DBTask): + db_task.add_comment("CANCEL!\nCancel request of admin " + "honoured by task.", + "SYSTEM[AbstractTask::execute()]") + db_task.set_abandoned(force_dropped_status=False) + + def _log_drop_and_abandon(db_task: DBTask): + db_task.add_comment("SHUTDOWN!\nTask honoured graceful " + "cancel signal generated by " + "server shutdown.", + "SYSTEM[AbstractTask::execute()]") + db_task.set_abandoned(force_dropped_status=True) + + if not task_manager.is_shutting_down: + task_manager._mutate_task_record(self, _log_cancel_and_abandon) + else: + task_manager._mutate_task_record(self, _log_drop_and_abandon) + except Exception as ex: + LOG.error("Failed to execute task '%s' on machine '%s' " + "executor #%d: %s", + self.token, task_manager.machine_id, os.getpid(), + str(ex)) + import traceback + traceback.print_exc() + + def _log_exception_and_fail(db_task: DBTask): + db_task.add_comment( + f"FAILED!\nException during execution:\n{str(ex)}", + "SYSTEM[AbstractTask::execute()]") + db_task.set_finished(successfully=False) + + task_manager._mutate_task_record(self, _log_exception_and_fail) + finally: + self.destroy_data() diff --git a/web/server/codechecker_server/task_executors/main.py b/web/server/codechecker_server/task_executors/main.py new file mode 100644 
index 0000000000..dc9dd4e543 --- /dev/null +++ b/web/server/codechecker_server/task_executors/main.py @@ -0,0 +1,142 @@ +# ------------------------------------------------------------------------- +# +# Part of the CodeChecker project, under the Apache License v2.0 with +# LLVM Exceptions. See LICENSE for license information. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# +# ------------------------------------------------------------------------- +""" +Implements a dedicated subprocess that deals with running `AbstractTask` +subclasses in the background. +""" +from datetime import timedelta +import os +from queue import Empty +import signal + +from sqlalchemy.orm import sessionmaker + +from codechecker_common.compatibility.multiprocessing import Queue, Value +from codechecker_common.logger import get_logger, signal_log + +from ..database.config_db_model import BackgroundTask as DBTask +from .abstract_task import AbstractTask +from .task_manager import TaskManager + + +WAIT_TIME_FOR_TASK_QUEUE_CLEARING_AT_SERVER_SHUTDOWN = timedelta(seconds=5) + +LOG = get_logger("server") + + +def executor(queue: Queue, + config_db_sql_server, + server_shutdown_flag: "Value", + machine_id: str): + """ + The "main()" function implementation for a background task executor + process. + + This process sets up the state of the local process, and then deals with + popping jobs from the queue and executing them in the local context. + """ + # First things first, a background worker process should NOT respect the + # termination signals received from the parent process, because it has to + # run its own cleanup logic before shutting down. + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + + kill_flag = Value('B', False) + + def executor_hangup_handler(signum: int, _frame): + """ + Handle SIGHUP (1) to do a graceful shutdown of the background worker. 
+ """ + if signum not in [signal.SIGHUP]: + signal_log(LOG, "ERROR", "Signal " + f"<{signal.Signals(signum).name} ({signum})> " + "handling attempted by 'executor_hangup_handler'!") + return + + signal_log(LOG, "DEBUG", f"{os.getpid()}: Received " + f"{signal.Signals(signum).name} ({signum}), preparing for " + "shutdown ...") + kill_flag.value = True + + signal.signal(signal.SIGHUP, executor_hangup_handler) + + config_db_engine = config_db_sql_server.create_engine() + tm = TaskManager(queue, sessionmaker(bind=config_db_engine), kill_flag, + machine_id) + + while not kill_flag.value: + try: + # Do not block indefinitely when waiting for a job, to allow + # checking whether the kill flags were set. + t: AbstractTask = queue.get(block=True, timeout=1) + except Empty: + continue + + import pprint + LOG.info("Executor #%d received task object:\n\n%s:\n%s\n\n", + os.getpid(), t, pprint.pformat(t.__dict__)) + + t.execute(tm) + + # Once the main loop of task execution process has finished, there might + # still be tasks left in the queue. + # If the server is shutting down (this is distinguished from the local kill + # flag, because a 'SIGHUP' might arrive from any source, not just a valid + # graceful shutdown!), then these jobs would be lost if the process just + # exited, with no information reported to the database. + # We need to set these tasks to dropped as much as possible. + def _log_shutdown_and_abandon(db_task: DBTask): + db_task.add_comment("SHUTDOWN!\nTask never started due to the " + "server shutdown!", "SYSTEM") + db_task.set_abandoned(force_dropped_status=True) + + def _drop_task_at_shutdown(t: AbstractTask): + try: + LOG.debug("Dropping task '%s' due to server shutdown...", t.token) + tm._mutate_task_record(t, _log_shutdown_and_abandon) + except Exception: + pass + finally: + t.destroy_data() + + if server_shutdown_flag.value: + # Unfortunately, it is not guaranteed which process will wake up first + # when popping objects from the queue.
+ # Blocking indefinitely would not be a solution here, because all + # producers (API threads) had likely already exited at this point. + # However, simply observing no elements for a short period of time is + # also not enough, as at the very last moments of a server's lifetime, + # one process might observe the queue to be empty, simply because + # another process stole the object that was put into it. + # + # To be on the safe side of things, we require to observe the queue to + # be *constantly* empty over a longer period of repetitive sampling. + empty_sample_count: int = 0 + while empty_sample_count < int( + WAIT_TIME_FOR_TASK_QUEUE_CLEARING_AT_SERVER_SHUTDOWN + .total_seconds()): + try: + t: AbstractTask = queue.get(block=True, timeout=1) + except Empty: + empty_sample_count += 1 + continue + + empty_sample_count = 0 + _drop_task_at_shutdown(t) + + queue.close() + queue.join_thread() + + try: + config_db_engine.dispose() + except Exception as ex: + LOG.error("Failed to shut down task executor!\n%s", str(ex)) + return + + LOG.debug("Task executor subprocess PID %d exited main loop.", + os.getpid()) diff --git a/web/server/codechecker_server/task_executors/task_manager.py b/web/server/codechecker_server/task_executors/task_manager.py new file mode 100644 index 0000000000..ddd3b31053 --- /dev/null +++ b/web/server/codechecker_server/task_executors/task_manager.py @@ -0,0 +1,229 @@ +# ------------------------------------------------------------------------- +# +# Part of the CodeChecker project, under the Apache License v2.0 with +# LLVM Exceptions. See LICENSE for license information. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# +# ------------------------------------------------------------------------- +""" +Contains status management and query methods to handle bookkeeping for +dispatched background tasks. 
+""" +import os +from pathlib import Path +import tempfile +from typing import Callable, Optional + +import sqlalchemy + +from codechecker_common.compatibility.multiprocessing import Queue, Value +from codechecker_common.logger import get_logger, signal_log +from codechecker_common.util import generate_random_token + +from ..database.config_db_model import BackgroundTask as DBTask, Product +from ..database.database import DBSession + +MAX_TOKEN_RANDOM_RETRIES = 10 + +LOG = get_logger("server") + + +class ExecutorInProgressShutdownError(Exception): + """ + Exception raised to indicate that the background executors are under + shutdown. + """ + def __init__(self): + super().__init__("Task executor is shutting down!") + + +class TaskManager: + """ + Handles the creation of "Task" status objects in the database and pushing + in-memory `AbstractTask` subclass instances to a `Queue`. + + This class is instantiated for EVERY WORKER separately, and is not a + shared resource! + """ + + def __init__(self, q: Queue, config_db_session_factory, + executor_kill_flag: Value, machine_id: str): + self._queue = q + self._database_factory = config_db_session_factory + self._is_shutting_down = executor_kill_flag + self._machine_id = machine_id + + @property + def machine_id(self) -> str: + """Returns the ``machine_id`` the instance was constructed with.""" + return self._machine_id + + def allocate_task_record(self, kind: str, summary: str, + user_name: Optional[str], + product: Optional[Product] = None) -> str: + """ + Creates the token and the status record for a new task with the given + initial metadata. + + Returns the token of the task, which is a unique identifier of the + allocated record.
+ """ + try_count: int = 0 + while True: + with DBSession(self._database_factory) as session: + try: + token = generate_random_token(DBTask._token_length) + + task = DBTask(token, kind, summary, self.machine_id, + user_name, product) + session.add(task) + session.commit() + + return token + except sqlalchemy.exc.IntegrityError as ie: + # The only failure that can happen is the PRIMARY KEY's + # UNIQUE violation, which means we hit jackpot by + # generating an already used token! + try_count += 1 + + if try_count >= MAX_TOKEN_RANDOM_RETRIES: + raise KeyError( + "Failed to generate a unique ID for task " + f"{kind} ({summary}) after " + f"{MAX_TOKEN_RANDOM_RETRIES} retries!") from ie + + def create_task_data(self, token: str) -> Path: + """ + Creates a temporary directory which is **NOT** cleaned up + automatically, and suitable for putting arbitrary files underneath + to communicate large inputs (that should not be put in the `Queue`) + to the `execute` method of an `AbstractTask`. + """ + task_tmp_root = Path(tempfile.gettempdir()) / "codechecker_tasks" \ + / self.machine_id + os.makedirs(task_tmp_root, exist_ok=True) + + task_tmp_dir = tempfile.mkdtemp(prefix=f"{token}-") + return Path(task_tmp_dir) + + def _get_task_record(self, task_obj: "AbstractTask") -> DBTask: + """ + Retrieves the `DBTask` for the task identified by `task_obj`. + + This class should not be mutated, only the fields queried. + """ + with DBSession(self._database_factory) as session: + try: + db_task = session.query(DBTask).get(task_obj.token) + session.expunge(db_task) + return db_task + except sqlalchemy.exc.SQLAlchemyError as sql_err: + raise KeyError(f"No task record for token '{task_obj.token}' " + "in the database") from sql_err + + def _mutate_task_record(self, task_obj: "AbstractTask", + mutator: Callable[[DBTask], None]): + """ + Executes the given `mutator` function for the `DBTask` record + corresponding to the `task_obj` description available in memory. 
+ """ + with DBSession(self._database_factory) as session: + try: + db_record = session.query(DBTask).get(task_obj.token) + except sqlalchemy.exc.SQLAlchemyError as sql_err: + raise KeyError(f"No task record for token '{task_obj.token}' " + "in the database") from sql_err + + try: + mutator(db_record) + except Exception: + session.rollback() + + import traceback + traceback.print_exc() + raise + + session.commit() + + def push_task(self, task_obj: "AbstractTask"): + """Enqueues the given `task_obj` onto the `Queue`.""" + if self.is_shutting_down: + raise ExecutorInProgressShutdownError() + + # Note, that the API handler process calling push_task() might be + # killed before writing to the queue, so an actually enqueued task + # (according to the DB) might never be consumed by a background + # process. + # As we have to COMMIT the status change before the actual processing + # in order to show the time stamp to the user(s), there is no better + # way to make this more atomic. + try: + self._mutate_task_record(task_obj, lambda dbt: dbt.set_enqueued()) + self._queue.put(task_obj) + except SystemExit as sex: + try: + signal_log(LOG, "WARNING", f"Process #{os.getpid()}: " + "push_task() killed via SystemExit during " + f"enqueue of task '{task_obj.token}'!") + + def _log_and_abandon(db_task: DBTask): + db_task.add_comment( + "SHUTDOWN!\nEnqueueing process terminated during the " + "ongoing enqueue! The task will never be executed!", + "SYSTEM[TaskManager::push_task()]") + db_task.set_abandoned(force_dropped_status=True) + + self._mutate_task_record(task_obj, _log_and_abandon) + finally: + raise sex + + @property + def is_shutting_down(self) -> bool: + """ + Returns whether the shutdown flag for the executor associated with the + `TaskManager` had been set. + """ + return self._is_shutting_down.value + + def should_cancel(self, task_obj: "AbstractTask") -> bool: + """ + Returns whether the task identified by `task_obj` should be + co-operatively cancelled. 
+ """ + db_task = self._get_task_record(task_obj) + return self.is_shutting_down or \ + (db_task.status in ["enqueued", "running"] + and db_task.cancel_flag) + + def heartbeat(self, task_obj: "AbstractTask"): + """ + Triggers ``heartbeat()`` timestamp update in the database for + `task_obj`. + """ + self._mutate_task_record(task_obj, lambda dbt: dbt.heartbeat()) + + +def drop_all_incomplete_tasks(config_db_session_factory, machine_id: str, + action: str) -> int: + """ + Sets all tasks in the database (reachable via `config_db_session_factory`) + that were associated with the given `machine_id` to ``"dropped"`` status, + indicating that the status was changed during the `action`. + + Returns the number of `DBTask`s actually changed. + """ + count: int = 0 + with DBSession(config_db_session_factory) as session: + for t in session.query(DBTask) \ + .filter(DBTask.machine_id == machine_id, + DBTask.status.in_(["allocated", + "enqueued", + "running"])) \ + .all(): + count += 1 + t.add_comment(f"DROPPED!\n{action}", + "SYSTEM") + t.set_abandoned(force_dropped_status=True) + + session.commit() + return count diff --git a/web/server/codechecker_server/tmp.py b/web/server/codechecker_server/tmp.py deleted file mode 100644 index bbc5e77bea..0000000000 --- a/web/server/codechecker_server/tmp.py +++ /dev/null @@ -1,37 +0,0 @@ -# ------------------------------------------------------------------------- -# -# Part of the CodeChecker project, under the Apache License v2.0 with -# LLVM Exceptions. See LICENSE for license information. -# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -# -# ------------------------------------------------------------------------- -""" -Temporary directory module. 
-""" - - -import datetime -import hashlib -import os - - -from codechecker_common.logger import get_logger - -LOG = get_logger('system') - - -def get_tmp_dir_hash(): - """Generate a hash based on the current time and process id.""" - - pid = os.getpid() - time = datetime.datetime.now() - - data = str(pid) + str(time) - - dir_hash = hashlib.md5() - dir_hash.update(data.encode("utf-8")) - - LOG.debug('The generated temporary directory hash is %s.', - dir_hash.hexdigest()) - - return dir_hash.hexdigest() diff --git a/web/server/config/server_config.json b/web/server/config/server_config.json index e42745f08d..a5ad4999c8 100644 --- a/web/server/config/server_config.json +++ b/web/server/config/server_config.json @@ -1,4 +1,6 @@ { + "background_worker_processes": null, + "worker_processes": null, "max_run_count": null, "store": { "analysis_statistics_dir": null, diff --git a/web/server/vue-cli/package-lock.json b/web/server/vue-cli/package-lock.json index d908b8c278..d0943fd772 100644 --- a/web/server/vue-cli/package-lock.json +++ b/web/server/vue-cli/package-lock.json @@ -11,7 +11,7 @@ "@mdi/font": "^6.5.95", "chart.js": "^2.9.4", "chartjs-plugin-datalabels": "^0.7.0", - "codechecker-api": "file:../../api/js/codechecker-api-node/dist/codechecker-api-6.58.0.tgz", + "codechecker-api": "file:../../api/js/codechecker-api-node/dist/codechecker-api-6.59.0.tgz", "codemirror": "^5.65.0", "date-fns": "^2.28.0", "js-cookie": "^3.0.1", @@ -5113,9 +5113,9 @@ } }, "node_modules/codechecker-api": { - "version": "6.58.0", - "resolved": "file:../../api/js/codechecker-api-node/dist/codechecker-api-6.58.0.tgz", - "integrity": "sha512-N6qK5cnLt32jnJlSyyGMmW6FCzybDljyH1RrGOZ1Gk9n1vV7WluJbC9InYWsZ5lbK7xVyIrphTKXhqC4ARKF6g==", + "version": "6.59.0", + "resolved": "file:../../api/js/codechecker-api-node/dist/codechecker-api-6.59.0.tgz", + "integrity": "sha512-BZCBDRjVFS5UerrXsoPNioQppTfrCdDgToHqfFfaQtk6FPVrER42LchfU+cZl254PgWh58H5bLfqdLyFfqntCg==", "license": "SEE LICENSE IN LICENSE", 
"dependencies": { "thrift": "0.13.0-hotfix.1" @@ -21145,8 +21145,8 @@ "dev": true }, "codechecker-api": { - "version": "file:../../api/js/codechecker-api-node/dist/codechecker-api-6.58.0.tgz", - "integrity": "sha512-N6qK5cnLt32jnJlSyyGMmW6FCzybDljyH1RrGOZ1Gk9n1vV7WluJbC9InYWsZ5lbK7xVyIrphTKXhqC4ARKF6g==", + "version": "file:../../api/js/codechecker-api-node/dist/codechecker-api-6.59.0.tgz", + "integrity": "sha512-BZCBDRjVFS5UerrXsoPNioQppTfrCdDgToHqfFfaQtk6FPVrER42LchfU+cZl254PgWh58H5bLfqdLyFfqntCg==", "requires": { "thrift": "0.13.0-hotfix.1" } diff --git a/web/server/vue-cli/package.json b/web/server/vue-cli/package.json index 2239777668..f31789b897 100644 --- a/web/server/vue-cli/package.json +++ b/web/server/vue-cli/package.json @@ -27,7 +27,7 @@ }, "dependencies": { "@mdi/font": "^6.5.95", - "codechecker-api": "file:../../api/js/codechecker-api-node/dist/codechecker-api-6.58.0.tgz", + "codechecker-api": "file:../../api/js/codechecker-api-node/dist/codechecker-api-6.59.0.tgz", "chart.js": "^2.9.4", "chartjs-plugin-datalabels": "^0.7.0", "codemirror": "^5.65.0", diff --git a/web/tests/functional/instance_manager/test_instances.py b/web/tests/functional/instance_manager/test_instances.py index 0e7fc3a1d6..0851548100 100644 --- a/web/tests/functional/instance_manager/test_instances.py +++ b/web/tests/functional/instance_manager/test_instances.py @@ -10,7 +10,6 @@ Instance manager tests. """ - import os import shutil import subprocess @@ -178,7 +177,7 @@ def test_shutdown_record_keeping(self): EVENT_2.set() # Give the server some grace period to react to the kill command. 
- time.sleep(5) + time.sleep(30) test_cfg = env.import_test_cfg(self._test_workspace) codechecker_1 = test_cfg['codechecker_1'] diff --git a/web/tests/functional/tasks/__init__.py b/web/tests/functional/tasks/__init__.py new file mode 100644 index 0000000000..4259749345 --- /dev/null +++ b/web/tests/functional/tasks/__init__.py @@ -0,0 +1,7 @@ +# ------------------------------------------------------------------------- +# +# Part of the CodeChecker project, under the Apache License v2.0 with +# LLVM Exceptions. See LICENSE for license information. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# +# ------------------------------------------------------------------------- diff --git a/web/tests/functional/tasks/test_task_management.py b/web/tests/functional/tasks/test_task_management.py new file mode 100644 index 0000000000..a53a1e1b4f --- /dev/null +++ b/web/tests/functional/tasks/test_task_management.py @@ -0,0 +1,494 @@ +# ------------------------------------------------------------------------- +# +# Part of the CodeChecker project, under the Apache License v2.0 with +# LLVM Exceptions. See LICENSE for license information. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# +# ------------------------------------------------------------------------- +""" +Contains tests of the ``"/Tasks"`` API endpoint to query, using the +``DummyTask``, normal task management related API functions. +""" +from copy import deepcopy +from datetime import datetime, timezone +import os +import pathlib +import shutil +import unittest +import time +from typing import List, Optional, cast + +import multiprocess + +from codechecker_api_shared.ttypes import RequestFailed, Ternary +from codechecker_api.codeCheckerServersideTasks_v6.ttypes import \ + AdministratorTaskInfo, TaskFilter, TaskInfo, TaskStatus + +from libtest import codechecker, env + + +# Stop events for the CodeChecker servers. 
+STOP_SERVER = multiprocess.Event() +STOP_SERVER_AUTH = multiprocess.Event() +STOP_SERVER_NO_AUTH = multiprocess.Event() + +TEST_WORKSPACE: Optional[str] = None + + +# Note: Test names in this file follow a strict ordinal convention, because +# the assertions are created with a specific execution history! + +class TaskManagementAPITests(unittest.TestCase): + def setup_class(self): + global TEST_WORKSPACE + TEST_WORKSPACE = env.get_workspace("tasks") + os.environ["TEST_WORKSPACE"] = TEST_WORKSPACE + + codechecker_cfg = { + "check_env": env.test_env(TEST_WORKSPACE), + "workspace": TEST_WORKSPACE, + "checkers": [], + "viewer_host": "localhost", + "viewer_port": env.get_free_port(), + "viewer_product": "tasks", + } + + # Run a normal server that is only used to manage the + # "test_package_product". + codechecker.start_server(codechecker_cfg, STOP_SERVER, + ["--machine-id", "workspace-manager"]) + + codechecker_cfg_no_auth = deepcopy(codechecker_cfg) + codechecker_cfg_no_auth.update({ + "viewer_port": env.get_free_port(), + }) + + # Run a normal server which does not require authentication. + codechecker.start_server(codechecker_cfg_no_auth, STOP_SERVER_NO_AUTH, + ["--machine-id", "unprivileged"]) + + codechecker_cfg_auth = deepcopy(codechecker_cfg) + codechecker_cfg_auth.update({ + "viewer_port": env.get_free_port(), + }) + + # Run a privileged server which does require authentication. + (pathlib.Path(TEST_WORKSPACE) / "root.user").unlink() + env.enable_auth(TEST_WORKSPACE) + codechecker.start_server(codechecker_cfg_auth, STOP_SERVER_AUTH, + ["--machine-id", "privileged"]) + + env.export_test_cfg(TEST_WORKSPACE, + {"codechecker_cfg": codechecker_cfg, + "codechecker_cfg_no_auth": + codechecker_cfg_no_auth, + "codechecker_cfg_auth": codechecker_cfg_auth}) + + codechecker.add_test_package_product(codechecker_cfg, TEST_WORKSPACE) + + def teardown_class(self): + # TODO: If environment variable is set keep the workspace and print + # out the path. 
+ global TEST_WORKSPACE + + STOP_SERVER_NO_AUTH.set() + STOP_SERVER_NO_AUTH.clear() + STOP_SERVER_AUTH.set() + STOP_SERVER_AUTH.clear() + + codechecker.remove_test_package_product(TEST_WORKSPACE) + STOP_SERVER.set() + STOP_SERVER.clear() + + print(f"Removing: {TEST_WORKSPACE}") + shutil.rmtree(cast(str, TEST_WORKSPACE), ignore_errors=True) + + def setup_method(self, _): + test_workspace = os.environ["TEST_WORKSPACE"] + self._test_env = env.import_test_cfg(test_workspace) + + print(f"Running {self.__class__.__name__} tests in {test_workspace}") + + auth_server = self._test_env["codechecker_cfg_auth"] + no_auth_server = self._test_env["codechecker_cfg_no_auth"] + + self._auth_client = env.setup_auth_client(test_workspace, + auth_server["viewer_host"], + auth_server["viewer_port"]) + + root_token = self._auth_client.performLogin("Username:Password", + "root:root") + admin_token = self._auth_client.performLogin("Username:Password", + "admin:admin123") + + self._anonymous_task_client = env.setup_task_client( + test_workspace, + no_auth_server["viewer_host"], no_auth_server["viewer_port"]) + self._admin_task_client = env.setup_task_client( + test_workspace, + auth_server["viewer_host"], auth_server["viewer_port"], + session_token=admin_token) + self._privileged_task_client = env.setup_task_client( + test_workspace, + auth_server["viewer_host"], auth_server["viewer_port"], + session_token=root_token) + + def test_task_1_query_status(self): + task_token = self._anonymous_task_client.createDummyTask(10, False) + + time.sleep(5) + task_info: TaskInfo = self._anonymous_task_client.getTaskInfo( + task_token) + self.assertEqual(task_info.token, task_token) + self.assertEqual(task_info.status, + TaskStatus._NAMES_TO_VALUES["RUNNING"]) + self.assertEqual(task_info.productId, 0) + self.assertIsNone(task_info.actorUsername) + self.assertIn("Dummy task", task_info.summary) + self.assertEqual(task_info.cancelFlagSet, False) + + time.sleep(10) # A bit more than exactly what remains of 
10 seconds! + task_info = self._anonymous_task_client.getTaskInfo(task_token) + self.assertEqual(task_info.status, + TaskStatus._NAMES_TO_VALUES["COMPLETED"]) + self.assertEqual(task_info.cancelFlagSet, False) + self.assertIsNotNone(task_info.enqueuedAtEpoch) + self.assertIsNotNone(task_info.startedAtEpoch) + self.assertLessEqual(task_info.enqueuedAtEpoch, + task_info.startedAtEpoch) + self.assertIsNotNone(task_info.completedAtEpoch) + self.assertLess(task_info.startedAtEpoch, task_info.completedAtEpoch) + self.assertEqual(task_info.cancelFlagSet, False) + + def test_task_2_query_status_of_failed(self): + task_token = self._anonymous_task_client.createDummyTask(10, True) + + time.sleep(5) + task_info: TaskInfo = self._anonymous_task_client.getTaskInfo( + task_token) + self.assertEqual(task_info.token, task_token) + self.assertEqual(task_info.status, + TaskStatus._NAMES_TO_VALUES["RUNNING"]) + self.assertEqual(task_info.cancelFlagSet, False) + + time.sleep(10) # A bit more than exactly what remains of 10 seconds! + task_info = self._anonymous_task_client.getTaskInfo(task_token) + self.assertEqual(task_info.status, + TaskStatus._NAMES_TO_VALUES["FAILED"]) + self.assertEqual(task_info.cancelFlagSet, False) + + def test_task_3_cancel(self): + task_token = self._anonymous_task_client.createDummyTask(10, False) + + time.sleep(3) + cancel_req: bool = self._privileged_task_client.cancelTask(task_token) + self.assertTrue(cancel_req) + + time.sleep(3) + cancel_req_2: bool = self._privileged_task_client.cancelTask( + task_token) + # The task was already cancelled, so cancel_req_2 is not the API call + # that cancelled the task. + self.assertFalse(cancel_req_2) + + time.sleep(5) # A bit more than exactly what remains of 10 seconds! 
+ task_info: TaskInfo = self._anonymous_task_client.getTaskInfo( + task_token) + self.assertEqual(task_info.status, + TaskStatus._NAMES_TO_VALUES["CANCELLED"]) + self.assertEqual(task_info.cancelFlagSet, True) + self.assertIn("root", task_info.comments) + self.assertIn("SUPERUSER requested cancellation.", task_info.comments) + self.assertIn("CANCEL!\nCancel request of admin honoured by task.", + task_info.comments) + self.assertIsNotNone(task_info.enqueuedAtEpoch) + self.assertIsNotNone(task_info.startedAtEpoch) + self.assertLessEqual(task_info.enqueuedAtEpoch, + task_info.startedAtEpoch) + self.assertIsNotNone(task_info.completedAtEpoch) + self.assertLess(task_info.startedAtEpoch, task_info.completedAtEpoch) + + def test_task_4_get_tasks_as_admin(self): + with self.assertRaises(RequestFailed): + self._admin_task_client.getTasks(TaskFilter( + # No SUPERUSER rights of test admin. + filterForNoProductID=True + )) + with self.assertRaises(RequestFailed): + self._admin_task_client.getTasks(TaskFilter( + # Default product, no PRODUCT_ADMIN rights of test admin. + productIDs=[1] + )) + with self.assertRaises(RequestFailed): + self._privileged_task_client.getTasks(TaskFilter( + productIDs=[1], + filterForNoProductID=True + )) + with self.assertRaises(RequestFailed): + self._privileged_task_client.getTasks(TaskFilter( + usernames=["foo", "bar"], + filterForNoUsername=True + )) + + # PRODUCT_ADMIN rights on test-specific product... + task_infos: List[AdministratorTaskInfo] = \ + self._admin_task_client.getTasks(TaskFilter(productIDs=[2])) + # ... but no product-specific tasks exist in this test suite. 
+        self.assertEqual(len(task_infos), 0)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter())
+        self.assertEqual(len(task_infos), 3)
+
+        self.assertEqual(sum(1 for t in task_infos
+                             if t.normalInfo.status ==
+                             TaskStatus._NAMES_TO_VALUES["COMPLETED"]), 1)
+        self.assertEqual(sum(1 for t in task_infos
+                             if t.normalInfo.status ==
+                             TaskStatus._NAMES_TO_VALUES["FAILED"]), 1)
+        self.assertEqual(sum(1 for t in task_infos
+                             if t.normalInfo.status ==
+                             TaskStatus._NAMES_TO_VALUES["CANCELLED"]), 1)
+
+    def test_task_5_info_query_filters(self):
+        """
+        Exercise the individual `TaskFilter` fields of `getTasks()`.
+
+        First the three tasks left over from the earlier tests are queried.
+        Then 100 one-second dummy tasks are created (half through the
+        anonymous client, half through the admin client, every other one
+        expected to fail) and the filters are checked against them as well.
+        """
+        current_time_epoch = int(datetime.now(timezone.utc).timestamp())
+
+        task_infos: List[AdministratorTaskInfo] = \
+            self._privileged_task_client.getTasks(TaskFilter(
+                machineIDs=["nonexistent"]
+            ))
+        self.assertEqual(len(task_infos), 0)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            machineIDs=["unprivileged"]
+        ))
+        self.assertEqual(len(task_infos), 3)
+
+        tokens_from_previous_test = [t.normalInfo.token for t in task_infos]
+
+        task_infos = self._admin_task_client.getTasks(TaskFilter(
+            tokens=tokens_from_previous_test
+        ))
+        # Admin client is not a SUPERUSER, it should not get the list of
+        # tasks visible only to superusers because they are "server-level".
+        self.assertEqual(len(task_infos), 0)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            machineIDs=["privileged"]
+        ))
+        self.assertEqual(len(task_infos), 0)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            startedBeforeEpoch=current_time_epoch
+        ))
+        self.assertEqual(len(task_infos), 3)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            startedAfterEpoch=current_time_epoch
+        ))
+        self.assertEqual(len(task_infos), 0)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            cancelFlag=Ternary._NAMES_TO_VALUES["ON"]
+        ))
+        self.assertEqual(len(task_infos), 1)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            cancelFlag=Ternary._NAMES_TO_VALUES["OFF"]
+        ))
+        self.assertEqual(len(task_infos), 2)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            consumedFlag=Ternary._NAMES_TO_VALUES["ON"]
+        ))
+        self.assertEqual(len(task_infos), 3)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            consumedFlag=Ternary._NAMES_TO_VALUES["OFF"]
+        ))
+        self.assertEqual(len(task_infos), 0)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter())
+
+        # From here on, "current time" marks the start of the bulk part of
+        # the test, so the epoch filters below select only the new tasks.
+        current_time_epoch = int(datetime.now(timezone.utc).timestamp())
+        for i in range(10):
+            target_api = self._anonymous_task_client if i % 2 == 0 \
+                else self._admin_task_client
+            for j in range(10):
+                target_api.createDummyTask(1, bool(j % 2 == 0))
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter())
+        self.assertEqual(len(task_infos), 103)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+        ))
+        self.assertEqual(len(task_infos), 100)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            machineIDs=["unprivileged"]
+        ))
+        self.assertEqual(len(task_infos), 50)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            machineIDs=["privileged"]
+        ))
+        self.assertEqual(len(task_infos), 50)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            filterForNoUsername=True,
+        ))
+        self.assertEqual(len(task_infos), 50)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            usernames=["admin"],
+        ))
+        self.assertEqual(len(task_infos), 50)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            usernames=["root"],
+        ))
+        self.assertEqual(len(task_infos), 0)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            startedAfterEpoch=current_time_epoch
+        ))
+        # Some tasks ought to have started at least.
+        self.assertGreater(len(task_infos), 0)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            startedAfterEpoch=current_time_epoch,
+            completedAfterEpoch=current_time_epoch
+        ))
+        # Some tasks ought to have also finished at least.
+        self.assertGreater(len(task_infos), 0)
+
+        # Let every task terminate. We should only need 1 second per task,
+        # running likely in a multithreaded environment.
+        # Let's have some leeway, though...
+        # os.cpu_count() may return None, and flooring the division would
+        # shrink the wait to zero seconds on hosts with more than 100 cores,
+        # so fall back to a single core and round the number of rounds up.
+        cpu_count = os.cpu_count() or 1
+        rounds = -(-(100 * 1) // cpu_count)  # Ceiling division.
+        time.sleep(2 * rounds)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            startedAfterEpoch=current_time_epoch,
+            completedAfterEpoch=current_time_epoch
+        ))
+        # All tasks should have finished.
+        self.assertEqual(len(task_infos), 100)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            startedAfterEpoch=current_time_epoch,
+            completedAfterEpoch=current_time_epoch,
+            statuses=[TaskStatus._NAMES_TO_VALUES["COMPLETED"]]
+        ))
+        self.assertEqual(len(task_infos), 50)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            startedAfterEpoch=current_time_epoch,
+            completedAfterEpoch=current_time_epoch,
+            statuses=[TaskStatus._NAMES_TO_VALUES["FAILED"]]
+        ))
+        self.assertEqual(len(task_infos), 50)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            startedAfterEpoch=current_time_epoch,
+            completedAfterEpoch=current_time_epoch,
+            cancelFlag=Ternary._NAMES_TO_VALUES["ON"]
+        ))
+        self.assertEqual(len(task_infos), 0)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            startedAfterEpoch=current_time_epoch,
+            completedAfterEpoch=current_time_epoch,
+            consumedFlag=Ternary._NAMES_TO_VALUES["ON"]
+        ))
+        self.assertEqual(len(task_infos), 0)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            machineIDs=["*privileged"]
+        ))
+        self.assertEqual(len(task_infos), 103)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            kinds=["*Dummy*"]
+        ))
+        self.assertEqual(len(task_infos), 103)
+
+        # Try to consume the task status from the wrong user!
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            startedAfterEpoch=current_time_epoch,
+            completedAfterEpoch=current_time_epoch,
+            filterForNoUsername=True,
+            statuses=[TaskStatus._NAMES_TO_VALUES["COMPLETED"]]
+        ))
+        self.assertEqual(len(task_infos), 25)
+        a_token: str = task_infos[0].normalInfo.token
+        with self.assertRaises(RequestFailed):
+            self._admin_task_client.getTaskInfo(a_token)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            machineIDs=["workspace-manager"]
+        ))
+        self.assertEqual(len(task_infos), 0)
+
+    def test_task_6_dropping(self):
+        """
+        Check how unfinished tasks are reported after a server shutdown.
+
+        Long-running dummy tasks (`createDummyTask(600, False)`) are queued,
+        `STOP_SERVER_NO_AUTH` is set for 30 seconds (presumably stopping and
+        restarting the test server), and afterwards every task enqueued by
+        this test should be DROPPED with its cancel flag unset.
+        """
+        current_time_epoch = int(datetime.now(timezone.utc).timestamp())
+        # NOTE(review): cast() only informs the type checker; should
+        # os.cpu_count() return None, this multiplication raises TypeError.
+        many_task_count = 4 * cast(int, os.cpu_count())
+        for _ in range(many_task_count):
+            self._anonymous_task_client.createDummyTask(600, False)
+
+        STOP_SERVER_NO_AUTH.set()
+        time.sleep(30)
+        STOP_SERVER_NO_AUTH.clear()
+        after_shutdown_time_epoch = int(datetime.now(timezone.utc)
+                                        .timestamp())
+
+        task_infos: List[AdministratorTaskInfo] = \
+            self._privileged_task_client.getTasks(TaskFilter(
+                enqueuedAfterEpoch=current_time_epoch,
+                statuses=[
+                    TaskStatus._NAMES_TO_VALUES["ENQUEUED"],
+                    TaskStatus._NAMES_TO_VALUES["RUNNING"],
+                    TaskStatus._NAMES_TO_VALUES["COMPLETED"],
+                    TaskStatus._NAMES_TO_VALUES["FAILED"],
+                    TaskStatus._NAMES_TO_VALUES["CANCELLED"]
+                ]
+            ))
+        self.assertEqual(len(task_infos), 0)
+
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            statuses=[TaskStatus._NAMES_TO_VALUES["DROPPED"]],
+            # System-level dropping is not a "cancellation" action!
+            cancelFlag=Ternary._NAMES_TO_VALUES["OFF"]
+        ))
+        self.assertEqual(len(task_infos), many_task_count)
+        dropped_task_infos = {ti.normalInfo.token: ti for ti in task_infos}
+
+        # Some tasks will have started, and the server pulled out from under.
+        task_infos = self._privileged_task_client.getTasks(TaskFilter(
+            enqueuedAfterEpoch=current_time_epoch,
+            startedBeforeEpoch=after_shutdown_time_epoch,
+            statuses=[TaskStatus._NAMES_TO_VALUES["DROPPED"]]
+        ))
+        for ti in task_infos:
+            self.assertIn("SHUTDOWN!\nTask honoured graceful cancel signal "
+                          "generated by server shutdown.",
+                          ti.normalInfo.comments)
+            del dropped_task_infos[ti.normalInfo.token]
+
+        # The rest could have never started.
+        for ti in dropped_task_infos.values():
+            self.assertTrue("DROPPED!\n" in ti.normalInfo.comments or
+                            "SHUTDOWN!\n" in ti.normalInfo.comments)
diff --git a/web/tests/libtest/env.py b/web/tests/libtest/env.py
index 1610db8bef..f89e495992 100644
--- a/web/tests/libtest/env.py
+++ b/web/tests/libtest/env.py
@@ -18,16 +18,18 @@
 import socket
 import stat
 import subprocess
+from typing import cast
 
 from codechecker_common.util import load_json
 
-from .thrift_client_to_db import get_auth_client
-from .thrift_client_to_db import get_config_client
-from .thrift_client_to_db import get_product_client
-from .thrift_client_to_db import get_viewer_client
+from .thrift_client_to_db import \
+    get_auth_client, \
+    get_config_client, \
+    get_product_client, \
+    get_task_client, \
+    get_viewer_client
 
-from functional import PKG_ROOT
-from functional import REPO_ROOT
+from functional import PKG_ROOT, REPO_ROOT
 
 
 def get_free_port():
@@ -236,6 +238,42 @@ def setup_config_client(workspace,
                              session_token=session_token, protocol=proto)
 
 
+def setup_task_client(workspace,
+                      host=None, port=None,
+                      uri="/Tasks",
+                      auto_handle_connection=True,
+                      session_token=None,
+                      protocol="http"):
+    """
+    Create a client for the task management API (`uri`, "/Tasks" by default).
+
+    Whichever of `host` and `port` is not given is read from the
+    "codechecker_cfg" section of the test configuration of `workspace`.
+    If `session_token` is None, one is requested via `get_session_token()`.
+    A token equal to "_PROHIBIT" is replaced with None before the client
+    is created.
+    """
+    if not host or not port:
+        # Fill in only the missing part of the address, so a caller passing
+        # just one of `host` and `port` does not end up with None as the
+        # other one.
+        codechecker_cfg = import_test_cfg(workspace)["codechecker_cfg"]
+        port = port or codechecker_cfg["viewer_port"]
+        host = host or codechecker_cfg["viewer_host"]
+
+    if session_token is None:
+        session_token = get_session_token(workspace, host, port)
+    if session_token == "_PROHIBIT":
+        session_token = None
+
+    return get_task_client(port=port,
+                           host=cast(str, host),
+                           uri=uri,
+                           auto_handle_connection=auto_handle_connection,
+                           session_token=session_token,
+                           protocol=protocol)
+
+
 def repository_root():
     return os.path.abspath(os.environ['REPO_ROOT'])
 
diff --git a/web/tests/libtest/thrift_client_to_db.py b/web/tests/libtest/thrift_client_to_db.py
index de7788c929..2b5c5a11e8 100644
--- a/web/tests/libtest/thrift_client_to_db.py
+++ b/web/tests/libtest/thrift_client_to_db.py
@@ -238,6 +238,38 @@ def __getattr__(self, attr):
         return partial(self._thrift_client_call, attr)
 
 
+class CCTaskHelper(ThriftAPIHelper):
+    """
+    Thrift client helper for the server-side task management API.
+    """
+    def __init__(self, proto, host, port, uri, auto_handle_connection=True,
+                 session_token=None):
+        """
+        Build a JSON-over-HTTP Thrift client for the task service at `uri`.
+
+        If `session_token` is set, it is placed in the `Cookie` header of
+        the HTTP transport under the session cookie's name.
+        """
+        from codechecker_api.codeCheckerServersideTasks_v6 \
+            import codeCheckerServersideTaskService
+        from codechecker_client.credential_manager import SESSION_COOKIE_NAME
+
+        url = create_product_url(proto, host, port, f"/v{VERSION}{uri}")
+        transport = THttpClient.THttpClient(url)
+        protocol = TJSONProtocol.TJSONProtocol(transport)
+        client = codeCheckerServersideTaskService.Client(protocol)
+        if session_token:
+            headers = {'Cookie': f"{SESSION_COOKIE_NAME}={session_token}"}
+            transport.setCustomHeaders(headers)
+        super().__init__(transport, client, auto_handle_connection)
+
+    def __getattr__(self, attr):
+        # Only invoked for names not found by normal attribute lookup: hand
+        # back a callable that forwards to `_thrift_client_call` with the
+        # requested name as its first argument.
+        return partial(self._thrift_client_call, attr)
+
+
 def get_all_run_results(
         client,
         run_id=None,
@@ -303,3 +335,13 @@ def get_config_client(port, host='localhost', uri='/Configuration',
 
     return CCConfigHelper(protocol, host, port, uri, auto_handle_connection,
                           session_token)
+
+
+def get_task_client(port, host="localhost", uri="/Tasks",
+                    auto_handle_connection=True, session_token=None,
+                    protocol="http"):
+    """
+    Return a `CCTaskHelper` for the task API at the given server address.
+    """
+    return CCTaskHelper(protocol, host, port, uri, auto_handle_connection,
+                        session_token)