Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Rohmu models directly #148

Merged
merged 1 commit into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,5 @@ astacus/proto/*_pb2.py
**/py.typed

.idea

.vscode/
99 changes: 7 additions & 92 deletions astacus/common/rohmustorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
Rohmu-specific actual object storage implementation

"""
from .magic import StrEnum
from .storage import Json, MultiStorage, Storage, StorageUploadResult
from .utils import AstacusModel
from astacus.common import exceptions
Expand All @@ -16,7 +15,6 @@
from rohmu.compressor import CompressionStream
from rohmu.encryptor import EncryptorStream
from typing import BinaryIO, Dict, Optional, Union
from typing_extensions import Literal

import io
import json
Expand All @@ -28,102 +26,19 @@
logger = logging.getLogger(__name__)


class RohmuStorageType(StrEnum):
    """Storage backend identifiers; embodies what is detected in rohmu.get_class_for_transfer."""

    azure = "azure"
    google = "google"
    local = "local"
    s3 = "s3"
    # Supported by rohmu but deliberately not enabled here:
    # sftp = "sftp"
    # swift = "swift"


class RohmuModel(AstacusModel):
    """Base model for rohmu-related configuration entries."""

    class Config:
        # As we're keen to both export and decode json, just using enum
        # values is much saner than the alternatives.
        use_enum_values = True


class RohmuProxyType(StrEnum):
    """Protocols usable for proxied object-storage connections."""

    socks5 = "socks5"
    http = "http"


class RohmuProxyInfo(RohmuModel):
    """Connection details for an optional storage proxy."""

    type: RohmuProxyType
    host: str
    port: int
    user: Optional[str] = None
    # Aliased to "pass" in serialized form because "pass" is a Python keyword.
    password: Optional[str] = Field(None, alias="pass")


class StatsdInfo(RohmuModel):
    """Statsd endpoint plus default tags attached to emitted metrics."""

    host: str
    port: int
    tags: dict[str, str | int | None]


class ObjectStorageNotifier(RohmuModel):
    """Notifier configuration handed to rohmu for storage events."""

    notifier_type: str
    url: str


class ObjectStorageConfig(RohmuModel):
    """Options common to every rohmu storage backend."""

    storage_type: RohmuStorageType
    prefix: Optional[str] = None
    notifier: Optional[ObjectStorageNotifier] = None
    statsd_info: Optional[StatsdInfo] = None


class RohmuProxyStorage(ObjectStorageConfig):
    """Storage backend with support for optional socks5 or http proxy connections."""

    proxy_info: Optional[RohmuProxyInfo] = None


class RohmuLocalStorageConfig(ObjectStorageConfig):
    """Configuration for the local-filesystem backend."""

    storage_type: Literal[RohmuStorageType.local]
    directory: str
    prefix: Optional[str] = None


class RohmuS3StorageConfig(RohmuProxyStorage):
    """Configuration for the S3 (or S3-compatible) backend."""

    storage_type: Literal[RohmuStorageType.s3]
    region: str
    bucket_name: str
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    host: Optional[str] = None
    port: Optional[int] = None
    is_secure: Optional[bool] = False
    is_verify_tls: Optional[bool] = False
    prefix: Optional[str] = None
    # Some more obscure options with defaults are omitted


class RohmuAzureStorageConfig(RohmuProxyStorage):
    """Configuration for the Azure blob storage backend."""

    storage_type: Literal[RohmuStorageType.azure]
    account_name: str
    bucket_name: str
    account_key: Optional[str] = None
    sas_token: Optional[str] = None
    prefix: Optional[str] = None
    azure_cloud: Optional[str] = None


class RohmuGoogleStorageConfig(RohmuProxyStorage):
    """Configuration for the Google Cloud Storage backend."""

    # rohmu.object_storage.google:GoogleTransfer.__init__ arguments
    project_id: str
    bucket_name: str
    credentials: Dict[str, str]
    prefix: Optional[str] = None
    # credential_file # omitted, n/a
    storage_type: Literal[RohmuStorageType.google]


# NOTE(review): the extraction kept both the pre- and post-merge assignment of
# this alias; only the merged version (rohmu's own pydantic models) is retained.
RohmuStorageConfig = Union[
    rohmu.LocalObjectStorageConfig,
    rohmu.S3ObjectStorageConfig,
    rohmu.AzureObjectStorageConfig,
    rohmu.GoogleObjectStorageConfig,
]


class RohmuEncryptionKey(RohmuModel):
Expand Down Expand Up @@ -240,7 +155,7 @@ def _choose_storage(self, storage: str | None = None) -> None:
storage = self.config.default_storage
self.storage_name = storage
self.storage_config = self.config.storages[storage]
self.storage = rohmu.get_transfer(self.storage_config.dict(by_alias=True, exclude_unset=True))
self.storage = rohmu.get_transfer_from_model(self.storage_config)

def copy(self) -> "RohmuStorage":
return RohmuStorage(config=self.config, storage=self.storage_name)
Expand Down
17 changes: 9 additions & 8 deletions astacus/coordinator/plugins/clickhouse/async_object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
See LICENSE for details
"""
from abc import ABC, abstractmethod
from astacus.common.rohmustorage import RohmuStorageConfig
from pathlib import Path
from rohmu import BaseTransfer
from rohmu.errors import FileNotFoundFromStorageError
from starlette.concurrency import run_in_threadpool
from typing import Any, Iterator, Mapping, Self, Sequence
from typing import Any, Iterator, Mapping, Self, Sequence, Union

