Skip to content

Commit

Permalink
Remove FastAPI dependency
Browse files Browse the repository at this point in the history
FastAPI does not suit our needs as our routes do not return pydantic
models, they return MsgSpec Structs. Additionally, we do not make use of
schema generation.  Instead, we can thinly wrap Starlette ourselves.
  • Loading branch information
joelynch committed Oct 2, 2024
1 parent f99ce21 commit 0caf16f
Show file tree
Hide file tree
Showing 36 changed files with 519 additions and 449 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[MESSAGES CONTROL]
disable=
duplicate-code, # the checker in 2.9.6 fails with parallelism
invalid-name, # fastapi conventions break this
invalid-name, # fastapi / starlette conventions break this
missing-docstring,
no-member, # broken with pydantic + inheritance
too-few-public-methods, # some pydantic models have 0 and it is fine
Expand Down
1 change: 0 additions & 1 deletion astacus.spec
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ BuildRequires: snappy-devel
BuildRequires: which

# These are used when actually running the package
Requires: python3-fastapi
Requires: python3-httpx
Requires: python3-protobuf
Requires: python3-pyyaml
Expand Down
2 changes: 1 addition & 1 deletion astacus/common/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
Dependency injection helper functions.
"""

from fastapi import Request
from starlette.datastructures import URL
from starlette.requests import Request


def get_request_url(request: Request) -> URL:
Expand Down
35 changes: 0 additions & 35 deletions astacus/common/msgspec_glue.py

This file was deleted.

6 changes: 3 additions & 3 deletions astacus/common/op.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
from .exceptions import ExpiredOperationException
from .statsd import StatsClient
from .utils import AstacusModel
from astacus.starlette import JSONHTTPException
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import Enum
from fastapi import HTTPException
from starlette.background import BackgroundTasks
from starlette.datastructures import URL
from typing import Any, Optional
Expand Down Expand Up @@ -149,11 +149,11 @@ def _sync_wrapper():

return Op.StartResult(op_id=op.op_id, status_url=status_url)

def get_op_and_op_info(self, *, op_id, op_name=None):
def get_op_and_op_info(self, *, op_id: int, op_name: str | None = None):
op_info = self.state.op_info
if op_id != op_info.op_id or (op_name and op_name != op_info.op_name):
logger.info("request for nonexistent %s.%s != %r", op_name, op_id, op_info)
raise HTTPException(
raise JSONHTTPException(
404,
{
"code": magic.ErrorCode.operation_id_mismatch,
Expand Down
7 changes: 3 additions & 4 deletions astacus/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from collections import deque
from collections.abc import AsyncIterable, AsyncIterator, Callable, Hashable, Iterable, Iterator, Mapping
from contextlib import contextmanager
from multiprocessing.dummy import Pool # fastapi + fork = bad idea
from multiprocessing.dummy import Pool # starlette + fork = bad idea
from pathlib import Path
from pydantic import BaseModel
from typing import Any, ContextManager, Final, Generic, IO, Literal, overload, TextIO, TypeAlias, TypeVar
Expand Down Expand Up @@ -83,9 +83,8 @@ def http_request(url, *, caller, method="get", timeout=10, ignore_status_code: b
"""Wrapper for requests.request which handles timeouts as non-exceptions,
and returns only valid results that we actually care about.
This is here primarily so that some requests stuff
(e.g. fastapi.testclient) still works, but we can mock things to
our hearts content in test code by doing 'things' here.
This is here primarily so that some requests stuff still works, but we can
mock things to our hearts content in test code by doing 'things' here.
"""
# TBD: may need to redact url in future, if we actually wind up
# using passwords in urls here.
Expand Down
5 changes: 3 additions & 2 deletions astacus/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
from astacus.common.utils import AstacusModel
from astacus.coordinator.config import APP_KEY as COORDINATOR_CONFIG_KEY, CoordinatorConfig
from astacus.node.config import APP_KEY as NODE_CONFIG_KEY, NodeConfig
from fastapi import FastAPI, Request
from pathlib import Path
from starlette.applications import Starlette
from starlette.requests import Request

import hashlib
import io
Expand Down Expand Up @@ -64,7 +65,7 @@ def get_config_content_and_hash(config_path: str | Path) -> tuple[str, str]:
return config_content.decode(), config_hash


def set_global_config_from_path(app: FastAPI, path: str | Path) -> GlobalConfig:
def set_global_config_from_path(app: Starlette, path: str | Path) -> GlobalConfig:
config_content, config_hash = get_config_content_and_hash(path)
with io.StringIO(config_content) as config_file:
config = GlobalConfig.parse_obj(yaml.safe_load(config_file))
Expand Down
121 changes: 65 additions & 56 deletions astacus/coordinator/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,29 @@
"""

