From f619e7d2a770f2f300ec6c7778b70e9f4f12cb4a Mon Sep 17 00:00:00 2001 From: Joe Lynch Date: Mon, 9 Oct 2023 11:24:13 +0200 Subject: [PATCH] Use Rohmu models directly --- .gitignore | 2 + astacus/common/rohmustorage.py | 99 ++----------------- .../clickhouse/async_object_storage.py | 17 ++-- .../coordinator/plugins/clickhouse/disks.py | 2 +- setup.cfg | 2 +- .../plugins/clickhouse/conftest.py | 33 +++---- tests/unit/common/test_storage.py | 4 +- .../plugins/clickhouse/test_dependencies.py | 38 +++---- .../plugins/clickhouse/test_disks.py | 36 ++++--- tests/utils.py | 2 +- 10 files changed, 80 insertions(+), 155 deletions(-) diff --git a/.gitignore b/.gitignore index cb3d6ad2..8e16db5b 100644 --- a/.gitignore +++ b/.gitignore @@ -144,3 +144,5 @@ astacus/proto/*_pb2.py **/py.typed .idea + +.vscode/ diff --git a/astacus/common/rohmustorage.py b/astacus/common/rohmustorage.py index 04b73505..41f61562 100644 --- a/astacus/common/rohmustorage.py +++ b/astacus/common/rohmustorage.py @@ -6,7 +6,6 @@ Rohmu-specific actual object storage implementation """ -from .magic import StrEnum from .storage import Json, MultiStorage, Storage, StorageUploadResult from .utils import AstacusModel from astacus.common import exceptions @@ -16,7 +15,6 @@ from rohmu.compressor import CompressionStream from rohmu.encryptor import EncryptorStream from typing import BinaryIO, Dict, Optional, Union -from typing_extensions import Literal import io import json @@ -28,17 +26,6 @@ logger = logging.getLogger(__name__) -class RohmuStorageType(StrEnum): - """Embodies what is detected in rohmu.get_class_for_transfer""" - - azure = "azure" - google = "google" - local = "local" - s3 = "s3" - # sftp = "sftp" - # swift = "swift" - - class RohmuModel(AstacusModel): class Config: # As we're keen to both export and decode json, just using enum @@ -46,84 +33,12 @@ class Config: use_enum_values = True -class RohmuProxyType(StrEnum): - socks5 = "socks5" - http = "http" - - -class RohmuProxyInfo(RohmuModel): - type: RohmuProxyType - host: str - port: int - user: Optional[str] = None - password: Optional[str] = Field(None, alias="pass") - - -class StatsdInfo(RohmuModel): - host: str - port: int - tags: dict[str, str | int | None] - - -class ObjectStorageNotifier(RohmuModel): - notifier_type: str - url: str - - -class ObjectStorageConfig(RohmuModel): - storage_type: RohmuStorageType - prefix: Optional[str] = None - notifier: Optional[ObjectStorageNotifier] = None - statsd_info: Optional[StatsdInfo] = None - - -class RohmuProxyStorage(ObjectStorageConfig): - """Storage backend with support for optional socks5 or http proxy connections""" - - proxy_info: Optional[RohmuProxyInfo] = None - - -class RohmuLocalStorageConfig(ObjectStorageConfig): - storage_type: Literal[RohmuStorageType.local] - directory: str - prefix: Optional[str] = None - - -class RohmuS3StorageConfig(RohmuProxyStorage): - storage_type: Literal[RohmuStorageType.s3] - region: str - bucket_name: str - aws_access_key_id: Optional[str] = None - aws_secret_access_key: Optional[str] = None - host: Optional[str] = None - port: Optional[int] = None - is_secure: Optional[bool] = False - is_verify_tls: Optional[bool] = False - prefix: Optional[str] = None - # Some more obscure options with defaults are omitted - - -class RohmuAzureStorageConfig(RohmuProxyStorage): - storage_type: Literal[RohmuStorageType.azure] - account_name: str - bucket_name: str - account_key: Optional[str] = None - sas_token: Optional[str] = None - prefix: Optional[str] = None - azure_cloud: Optional[str] = None - - -class RohmuGoogleStorageConfig(RohmuProxyStorage): - storage_type: Literal[RohmuStorageType.google] - # rohmu.object_storage.google:GoogleTransfer.__init__ arguments - project_id: str - bucket_name: str - credentials: Dict[str, str] - prefix: Optional[str] = None - # credential_file # omitted, n/a - - -RohmuStorageConfig = Union[RohmuLocalStorageConfig, RohmuS3StorageConfig, RohmuAzureStorageConfig, RohmuGoogleStorageConfig] +RohmuStorageConfig = Union[ + rohmu.LocalObjectStorageConfig, + rohmu.S3ObjectStorageConfig, + rohmu.AzureObjectStorageConfig, + rohmu.GoogleObjectStorageConfig, +] class RohmuEncryptionKey(RohmuModel): @@ -240,7 +155,7 @@ def _choose_storage(self, storage: str | None = None) -> None: storage = self.config.default_storage self.storage_name = storage self.storage_config = self.config.storages[storage] - self.storage = rohmu.get_transfer(self.storage_config.dict(by_alias=True, exclude_unset=True)) + self.storage = rohmu.get_transfer_from_model(self.storage_config) def copy(self) -> "RohmuStorage": return RohmuStorage(config=self.config, storage=self.storage_name) diff --git a/astacus/coordinator/plugins/clickhouse/async_object_storage.py b/astacus/coordinator/plugins/clickhouse/async_object_storage.py index 9d4688dd..4da737bf 100644 --- a/astacus/coordinator/plugins/clickhouse/async_object_storage.py +++ b/astacus/coordinator/plugins/clickhouse/async_object_storage.py @@ -3,11 +3,12 @@ See LICENSE for details """ from abc import ABC, abstractmethod +from astacus.common.rohmustorage import RohmuStorageConfig from pathlib import Path from rohmu import BaseTransfer from rohmu.errors import FileNotFoundFromStorageError from starlette.concurrency import run_in_threadpool -from typing import Any, Iterator, Mapping, Self, Sequence +from typing import Any, Iterator, Mapping, Self, Sequence, Union import contextlib import dataclasses @@ -27,7 +28,7 @@ class ObjectStorageItem: class AsyncObjectStorage(ABC): @abstractmethod - def get_config(self) -> Mapping[str, Any]: + def get_config(self) -> Union[RohmuStorageConfig, dict]: ... @abstractmethod @@ -44,12 +45,12 @@ async def copy_items_from(self, source: "AsyncObjectStorage", keys: Sequence[Pat class ThreadSafeRohmuStorage: - def __init__(self, config: Mapping[str, Any]) -> None: + def __init__(self, config: RohmuStorageConfig) -> None: self.config = config - self._storage = rohmu.get_transfer(config) + self._storage = rohmu.get_transfer_from_model(config) self._storage_lock = threading.Lock() - def list_iter(self, key: str, *, with_metadata: bool = True, deep: bool = False) -> list[Mapping[str, Any]]: + def list_iter(self, key: str, *, with_metadata: bool = True, deep: bool = False) -> Iterator[Mapping[str, Any]]: with self._storage_lock: return self._storage.list_iter(key, with_metadata=with_metadata, deep=deep) @@ -67,7 +68,7 @@ def copy_items_from(self, source: "ThreadSafeRohmuStorage", keys: Sequence[str]) target_storage.copy_files_from(source=source_storage, keys=keys) @contextlib.contextmanager - def get_storage(self) -> Iterator[BaseTransfer]: + def get_storage(self) -> Iterator[BaseTransfer[Any]]: with self._storage_lock: yield self._storage @@ -76,7 +77,7 @@ def get_storage(self) -> Iterator[BaseTransfer]: class RohmuAsyncObjectStorage(AsyncObjectStorage): storage: ThreadSafeRohmuStorage - def get_config(self) -> Mapping[str, Any]: + def get_config(self) -> RohmuStorageConfig: return self.storage.config async def list_items(self) -> list[ObjectStorageItem]: @@ -101,7 +102,7 @@ class MemoryAsyncObjectStorage(AsyncObjectStorage): def from_items(cls, items: Sequence[ObjectStorageItem]) -> Self: return cls(items={item.key: item for item in items}) - def get_config(self) -> Mapping[str, Any]: + def get_config(self) -> dict: # Exposing the object id in the config ensures that the same memory storage # has the same config as itself and a different config as another memory storage. # Using a manually picked name would be more error-prone: we want two object diff --git a/astacus/coordinator/plugins/clickhouse/disks.py b/astacus/coordinator/plugins/clickhouse/disks.py index 83d2bece..a4333d10 100644 --- a/astacus/coordinator/plugins/clickhouse/disks.py +++ b/astacus/coordinator/plugins/clickhouse/disks.py @@ -33,7 +33,7 @@ def from_disk_config(cls, config: DiskConfiguration, storage_name: str | None = object_storage: RohmuAsyncObjectStorage | None = None else: config_name = storage_name if storage_name is not None else config.object_storage.default_storage - storage_config = config.object_storage.storages[config_name].dict(by_alias=True, exclude_unset=True) + storage_config = config.object_storage.storages[config_name] object_storage = RohmuAsyncObjectStorage(storage=ThreadSafeRohmuStorage(config=storage_config)) return Disk( type=config.type, diff --git a/setup.cfg b/setup.cfg index 1abddefe..73b8d848 100644 --- a/setup.cfg +++ b/setup.cfg @@ -13,7 +13,7 @@ install_requires = fastapi==0.86.0 httpx==0.22.0 protobuf==3.19.4 - rohmu==1.1.3 + rohmu==1.2.0 sentry-sdk==1.6.0 tabulate==0.9.0 typing-extensions==4.7.1 diff --git a/tests/integration/coordinator/plugins/clickhouse/conftest.py b/tests/integration/coordinator/plugins/clickhouse/conftest.py index d97e3e65..85c15c3c 100644 --- a/tests/integration/coordinator/plugins/clickhouse/conftest.py +++ b/tests/integration/coordinator/plugins/clickhouse/conftest.py @@ -5,15 +5,7 @@ from _pytest.fixtures import FixtureRequest from astacus.client import create_client_parsers from astacus.common.ipc import Plugin -from astacus.common.rohmustorage import ( - RohmuCompression, - RohmuCompressionType, - RohmuConfig, - RohmuEncryptionKey, - RohmuLocalStorageConfig, - RohmuS3StorageConfig, - RohmuStorageType, -) +from astacus.common.rohmustorage import RohmuCompression, RohmuCompressionType, RohmuConfig, RohmuEncryptionKey from astacus.common.utils import build_netloc from astacus.config import GlobalConfig, UvicornConfig from astacus.coordinator.config import CoordinatorConfig, CoordinatorNode @@ -44,6 +36,7 @@ import dataclasses import logging import pytest +import rohmu import secrets import sys import tempfile @@ -446,7 +439,7 @@ async def _astacus(*, config: GlobalConfig) -> AsyncIterator[Service]: async with background_process(*cmd, env={"PYTHONPATH": astacus_source_root}) as process: await wait_url_up(f"http://localhost:{config.uvicorn.port}") storage = config.object_storage.storages[config.object_storage.default_storage] - assert isinstance(storage, RohmuLocalStorageConfig) + assert isinstance(storage, rohmu.LocalObjectStorageConfig) data_dir = storage.directory yield Service(process=process, port=config.uvicorn.port, data_dir=data_dir) @@ -476,8 +469,8 @@ def create_astacus_configs( astacus_backup_storage_path.mkdir(exist_ok=True) node_ports = [ports.allocate() for _ in clickhouse_cluster.services] disk_storages = { - "default": RohmuS3StorageConfig( - storage_type=RohmuStorageType.s3, + "default": rohmu.S3ObjectStorageConfig( + storage_type=rohmu.StorageDriver.s3, region="fake", host=minio_bucket.host, port=minio_bucket.port, @@ -488,18 +481,18 @@ def create_astacus_configs( ) } backup_storages = { - "default": RohmuLocalStorageConfig( - storage_type=RohmuStorageType.local, - directory=str(astacus_backup_storage_path), + "default": rohmu.LocalObjectStorageConfig( + storage_type=rohmu.StorageDriver.local, + directory=astacus_backup_storage_path, ) } if restorable_source: - backup_storages["restorable"] = RohmuLocalStorageConfig( - storage_type=RohmuStorageType.local, - directory=str(restorable_source.astacus_storage_path), + backup_storages["restorable"] = rohmu.LocalObjectStorageConfig( + storage_type=rohmu.StorageDriver.local, + directory=restorable_source.astacus_storage_path, ) - disk_storages["restorable"] = RohmuS3StorageConfig( - storage_type=RohmuStorageType.s3, + disk_storages["restorable"] = rohmu.S3ObjectStorageConfig( + storage_type=rohmu.StorageDriver.s3, region="fake", host=minio_bucket.host, port=minio_bucket.port, diff --git a/tests/unit/common/test_storage.py b/tests/unit/common/test_storage.py index 63f8228f..4720d907 100644 --- a/tests/unit/common/test_storage.py +++ b/tests/unit/common/test_storage.py @@ -43,7 +43,7 @@ def create_storage(*, tmpdir, engine, **kw): raise NotImplementedError(f"unknown storage {engine}") -def _test_hexdigeststorage(storage): +def _test_hexdigeststorage(storage: FileStorage): storage.upload_hexdigest_bytes(TEST_HEXDIGEST, TEXT_HEXDIGEST_DATA) assert storage.download_hexdigest_bytes(TEST_HEXDIGEST) == TEXT_HEXDIGEST_DATA # Ensure that download attempts of nonexistent keys give NotFoundException @@ -56,7 +56,7 @@ def _test_hexdigeststorage(storage): assert storage.list_hexdigests() == [] -def _test_jsonstorage(storage): +def _test_jsonstorage(storage: JsonStorage): assert storage.list_jsons() == [] storage.upload_json(TEST_JSON, TEST_JSON_DATA) assert storage.download_json(TEST_JSON) == TEST_JSON_DATA diff --git a/tests/unit/coordinator/plugins/clickhouse/test_dependencies.py b/tests/unit/coordinator/plugins/clickhouse/test_dependencies.py index a323b0b0..a3891c66 100644 --- a/tests/unit/coordinator/plugins/clickhouse/test_dependencies.py +++ b/tests/unit/coordinator/plugins/clickhouse/test_dependencies.py @@ -16,48 +16,48 @@ def test_tables_sorted_by_dependencies() -> None: t1 = Table( - database="db_one", - name="t1", + database=b"db_one", + name=b"t1", engine="DontCare", uuid=uuid.UUID(int=0), - create_query="", - dependencies=[("db_two", "t4")], + create_query=b"", + dependencies=[(b"db_two", b"t4")], ) t2 = Table( - database="db_one", - name="t2", + database=b"db_one", + name=b"t2", engine="DontCare", uuid=uuid.UUID(int=0), - create_query="", + create_query=b"", dependencies=[], ) t3 = Table( - database="db_one", - name="t3", + database=b"db_one", + name=b"t3", engine="DontCare", uuid=uuid.UUID(int=0), - create_query="", - dependencies=[("db_one", "t2")], + create_query=b"", + dependencies=[(b"db_one", b"t2")], ) t4 = Table( - database="db_two", - name="t4", + database=b"db_two", + name=b"t4", engine="DontCare", uuid=uuid.UUID(int=0), - create_query="", - dependencies=[("db_one", "t2"), ("db_one", "t3")], + create_query=b"", + dependencies=[(b"db_one", b"t2"), (b"db_one", b"t3")], ) assert tables_sorted_by_dependencies([t1, t2, t3, t4]) == [t1, t4, t3, t2] def test_dangling_table_dependency_doesnt_crash() -> None: t1 = Table( - database="db_one", - name="t1", + database=b"db_one", + name=b"t1", engine="DontCare", uuid=uuid.UUID(int=0), - create_query="", - dependencies=[("db_two", "t4")], + create_query=b"", + dependencies=[(b"db_two", b"t4")], ) assert tables_sorted_by_dependencies([t1]) == [t1] diff --git a/tests/unit/coordinator/plugins/clickhouse/test_disks.py b/tests/unit/coordinator/plugins/clickhouse/test_disks.py index 58c909b2..56b8b301 100644 --- a/tests/unit/coordinator/plugins/clickhouse/test_disks.py +++ b/tests/unit/coordinator/plugins/clickhouse/test_disks.py @@ -2,7 +2,6 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from astacus.common.rohmustorage import RohmuLocalStorageConfig, RohmuStorageType from astacus.common.snapshot import SnapshotGroup from astacus.coordinator.plugins.clickhouse.config import DiskConfiguration, DiskObjectStorageConfiguration, DiskType from astacus.coordinator.plugins.clickhouse.disks import Disk, Disks, ParsedPath, PartFilePathError @@ -10,6 +9,7 @@ from uuid import UUID import pytest +import rohmu SAMPLE_DEFAULT_DISK_CONFIGURATION = DiskConfiguration(type=DiskType.local, path=Path(), name="default") SAMPLE_SECONDARY_DISK_CONFIGURATION = DiskConfiguration( @@ -21,13 +21,13 @@ # We're using local storage here because it's the only storage type # that does not try to do operation during its __init__... storages={ - "default": RohmuLocalStorageConfig( - storage_type=RohmuStorageType.local, - directory="default-bucket", + "default": rohmu.LocalObjectStorageConfig( + storage_type=rohmu.StorageDriver.local, + directory=Path("default-bucket"), ), - "recovery": RohmuLocalStorageConfig( - storage_type=RohmuStorageType.local, - directory="recovery-bucket", + "recovery": rohmu.LocalObjectStorageConfig( + storage_type=rohmu.StorageDriver.local, + directory=Path("recovery-bucket"), ), }, ), @@ -175,19 +175,33 @@ def test_other_disk_parsed_path_to_path() -> None: def test_disk_can_load_default_object_storage_config() -> None: disk = Disk.from_disk_config(SAMPLE_SECONDARY_DISK_CONFIGURATION) - assert disk.object_storage.get_config()["directory"] == "default-bucket" + assert disk.object_storage is not None + config = disk.object_storage.get_config() + assert isinstance(config, rohmu.LocalObjectStorageConfig) + assert config.directory == Path("default-bucket") def test_disk_can_load_alternate_object_storage_config() -> None: disk = Disk.from_disk_config(SAMPLE_SECONDARY_DISK_CONFIGURATION, storage_name="recovery") - assert disk.object_storage.get_config()["directory"] == "recovery-bucket" + assert disk.object_storage is not None + config = disk.object_storage.get_config() + assert isinstance(config, rohmu.LocalObjectStorageConfig) + assert config.directory == Path("recovery-bucket") def test_disks_can_load_default_object_storage_config() -> None: disks = Disks.from_disk_configs(SAMPLE_DISKS_CONFIGURATION) - assert disks.get_object_storage(disk_name="secondary").get_config()["directory"] == "default-bucket" + storage = disks.get_object_storage(disk_name="secondary") + assert storage is not None + config = storage.get_config() + assert isinstance(config, rohmu.LocalObjectStorageConfig) + assert config.directory == Path("default-bucket") def test_disks_can_load_alternate_object_storage_config() -> None: disks = Disks.from_disk_configs(SAMPLE_DISKS_CONFIGURATION, storage_name="recovery") - assert disks.get_object_storage(disk_name="secondary").get_config()["directory"] == "recovery-bucket" + storage = disks.get_object_storage(disk_name="secondary") + assert storage is not None + config = storage.get_config() + assert isinstance(config, rohmu.LocalObjectStorageConfig) + assert config.directory == Path("recovery-bucket") diff --git a/tests/utils.py b/tests/utils.py index 9b578b4d..859375a1 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -39,7 +39,7 @@ -----END PRIVATE KEY-----""" -def create_rohmu_config(tmpdir, *, compression=True, encryption=True): +def create_rohmu_config(tmpdir, *, compression=True, encryption=True) -> RohmuConfig: x_path = Path(tmpdir) / "rohmu-x" x_path.mkdir(exist_ok=True) y_path = Path(tmpdir) / "rohmu-y"