import contextlib
import dataclasses
Expand All @@ -27,7 +28,7 @@ class ObjectStorageItem:

class AsyncObjectStorage(ABC):
@abstractmethod
def get_config(self) -> Union[RohmuStorageConfig, dict]:
    """Return the configuration identifying this storage; extraction kept both
    the old and new signature lines — only the merged signature is retained."""
    ...

@abstractmethod
Expand All @@ -44,12 +45,12 @@ async def copy_items_from(self, source: "AsyncObjectStorage", keys: Sequence[Pat


class ThreadSafeRohmuStorage:
def __init__(self, config: RohmuStorageConfig) -> None:
    """Wrap a rohmu transfer built from a typed config model.

    Extraction kept both the pre- and post-merge lines; only the merged
    version (get_transfer_from_model on the typed model) is retained.
    """
    self.config = config
    self._storage = rohmu.get_transfer_from_model(config)
    # All access to self._storage below is serialized through this lock.
    self._storage_lock = threading.Lock()

def list_iter(self, key: str, *, with_metadata: bool = True, deep: bool = False) -> Iterator[Mapping[str, Any]]:
    """List storage entries under key while holding the storage lock.

    Extraction kept both the old (list[...]) and new (Iterator[...]) return
    annotations; only the merged Iterator signature is retained.
    """
    with self._storage_lock:
        return self._storage.list_iter(key, with_metadata=with_metadata, deep=deep)

Expand All @@ -67,7 +68,7 @@ def copy_items_from(self, source: "ThreadSafeRohmuStorage", keys: Sequence[str])
target_storage.copy_files_from(source=source_storage, keys=keys)

@contextlib.contextmanager
def get_storage(self) -> Iterator[BaseTransfer[Any]]:
    """Yield the underlying transfer with the storage lock held for the
    duration of the context. Only the merged (generic BaseTransfer[Any])
    signature is retained; the extraction also kept the old one."""
    with self._storage_lock:
        yield self._storage

Expand All @@ -76,7 +77,7 @@ def get_storage(self) -> Iterator[BaseTransfer]:
class RohmuAsyncObjectStorage(AsyncObjectStorage):
storage: ThreadSafeRohmuStorage

def get_config(self) -> RohmuStorageConfig:
    """Return the typed config of the wrapped storage; extraction kept both
    the old and new signature lines — only the merged one is retained."""
    return self.storage.config

async def list_items(self) -> list[ObjectStorageItem]:
Expand All @@ -101,7 +102,7 @@ class MemoryAsyncObjectStorage(AsyncObjectStorage):
def from_items(cls, items: Sequence[ObjectStorageItem]) -> Self:
return cls(items={item.key: item for item in items})

def get_config(self) -> Mapping[str, Any]:
def get_config(self) -> dict:
# Exposing the object id in the config ensures that the same memory storage
# has the same config as itself and a different config as another memory storage.
# Using a manually picked name would be more error-prone: we want two object
Expand Down
2 changes: 1 addition & 1 deletion astacus/coordinator/plugins/clickhouse/disks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def from_disk_config(cls, config: DiskConfiguration, storage_name: str | None =
object_storage: RohmuAsyncObjectStorage | None = None
else:
config_name = storage_name if storage_name is not None else config.object_storage.default_storage
storage_config = config.object_storage.storages[config_name].dict(by_alias=True, exclude_unset=True)
storage_config = config.object_storage.storages[config_name]
object_storage = RohmuAsyncObjectStorage(storage=ThreadSafeRohmuStorage(config=storage_config))
return Disk(
type=config.type,
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ install_requires =
fastapi==0.86.0
httpx==0.22.0
protobuf==3.19.4
rohmu==1.1.3
rohmu==1.2.0
sentry-sdk==1.6.0
tabulate==0.9.0
typing-extensions==4.7.1
Expand Down
33 changes: 13 additions & 20 deletions tests/integration/coordinator/plugins/clickhouse/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,7 @@
from _pytest.fixtures import FixtureRequest
from astacus.client import create_client_parsers
from astacus.common.ipc import Plugin
from astacus.common.rohmustorage import (
RohmuCompression,
RohmuCompressionType,
RohmuConfig,
RohmuEncryptionKey,
RohmuLocalStorageConfig,
RohmuS3StorageConfig,
RohmuStorageType,
)
from astacus.common.rohmustorage import RohmuCompression, RohmuCompressionType, RohmuConfig, RohmuEncryptionKey
from astacus.common.utils import build_netloc
from astacus.config import GlobalConfig, UvicornConfig
from astacus.coordinator.config import CoordinatorConfig, CoordinatorNode
Expand Down Expand Up @@ -44,6 +36,7 @@
import dataclasses
import logging
import pytest
import rohmu
import secrets
import sys
import tempfile
Expand Down Expand Up @@ -446,7 +439,7 @@ async def _astacus(*, config: GlobalConfig) -> AsyncIterator[Service]:
async with background_process(*cmd, env={"PYTHONPATH": astacus_source_root}) as process:
await wait_url_up(f"http://localhost:{config.uvicorn.port}")
storage = config.object_storage.storages[config.object_storage.default_storage]
assert isinstance(storage, RohmuLocalStorageConfig)
assert isinstance(storage, rohmu.LocalObjectStorageConfig)
data_dir = storage.directory
yield Service(process=process, port=config.uvicorn.port, data_dir=data_dir)

Expand Down Expand Up @@ -476,8 +469,8 @@ def create_astacus_configs(
astacus_backup_storage_path.mkdir(exist_ok=True)
node_ports = [ports.allocate() for _ in clickhouse_cluster.services]
disk_storages = {
"default": RohmuS3StorageConfig(
storage_type=RohmuStorageType.s3,
"default": rohmu.S3ObjectStorageConfig(
storage_type=rohmu.StorageDriver.s3,
region="fake",
host=minio_bucket.host,
port=minio_bucket.port,
Expand All @@ -488,18 +481,18 @@ def create_astacus_configs(
)
}
backup_storages = {
"default": RohmuLocalStorageConfig(
storage_type=RohmuStorageType.local,
directory=str(astacus_backup_storage_path),
"default": rohmu.LocalObjectStorageConfig(
storage_type=rohmu.StorageDriver.local,
directory=astacus_backup_storage_path,
)
}
if restorable_source:
backup_storages["restorable"] = RohmuLocalStorageConfig(
storage_type=RohmuStorageType.local,
directory=str(restorable_source.astacus_storage_path),
backup_storages["restorable"] = rohmu.LocalObjectStorageConfig(
storage_type=rohmu.StorageDriver.local,
directory=restorable_source.astacus_storage_path,
)
disk_storages["restorable"] = RohmuS3StorageConfig(
storage_type=RohmuStorageType.s3,
disk_storages["restorable"] = rohmu.S3ObjectStorageConfig(
storage_type=rohmu.StorageDriver.s3,
region="fake",
host=minio_bucket.host,
port=minio_bucket.port,
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/common/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def create_storage(*, tmpdir, engine, **kw):
raise NotImplementedError(f"unknown storage {engine}")


def _test_hexdigeststorage(storage):
def _test_hexdigeststorage(storage: FileStorage):
storage.upload_hexdigest_bytes(TEST_HEXDIGEST, TEXT_HEXDIGEST_DATA)
assert storage.download_hexdigest_bytes(TEST_HEXDIGEST) == TEXT_HEXDIGEST_DATA
# Ensure that download attempts of nonexistent keys give NotFoundException
Expand All @@ -56,7 +56,7 @@ def _test_hexdigeststorage(storage):
assert storage.list_hexdigests() == []


def _test_jsonstorage(storage):
def _test_jsonstorage(storage: JsonStorage):
assert storage.list_jsons() == []
storage.upload_json(TEST_JSON, TEST_JSON_DATA)
assert storage.download_json(TEST_JSON) == TEST_JSON_DATA
Expand Down
38 changes: 19 additions & 19 deletions tests/unit/coordinator/plugins/clickhouse/test_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,48 @@

def test_tables_sorted_by_dependencies() -> None:
    """Tables must be ordered so that dependents come before their dependencies.

    NOTE(review): the extraction interleaved the pre-merge (str) and post-merge
    (bytes) literal lines, yielding duplicate keyword arguments; only the merged
    bytes version is retained.
    """
    t1 = Table(
        database=b"db_one",
        name=b"t1",
        engine="DontCare",
        uuid=uuid.UUID(int=0),
        create_query=b"",
        dependencies=[(b"db_two", b"t4")],
    )
    t2 = Table(
        database=b"db_one",
        name=b"t2",
        engine="DontCare",
        uuid=uuid.UUID(int=0),
        create_query=b"",
        dependencies=[],
    )
    t3 = Table(
        database=b"db_one",
        name=b"t3",
        engine="DontCare",
        uuid=uuid.UUID(int=0),
        create_query=b"",
        dependencies=[(b"db_one", b"t2")],
    )
    t4 = Table(
        database=b"db_two",
        name=b"t4",
        engine="DontCare",
        uuid=uuid.UUID(int=0),
        create_query=b"",
        dependencies=[(b"db_one", b"t2"), (b"db_one", b"t3")],
    )
    assert tables_sorted_by_dependencies([t1, t2, t3, t4]) == [t1, t4, t3, t2]


def test_dangling_table_dependency_doesnt_crash() -> None:
    """A dependency on a table absent from the input must not raise.

    NOTE(review): the extraction interleaved the pre-merge (str) and post-merge
    (bytes) literal lines, yielding duplicate keyword arguments; only the merged
    bytes version is retained.
    """
    t1 = Table(
        database=b"db_one",
        name=b"t1",
        engine="DontCare",
        uuid=uuid.UUID(int=0),
        create_query=b"",
        dependencies=[(b"db_two", b"t4")],
    )
    assert tables_sorted_by_dependencies([t1]) == [t1]

Expand Down
Loading
Loading