from .cleanup import CleanupOp
from .coordinator import BackupOp, Coordinator, DeltaBackupOp, RestoreOp
from .coordinator import BackupOp, Coordinator, CoordinatorOp, DeltaBackupOp, RestoreOp
from .list import CachedListEntries, list_backups, list_delta_backups
from .lockops import LockOps
from .state import CachedListResponse
from astacus import config
from astacus.common import ipc
from astacus.common.magic import StrEnum
from astacus.common.msgspec_glue import register_msgspec_glue, StructResponse
from astacus.common.op import Op
from astacus.common.progress import Progress
from astacus.config import APP_HASH_KEY, get_config_content_and_hash
from astacus.starlette import get_query_param, Router
from asyncio import to_thread
from collections.abc import Sequence
from fastapi import APIRouter, Body, Depends, HTTPException, Request
from typing import Annotated
from starlette.background import BackgroundTasks
from starlette.exceptions import HTTPException
from starlette.requests import Request
from urllib.parse import urljoin

import logging
import msgspec
import os
import time

register_msgspec_glue()
router = APIRouter()
router = Router()

logger = logging.getLogger(__name__)

Expand All @@ -52,7 +52,7 @@ async def root():


@router.post("/config/reload")
async def config_reload(*, request: Request, c: Coordinator = Depends()):
async def config_reload(*, request: Request) -> dict:
"""Reload astacus configuration"""
config_path = os.environ.get("ASTACUS_CONFIG")
assert config_path is not None
Expand All @@ -61,7 +61,7 @@ async def config_reload(*, request: Request, c: Coordinator = Depends()):


@router.get("/config/status")
async def config_status(*, request: Request):
async def config_status(*, request: Request) -> dict:
config_path = os.environ.get("ASTACUS_CONFIG")
assert config_path is not None
_, config_hash = get_config_content_and_hash(config_path)
Expand All @@ -70,53 +70,51 @@ async def config_status(*, request: Request):


@router.post("/lock")
async def lock(*, locker: str, c: Coordinator = Depends(), op: LockOps = Depends()):
async def lock(*, request: Request, background_tasks: BackgroundTasks) -> LockStartResult:
c = await Coordinator.create_from_request(request, background_tasks)
locker = get_query_param(request, "locker")
op = c.create_op(LockOps, locker=locker)
result = c.start_op(op_name=OpName.lock, op=op, fun=op.lock)
return LockStartResult(unlock_url=urljoin(str(c.request_url), f"../unlock?locker={locker}"), **result.dict())


@router.post("/unlock")
def unlock(*, locker: str, c: Coordinator = Depends(), op: LockOps = Depends()):
async def unlock(*, request: Request, background_tasks: BackgroundTasks) -> Op.StartResult:
c = await Coordinator.create_from_request(request, background_tasks)
locker = get_query_param(request, "locker")
op = c.create_op(LockOps, locker=locker)
return c.start_op(op_name=OpName.unlock, op=op, fun=op.unlock)


@router.post("/backup")
async def backup(*, c: Coordinator = Depends(), op: BackupOp = Depends(BackupOp.create)):
async def backup(*, request: Request, background_tasks: BackgroundTasks) -> Op.StartResult:
c = await Coordinator.create_from_request(request, background_tasks)
op = c.create_op(BackupOp)
runner = await op.acquire_cluster_lock()
return c.start_op(op_name=OpName.backup, op=op, fun=runner)


