Skip to content

Commit

Permalink
Use Rohmu models directly
Browse files Browse the repository at this point in the history
  • Loading branch information
joelynch committed Oct 9, 2023
1 parent 20e809d commit f619e7d
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 155 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,5 @@ astacus/proto/*_pb2.py
**/py.typed

.idea

.vscode/
99 changes: 7 additions & 92 deletions astacus/common/rohmustorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
Rohmu-specific actual object storage implementation
"""
from .magic import StrEnum
from .storage import Json, MultiStorage, Storage, StorageUploadResult
from .utils import AstacusModel
from astacus.common import exceptions
Expand All @@ -16,7 +15,6 @@
from rohmu.compressor import CompressionStream
from rohmu.encryptor import EncryptorStream
from typing import BinaryIO, Dict, Optional, Union
from typing_extensions import Literal

import io
import json
Expand All @@ -28,102 +26,19 @@
logger = logging.getLogger(__name__)


class RohmuStorageType(StrEnum):
"""Embodies what is detected in rohmu.get_class_for_transfer"""

azure = "azure"
google = "google"
local = "local"
s3 = "s3"
# sftp = "sftp"
# swift = "swift"


class RohmuModel(AstacusModel):
class Config:
# As we're keen to both export and decode json, just using enum
# values is much saner than the alternatives
use_enum_values = True


class RohmuProxyType(StrEnum):
socks5 = "socks5"
http = "http"


class RohmuProxyInfo(RohmuModel):
type: RohmuProxyType
host: str
port: int
user: Optional[str] = None
password: Optional[str] = Field(None, alias="pass")


class StatsdInfo(RohmuModel):
host: str
port: int
tags: dict[str, str | int | None]


class ObjectStorageNotifier(RohmuModel):
notifier_type: str
url: str


class ObjectStorageConfig(RohmuModel):
storage_type: RohmuStorageType
prefix: Optional[str] = None
notifier: Optional[ObjectStorageNotifier] = None
statsd_info: Optional[StatsdInfo] = None


class RohmuProxyStorage(ObjectStorageConfig):
"""Storage backend with support for optional socks5 or http proxy connections"""

proxy_info: Optional[RohmuProxyInfo] = None


class RohmuLocalStorageConfig(ObjectStorageConfig):
storage_type: Literal[RohmuStorageType.local]
directory: str
prefix: Optional[str] = None


class RohmuS3StorageConfig(RohmuProxyStorage):
storage_type: Literal[RohmuStorageType.s3]
region: str
bucket_name: str
aws_access_key_id: Optional[str] = None
aws_secret_access_key: Optional[str] = None
host: Optional[str] = None
port: Optional[int] = None
is_secure: Optional[bool] = False
is_verify_tls: Optional[bool] = False
prefix: Optional[str] = None
# Some more obscure options with defaults are omitted


class RohmuAzureStorageConfig(RohmuProxyStorage):
storage_type: Literal[RohmuStorageType.azure]
account_name: str
bucket_name: str
account_key: Optional[str] = None
sas_token: Optional[str] = None
prefix: Optional[str] = None
azure_cloud: Optional[str] = None


class RohmuGoogleStorageConfig(RohmuProxyStorage):
storage_type: Literal[RohmuStorageType.google]
# rohmu.object_storage.google:GoogleTransfer.__init__ arguments
project_id: str
bucket_name: str
credentials: Dict[str, str]
prefix: Optional[str] = None
# credential_file # omitted, n/a


RohmuStorageConfig = Union[RohmuLocalStorageConfig, RohmuS3StorageConfig, RohmuAzureStorageConfig, RohmuGoogleStorageConfig]
RohmuStorageConfig = Union[
rohmu.LocalObjectStorageConfig,
rohmu.S3ObjectStorageConfig,
rohmu.AzureObjectStorageConfig,
rohmu.GoogleObjectStorageConfig,
]


class RohmuEncryptionKey(RohmuModel):
Expand Down Expand Up @@ -240,7 +155,7 @@ def _choose_storage(self, storage: str | None = None) -> None:
storage = self.config.default_storage
self.storage_name = storage
self.storage_config = self.config.storages[storage]
self.storage = rohmu.get_transfer(self.storage_config.dict(by_alias=True, exclude_unset=True))
self.storage = rohmu.get_transfer_from_model(self.storage_config)

def copy(self) -> "RohmuStorage":
return RohmuStorage(config=self.config, storage=self.storage_name)
Expand Down
17 changes: 9 additions & 8 deletions astacus/coordinator/plugins/clickhouse/async_object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
See LICENSE for details
"""
from abc import ABC, abstractmethod
from astacus.common.rohmustorage import RohmuStorageConfig
from pathlib import Path
from rohmu import BaseTransfer
from rohmu.errors import FileNotFoundFromStorageError
from starlette.concurrency import run_in_threadpool
from typing import Any, Iterator, Mapping, Self, Sequence
from typing import Any, Iterator, Mapping, Self, Sequence, Union

