Skip to content

Commit

Permalink
feat: Settings write-back
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed May 8, 2024
1 parent 9d0c08b commit f098bcf
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 20 deletions.
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
# -- Options for intersphinx -----------------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/extensions/intersphinx.html#configuration
intersphinx_mapping = {
"blinker": ("https://blinker.readthedocs.io/en/stable/", None),
"requests": ("https://requests.readthedocs.io/en/latest/", None),
"python": ("https://docs.python.org/3/", None),
}
Expand Down
1 change: 1 addition & 0 deletions docs/guides/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ The following pages contain useful information for developers building on top of
porting
pagination-classes
custom-clis
signals
```
49 changes: 49 additions & 0 deletions docs/guides/signals.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Signals

This guide will show you how to use the built-in [Blinker](inv:blinker:std:doc#index) signals in the Singer SDK.

## Settings write-back

The SDK provides a signal that allows you to write back settings to the configuration file. This is useful if you want to update the configuration file with new settings that were set during the run, like a `refresh_token`.

```python
import requests
from singer_sdk.authenticators import OAuthAuthenticator
from singer_sdk.plugin_base import PluginBase


class RefreshTokenAuthenticator(OAuthAuthenticator):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.refresh_token = self.config["refresh_token"]

@property
def oauth_request_body(self):
return {
"client_id": self.config["client_id"],
"client_secret": self.config["client_secret"],
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"user_type": "Location",
}

def update_access_token(self):
token_response = requests.post(
self.auth_endpoint,
headers=self._oauth_headers,
data=auth_request_payload,
timeout=60,
)
token_response.raise_for_status()
token_json = token_response.json()