@router.post("/delta/backup")
async def delta_backup(*, c: Coordinator = Depends(), op: DeltaBackupOp = Depends(DeltaBackupOp.create)):
async def delta_backup(*, request: Request, background_tasks: BackgroundTasks) -> Op.StartResult:
c = await Coordinator.create_from_request(request, background_tasks)
op = c.create_op(DeltaBackupOp)
runner = await op.acquire_cluster_lock()
return c.start_op(op_name=OpName.backup, op=op, fun=runner)


@router.post("/restore")
async def restore(
*,
c: Coordinator = Depends(),
storage: Annotated[str, Body()] = "",
name: Annotated[str, Body()] = "",
partial_restore_nodes: Annotated[Sequence[ipc.PartialRestoreRequestNode] | None, Body()] = None,
stop_after_step: Annotated[str | None, Body()] = None,
):
req = ipc.RestoreRequest(
storage=storage,
name=name,
partial_restore_nodes=partial_restore_nodes,
stop_after_step=stop_after_step,
)
op = RestoreOp(c=c, req=req)
async def restore(*, body: ipc.RestoreRequest, request: Request, background_tasks: BackgroundTasks) -> Op.StartResult:
c = await Coordinator.create_from_request(request, background_tasks)
op = RestoreOp(c=c, req=body)
runner = await op.acquire_cluster_lock()
return c.start_op(op_name=OpName.restore, op=op, fun=runner)


@router.get("/list")
async def _list_backups(
*, storage: Annotated[str, Body()] = "", c: Coordinator = Depends(), request: Request
) -> StructResponse:
req = ipc.ListRequest(storage=storage)
*, body: ipc.ListRequest = ipc.ListRequest(), request: Request, background_tasks: BackgroundTasks
) -> ipc.ListResponse:
c = await Coordinator.create_from_request(request, background_tasks)
coordinator_config = c.config
cached_list_response = c.state.cached_list_response
if cached_list_response is not None:
Expand All @@ -126,7 +124,7 @@ async def _list_backups(
and cached_list_response.coordinator_config == coordinator_config
and cached_list_response.list_request
):
return StructResponse(cached_list_response.list_response)
return cached_list_response.list_response
if c.state.cached_list_running:
raise HTTPException(status_code=429, detail="Already caching list result")
c.state.cached_list_running = True
Expand All @@ -136,15 +134,15 @@ async def _list_backups(
if cached_list_response is not None
else {}
)
list_response = await to_thread(list_backups, req=req, storage_factory=c.storage_factory, cache=cache)
list_response = await to_thread(list_backups, req=body, storage_factory=c.storage_factory, cache=cache)
c.state.cached_list_response = CachedListResponse(
coordinator_config=coordinator_config,
list_request=req,
list_request=body,
list_response=list_response,
)
finally:
c.state.cached_list_running = False
return StructResponse(list_response)
return list_response


def get_cache_entries_from_list_response(list_response: ipc.ListResponse) -> CachedListEntries:
Expand All @@ -155,40 +153,50 @@ def get_cache_entries_from_list_response(list_response: ipc.ListResponse) -> Cac


@router.get("/delta/list")
async def _list_delta_backups(*, storage: Annotated[str, Body()] = "", c: Coordinator = Depends(), request: Request):
req = ipc.ListRequest(storage=storage)
async def _list_delta_backups(
*, body: ipc.ListRequest, request: Request, background_tasks: BackgroundTasks
) -> ipc.ListResponse:
c = await Coordinator.create_from_request(request, background_tasks)
# This is not supposed to be called very often, no caching necessary
return await to_thread(list_delta_backups, req=req, storage_factory=c.storage_factory)
return await to_thread(list_delta_backups, req=body, storage_factory=c.storage_factory)