import contextlib
import dataclasses
Expand All @@ -27,7 +28,7 @@ class ObjectStorageItem:

class AsyncObjectStorage(ABC):
@abstractmethod
def get_config(self) -> Mapping[str, Any]:
def get_config(self) -> Union[RohmuStorageConfig, dict]:
...

@abstractmethod
Expand All @@ -44,12 +45,12 @@ async def copy_items_from(self, source: "AsyncObjectStorage", keys: Sequence[Pat


class ThreadSafeRohmuStorage:
def __init__(self, config: Mapping[str, Any]) -> None:
def __init__(self, config: RohmuStorageConfig) -> None:
self.config = config
self._storage = rohmu.get_transfer(config)
self._storage = rohmu.get_transfer_from_model(config)
self._storage_lock = threading.Lock()

def list_iter(self, key: str, *, with_metadata: bool = True, deep: bool = False) -> list[Mapping[str, Any]]:
def list_iter(self, key: str, *, with_metadata: bool = True, deep: bool = False) -> Iterator[Mapping[str, Any]]:
with self._storage_lock:
return self._storage.list_iter(key, with_metadata=with_metadata, deep=deep)

Expand All @@ -67,7 +68,7 @@ def copy_items_from(self, source: "ThreadSafeRohmuStorage", keys: Sequence[str])
target_storage.copy_files_from(source=source_storage, keys=keys)

@contextlib.contextmanager
def get_storage(self) -> Iterator[BaseTransfer]:
def get_storage(self) -> Iterator[BaseTransfer[Any]]:
with self._storage_lock:
yield self._storage

Expand All @@ -76,7 +77,7 @@ def get_storage(self) -> Iterator[BaseTransfer]:
class RohmuAsyncObjectStorage(AsyncObjectStorage):
storage: ThreadSafeRohmuStorage

def get_config(self) -> Mapping[str, Any]:
def get_config(self) -> RohmuStorageConfig:
return self.storage.config

async def list_items(self) -> list[ObjectStorageItem]:
Expand All @@ -101,7 +102,7 @@ class MemoryAsyncObjectStorage(AsyncObjectStorage):
def from_items(cls, items: Sequence[ObjectStorageItem]) -> Self:
return cls(items={item.key: item for item in items})

def get_config(self) -> Mapping[str, Any]:
def get_config(self) -> dict:
# Exposing the object id in the config ensures that the same memory storage
# has the same config as itself and a different config from another memory storage.
# Using a manually picked name would be more error-prone: we want two object
Expand Down
2 changes: 1 addition & 1 deletion astacus/coordinator/plugins/clickhouse/disks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def from_disk_config(cls, config: DiskConfiguration, storage_name: str | None =
object_storage: RohmuAsyncObjectStorage | None = None
else:
config_name = storage_name if storage_name is not None else config.object_storage.default_storage
storage_config = config.object_storage.storages[config_name].dict(by_alias=True, exclude_unset=True)
storage_config = config.object_storage.storages[config_name]
object_storage = RohmuAsyncObjectStorage(storage=ThreadSafeRohmuStorage(config=storage_config))
return Disk(
type=config.type,
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ install_requires =
fastapi==0.86.0
httpx==0.22.0
protobuf==3.19.4
rohmu==1.1.3
rohmu==1.2.0
sentry-sdk==1.6.0
tabulate==0.9.0
typing-extensions==4.7.1
Expand Down
33 changes: 13 additions & 20 deletions tests/integration/coordinator/plugins/clickhouse/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,7 @@
from _pytest.fixtures import FixtureRequest
from astacus.client import create_client_parsers
from astacus.common.ipc import Plugin
from astacus.common.rohmustorage import (
RohmuCompression,
RohmuCompressionType,
RohmuConfig,
RohmuEncryptionKey,
RohmuLocalStorageConfig,
RohmuS3StorageConfig,
RohmuStorageType,
)
from astacus.common.rohmustorage import RohmuCompression, RohmuCompressionType, RohmuConfig, RohmuEncryptionKey
from astacus.common.utils import build_netloc
from astacus.config import GlobalConfig, UvicornConfig
from astacus.coordinator.config import CoordinatorConfig, CoordinatorNode
Expand Down Expand Up @@ -44,6 +36,7 @@
import dataclasses
import logging
import pytest
import rohmu
import secrets
import sys
import tempfile
Expand Down Expand Up @@ -446,7 +439,7 @@ async def _astacus(*, config: GlobalConfig) -> AsyncIterator[Service]:
async with background_process(*cmd, env={"PYTHONPATH": astacus_source_root}) as process:
await wait_url_up(f"http://localhost:{config.uvicorn.port}")
storage = config.object_storage.storages[config.object_storage.default_storage]
assert isinstance(storage, RohmuLocalStorageConfig)
assert isinstance(storage, rohmu.LocalObjectStorageConfig)
data_dir = storage.directory
yield Service(process=process, port=config.uvicorn.port, data_dir=data_dir)

Expand Down Expand Up @@ -476,8 +469,8 @@ def create_astacus_configs(
astacus_backup_storage_path.mkdir(exist_ok=True)
node_ports = [ports.allocate() for _ in clickhouse_cluster.services]
disk_storages = {
"default": RohmuS3StorageConfig(
storage_type=RohmuStorageType.s3,
"default": rohmu.S3ObjectStorageConfig(
storage_type=rohmu.StorageDriver.s3,
region="fake",
host=minio_bucket.host,
port=minio_bucket.port,
Expand All @@ -488,18 +481,18 @@ def create_astacus_configs(
)
}
backup_storages = {
"default": RohmuLocalStorageConfig(
storage_type=RohmuStorageType.local,
directory=str(astacus_backup_storage_path),
"default": rohmu.LocalObjectStorageConfig(
storage_type=rohmu.StorageDriver.local,
directory=astacus_backup_storage_path,
)
}
if restorable_source:
backup_storages["restorable"] = RohmuLocalStorageConfig(
storage_type=RohmuStorageType.local,
directory=str(restorable_source.astacus_storage_path),
backup_storages["restorable"] = rohmu.LocalObjectStorageConfig(
storage_type=rohmu.StorageDriver.local,
directory=restorable_source.astacus_storage_path,
)
disk_storages["restorable"] = RohmuS3StorageConfig(
storage_type=RohmuStorageType.s3,
disk_storages["restorable"] = rohmu.S3ObjectStorageConfig(
storage_type=rohmu.StorageDriver.s3,
region="fake",
host=minio_bucket.host,
port=minio_bucket.port,
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/common/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def create_storage(*, tmpdir, engine, **kw):
raise NotImplementedError(f"unknown storage {engine}")


def _test_hexdigeststorage(storage):
def _test_hexdigeststorage(storage: FileStorage):
storage.upload_hexdigest_bytes(TEST_HEXDIGEST, TEXT_HEXDIGEST_DATA)
assert storage.download_hexdigest_bytes(TEST_HEXDIGEST) == TEXT_HEXDIGEST_DATA
# Ensure that download attempts of nonexistent keys give NotFoundException
Expand All @@ -56,7 +56,7 @@ def _test_hexdigeststorage(storage):
assert storage.list_hexdigests() == []


def _test_jsonstorage(storage):
def _test_jsonstorage(storage: JsonStorage):
assert storage.list_jsons() == []
storage.upload_json(TEST_JSON, TEST_JSON_DATA)
assert storage.download_json(TEST_JSON) == TEST_JSON_DATA
Expand Down
38 changes: 19 additions & 19 deletions tests/unit/coordinator/plugins/clickhouse/test_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,48 @@

def test_tables_sorted_by_dependencies() -> None:
t1 = Table(
database="db_one",
name="t1",
database=b"db_one",
name=b"t1",
engine="DontCare",
uuid=uuid.UUID(int=0),
create_query="",
dependencies=[("db_two", "t4")],
create_query=b"",
dependencies=[(b"db_two", b"t4")],
)
t2 = Table(
database="db_one",
name="t2",
database=b"db_one",
name=b"t2",
engine="DontCare",
uuid=uuid.UUID(int=0),
create_query="",
create_query=b"",
dependencies=[],
)
t3 = Table(
database="db_one",
name="t3",
database=b"db_one",
name=b"t3",
engine="DontCare",
uuid=uuid.UUID(int=0),
create_query="",
dependencies=[("db_one", "t2")],
create_query=b"",
dependencies=[(b"db_one", b"t2")],
)
t4 = Table(
database="db_two",
name="t4",
database=b"db_two",
name=b"t4",
engine="DontCare",
uuid=uuid.UUID(int=0),
create_query="",
dependencies=[("db_one", "t2"), ("db_one", "t3")],
create_query=b"",
dependencies=[(b"db_one", b"t2"), (b"db_one", b"t3")],
)
assert tables_sorted_by_dependencies([t1, t2, t3, t4]) == [t1, t4, t3, t2]


def test_dangling_table_dependency_doesnt_crash() -> None:
t1 = Table(
database="db_one",
name="t1",
database=b"db_one",
name=b"t1",
engine="DontCare",
uuid=uuid.UUID(int=0),
create_query="",
dependencies=[("db_two", "t4")],
create_query=b"",
dependencies=[(b"db_two", b"t4")],
)
assert tables_sorted_by_dependencies([t1]) == [t1]

Expand Down
Loading

0 comments on commit f619e7d

Please sign in to comment.