self.access_token = token_json["access_token"]
self.refresh_token = token_json["refresh_token"]
PluginBase.config_updated.send(self, refresh_token=self.refresh_token)
```

In the example above, the `RefreshTokenAuthenticator` class is a subclass of `OAuthAuthenticator` that calls `PluginBase.config_updated.send` to send a signal to update the `refresh_token` in tap's configuration.

```{note}
Only when a single file is passed via the `--config` command line option, the SDK will write back the settings to the same file.
```
13 changes: 12 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ license = "Apache-2.0"
python = ">=3.8"
backoff = { version = ">=2.0.0", python = "<4" }
backports-datetime-fromisoformat = { version = ">=2.0.1", python = "<3.11" }
blinker = ">=1.7.0"
click = "~=8.0"
cryptography = ">=3.4.6"
fs = ">=2.4.16"
Expand Down
8 changes: 4 additions & 4 deletions singer_sdk/authenticators.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import typing as t
import warnings
from datetime import timedelta
from types import MappingProxyType
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit

import requests
Expand All @@ -16,6 +15,7 @@

if t.TYPE_CHECKING:
import logging
from types import MappingProxyType

from pendulum import DateTime

Expand Down Expand Up @@ -90,19 +90,19 @@ def __init__(self, stream: RESTStream) -> None:
stream: A stream for a RESTful endpoint.
"""
self.tap_name: str = stream.tap_name
self._config: dict[str, t.Any] = dict(stream.config)
self._config = stream.config
self._auth_headers: dict[str, t.Any] = {}
self._auth_params: dict[str, t.Any] = {}
self.logger: logging.Logger = stream.logger

@property
def config(self) -> t.Mapping[str, t.Any]:
def config(self) -> MappingProxyType:
"""Get stream or tap config.
Returns:
A frozen (read-only) config dictionary map.
"""
return MappingProxyType(self._config)
return self._config

@property
def auth_headers(self) -> dict:
Expand Down
76 changes: 65 additions & 11 deletions singer_sdk/plugin_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import abc
import json
import logging
import os
import sys
Expand All @@ -13,6 +14,7 @@
from types import MappingProxyType

import click
from blinker import Signal
from jsonschema import Draft7Validator

from singer_sdk import about, metrics
Expand Down Expand Up @@ -98,6 +100,9 @@ class PluginBase(metaclass=abc.ABCMeta): # noqa: PLR0904

_config: dict

# Signals
config_updated = Signal()

@classproperty
def logger(cls) -> logging.Logger: # noqa: N805
"""Get logger.
Expand Down Expand Up @@ -134,10 +139,43 @@ def __init__(
it can be a predetermined config dict.
parse_env_config: True to parse settings from env vars.
validate_config: True to require validation of config settings.
"""
self._config, self._config_path = self._process_config(
config=config,
parse_env_config=parse_env_config,
)
metrics._setup_logging(self.config) # noqa: SLF001
self.metrics_logger = metrics.get_metrics_logger()

self._validate_config(raise_errors=validate_config)
self._mapper: PluginMapper | None = None

# Initialization timestamp
self.__initialized_at = int(time.time() * 1000)

self.config_updated.connect(self.update_config)

def _process_config(
self,
*,
config: dict | PurePath | str | list[PurePath | str] | None = None,
parse_env_config: bool = False,
) -> tuple[dict[str, t.Any], PurePath | str | None]:
"""Process the plugin configuration.
Args:
config: May be one or more paths, either as str or PurePath objects, or
it can be a predetermined config dict.
parse_env_config: True to parse settings from env vars.
Returns:
A tuple containing the config dictionary and the config write-back path.
Raises:
ValueError: If config is not a dict or path string.
"""
config_path = None

if not config:
config_dict = {}
elif isinstance(config, (str, PurePath)):
Expand All @@ -148,28 +186,29 @@ def __init__(
# Read each config file sequentially. Settings from files later in the
# list will override those of earlier ones.
config_dict.update(read_json_file(config_path))

if len(config) == 1 and not parse_env_config:
config_path = config[0]

elif isinstance(config, dict):
config_dict = config
else:
else: # pragma: no cover
msg = f"Error parsing config of type '{type(config).__name__}'."
raise ValueError(msg)

# Parse env var settings
if parse_env_config:
self.logger.info("Parsing env var for settings config...")
config_dict.update(self._env_var_config)
else:
self.logger.info("Skipping parse of env var settings...")

# Handle sensitive settings
for k, v in config_dict.items():
if self._is_secret_config(k):
config_dict[k] = SecretString(v)
self._config = config_dict
metrics._setup_logging(self.config) # noqa: SLF001
self.metrics_logger = metrics.get_metrics_logger()

self._validate_config(raise_errors=validate_config)
self._mapper: PluginMapper | None = None

# Initialization timestamp
self.__initialized_at = int(time.time() * 1000)
return config_dict, config_path

def setup_mapper(self) -> None:
"""Initialize the plugin mapper for this tap."""
Expand Down Expand Up @@ -336,13 +375,28 @@ def state(self) -> dict:
# Core plugin config:

@property
def config(self) -> t.Mapping[str, t.Any]:
def config(self) -> MappingProxyType:
"""Get config.
Returns:
A frozen (read-only) config dictionary map.
"""
return t.cast(dict, MappingProxyType(self._config))
return MappingProxyType(self._config)

def update_config(self, sender: t.Any, **settings: t.Any) -> None: # noqa: ANN401, ARG002
"""Update the config with new settings.
This is a :external+blinker:std:doc:`Blinker <index>` signal receiver.
Args:
sender: The sender of the signal.
**settings: New settings to update the config with.
"""
self._config.update(**settings)
if self._config_path is not None: # pragma: no cover
self.logger.info("Updating config file: %s", self._config_path)
with Path(self._config_path).open("w") as f:
json.dump(self._config, f)

@staticmethod
def _is_secret_config(config_key: str) -> bool:
Expand Down
7 changes: 3 additions & 4 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import typing as t
from os import PathLike
from pathlib import Path
from types import MappingProxyType

import pendulum

Expand Down Expand Up @@ -58,6 +57,7 @@

if t.TYPE_CHECKING:
import logging
from types import MappingProxyType

from singer_sdk.helpers._compat import Traversable
from singer_sdk.tap_base import Tap
Expand Down Expand Up @@ -135,7 +135,6 @@ def __init__(
self.logger: logging.Logger = tap.logger.getChild(self.name)
self.metrics_logger = tap.metrics_logger
self.tap_name: str = tap.name
self._config: dict = dict(tap.config)
self._tap = tap
self._tap_state = tap.state
self._tap_input_catalog: singer.Catalog | None = None
Expand Down Expand Up @@ -602,13 +601,13 @@ def _singer_catalog(self) -> singer.Catalog:
return singer.Catalog([(self.tap_stream_id, self._singer_catalog_entry)])

@property
def config(self) -> t.Mapping[str, t.Any]:
def config(self) -> MappingProxyType[str, t.Any]:
"""Get stream configuration.
Returns:
A frozen (read-only) config dictionary map.
"""
return MappingProxyType(self._config)
return self._tap.config

@property
def tap_stream_id(self) -> str:
Expand Down
8 changes: 8 additions & 0 deletions tests/core/test_plugin_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,11 @@ def test_mapper_not_initialized():
def test_supported_python_versions():
"""Test that supported python versions are correctly parsed."""
assert PluginBase._get_supported_python_versions(SDK_PACKAGE_NAME)


def test_config_updated_signal():
plugin = PluginTest(config={"prop1": "hello"})
assert plugin.config == {"prop1": "hello"}

PluginBase.config_updated.send(prop2="abc")
assert plugin.config == {"prop1": "hello", "prop2": "abc"}

0 comments on commit f098bcf

Please sign in to comment.