@router.post("/cleanup")
async def cleanup(
*,
storage: Annotated[str, Body()] = "",
retention: Annotated[ipc.Retention | None, Body()] = None,
explicit_delete: Annotated[Sequence[str], Body()] = (),
c: Coordinator = Depends(),
):
req = ipc.CleanupRequest(storage=storage, retention=retention, explicit_delete=list(explicit_delete))
op = CleanupOp(c=c, req=req)
*, request: Request, background_tasks: BackgroundTasks, body: ipc.CleanupRequest = ipc.CleanupRequest()
) -> Op.StartResult:
c = await Coordinator.create_from_request(request, background_tasks)
op = CleanupOp(c=c, req=body)
runner = await op.acquire_cluster_lock()
return c.start_op(op_name=OpName.cleanup, op=op, fun=runner)


@router.get("/{op_name}/{op_id}")
@router.get("/delta/{op_name}/{op_id}")
def op_status(*, op_name: OpName, op_id: int, c: Coordinator = Depends()):
class OpStatusResult(msgspec.Struct, kw_only=True):
state: Op.Status | None
progress: Progress | None


@router.get("/{op_name:str}/{op_id:int}")
@router.get("/delta/{op_name:str}/{op_id:int}")
async def op_status(*, request: Request, background_tasks: BackgroundTasks) -> OpStatusResult:
c = await Coordinator.create_from_request(request, background_tasks)
op_name = OpName(request.path_params["op_name"])
op_id: int = request.path_params["op_id"]
op, op_info = c.get_op_and_op_info(op_id=op_id, op_name=op_name)
result = {"state": op_info.op_status}
if isinstance(op, (BackupOp, DeltaBackupOp, RestoreOp)):
result["progress"] = msgspec.to_builtins(op.progress)
result = OpStatusResult(state=op_info.op_status, progress=None)
if isinstance(op, BackupOp | DeltaBackupOp | RestoreOp):
result.progress = op.progress
return result


@router.put("/{op_name}/{op_id}/sub-result")
@router.put("/delta/{op_name}/{op_id}/sub-result")
async def op_sub_result(*, op_name: OpName, op_id: int, c: Coordinator = Depends()):
@router.put("/{op_name:str}/{op_id:int}/sub-result")
@router.put("/delta/{op_name:str}/{op_id:int}/sub-result")
async def op_sub_result(*, request: Request, background_tasks: BackgroundTasks) -> None:
c = await Coordinator.create_from_request(request, background_tasks)
op_name = OpName(request.path_params["op_name"])
op_id: int = request.path_params["op_id"]
op, _ = c.get_op_and_op_info(op_id=op_id, op_name=op_name)
assert isinstance(op, CoordinatorOp)
# We used to have results available here, but not use those
# that was wasting a lot of memory by generating the same result twice.
if not op.subresult_sleeper:
Expand All @@ -197,5 +205,6 @@ async def op_sub_result(*, op_name: OpName, op_id: int, c: Coordinator = Depends


@router.get("/busy")
async def is_busy(*, c: Coordinator = Depends()) -> bool:
async def is_busy(*, request: Request, background_tasks: BackgroundTasks) -> bool:
c = await Coordinator.create_from_request(request, background_tasks)
return c.is_busy()
3 changes: 1 addition & 2 deletions astacus/coordinator/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from astacus.common import ipc
from astacus.coordinator.coordinator import Coordinator, SteppedCoordinatorOp
from fastapi import Depends

import logging

Expand All @@ -17,7 +16,7 @@

class CleanupOp(SteppedCoordinatorOp):
@staticmethod
async def create(*, c: Coordinator = Depends(), req: ipc.CleanupRequest = ipc.CleanupRequest()) -> "CleanupOp":
async def create(*, c: Coordinator, req: ipc.CleanupRequest = ipc.CleanupRequest()) -> "CleanupOp":
return CleanupOp(c=c, req=req)

def __init__(self, *, c: Coordinator, req: ipc.CleanupRequest) -> None:
Expand Down
2 changes: 1 addition & 1 deletion astacus/coordinator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from astacus.common.statsd import StatsdConfig
from astacus.common.utils import AstacusModel
from collections.abc import Sequence
from fastapi import Request
from pathlib import Path
from starlette.requests import Request

APP_KEY = "coordinator_config"

Expand Down
Loading

0 comments on commit 0caf16f

Please sign in to comment.