Skip to content

Commit

Permalink
Merge branch 'main' into edgarrmondragon/feat/records-batch-metric
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon authored Nov 26, 2024
2 parents 8d7da14 + 8c7c307 commit 59a8d34
Show file tree
Hide file tree
Showing 15 changed files with 498 additions and 430 deletions.
1 change: 1 addition & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ updates:
update-types:
- "minor"
- "patch"
versioning-strategy: increase-if-necessary
- package-ecosystem: pip
directory: "/.github/workflows"
schedule:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/cookiecutter-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
with:
python-version: 3.x

- uses: astral-sh/setup-uv@v3
- uses: astral-sh/setup-uv@v4
with:
version: ">=0.4.30"

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ jobs:
run: |
nox -r --no-install -- xml
- uses: codecov/codecov-action@v4
- uses: codecov/codecov-action@v5
with:
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ repos:
- id: check-readthedocs

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.7.3
rev: v0.8.0
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix, --show-fixes]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ updates:
dependency-type: production
update-types:
- "patch"
versioning-strategy: increase-if-necessary
- package-ecosystem: pip
directory: "/.github/workflows"
schedule:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ updates:
dependency-type: production
update-types:
- "patch"
versioning-strategy: increase-if-necessary
- package-ecosystem: pip
directory: "/.github/workflows"
schedule:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ updates:
dependency-type: production
update-types:
- "patch"
versioning-strategy: increase-if-necessary
- package-ecosystem: pip
directory: "/.github/workflows"
schedule:
Expand Down
2 changes: 1 addition & 1 deletion docs/implementation/catalog_metadata.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Catalog Metadata

The SDK automatically generates catalog metadata during catalog discovery. Selection rules overrided by a user will be respected.
The SDK automatically generates catalog metadata during catalog discovery. Selection rules overridden by a user will be respected.

Primary key properties may not be deselected, as these are required for `key_properties` to be declared in stream messages.

Expand Down
795 changes: 388 additions & 407 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,6 @@ docstring-code-format = true
[tool.ruff.lint]
explicit-preview-rules = false
ignore = [
"ANN101", # Missing type annotation for `self` in method
"ANN102", # Missing type annotation for `cls` in class method
"N818", # Exception name should be named with an Error suffix
"COM812", # missing-trailing-comma
"ISC001", # single-line-implicit-string-concatenation
Expand Down Expand Up @@ -362,7 +360,7 @@ select = [
"SLF", # flake8-self
"SIM", # flake8-simplify
"TID", # flake8-tidy-imports
"TCH", # flake8-type-checking
"TC", # flake8-type-checking
"ARG", # flake8-unused-arguments
"PTH", # flake8-use-pathlib
"ERA", # eradicate
Expand Down
1 change: 1 addition & 0 deletions singer_sdk/_singerlib/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class StreamMetadata(Metadata):
"""Stream metadata."""

table_key_properties: t.Sequence[str] | None = None
replication_key: str | None = None
forced_replication_method: str | None = None
valid_replication_keys: list[str] | None = None
schema_name: str | None = None
Expand Down
22 changes: 18 additions & 4 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,15 @@ class JSONSchemaToSQL:
.. versionadded:: 0.42.0
"""

def __init__(self) -> None:
"""Initialize the mapper with default type mappings."""
def __init__(self, *, max_varchar_length: int | None = None) -> None:
"""Initialize the mapper with default type mappings.
Args:
max_varchar_length: The absolute maximum length for VARCHAR columns that
the database supports.
"""
self._max_varchar_length = max_varchar_length

# Default type mappings
self._type_mapping: dict[str, JSONtoSQLHandler] = {
"string": self._handle_string_type,
Expand Down Expand Up @@ -311,7 +318,7 @@ def handle_multiple_types(self, types: t.Sequence[str]) -> sa.types.TypeEngine:
"""
return sa.types.VARCHAR()

def handle_raw_string(self, schema: dict) -> sa.types.TypeEngine: # noqa: PLR6301
def handle_raw_string(self, schema: dict) -> sa.types.TypeEngine:
"""Handle a string type generically.
Args:
Expand All @@ -321,6 +328,10 @@ def handle_raw_string(self, schema: dict) -> sa.types.TypeEngine: # noqa: PLR63
Appropriate SQLAlchemy type.
"""
max_length: int | None = schema.get("maxLength")

if max_length and self._max_varchar_length:
max_length = min(max_length, self._max_varchar_length)

return sa.types.VARCHAR(max_length)

def _get_type_from_schema(self, schema: dict) -> sa.types.TypeEngine | None:
Expand Down Expand Up @@ -439,6 +450,9 @@ class SQLConnector: # noqa: PLR0904
allow_temp_tables: bool = True # Whether temp tables are supported.
_cached_engine: Engine | None = None

#: The absolute maximum length for VARCHAR columns that the database supports.
max_varchar_length: int | None = None

def __init__(
self,
config: dict | None = None,
Expand Down Expand Up @@ -489,7 +503,7 @@ def jsonschema_to_sql(self) -> JSONSchemaToSQL:
.. versionadded:: 0.42.0
"""
return JSONSchemaToSQL()
return JSONSchemaToSQL(max_varchar_length=self.max_varchar_length)

@contextmanager
def _connect(self) -> t.Iterator[sa.engine.Connection]:
Expand Down
25 changes: 21 additions & 4 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
if t.TYPE_CHECKING:
import logging

from singer_sdk._singerlib.catalog import StreamMetadata
from singer_sdk.helpers import types
from singer_sdk.helpers._compat import Traversable
from singer_sdk.tap_base import Tap
Expand Down Expand Up @@ -1277,10 +1278,26 @@ def apply_catalog(self, catalog: singer.Catalog) -> None:

catalog_entry = catalog.get_stream(self.name)
if catalog_entry:
self.primary_keys = catalog_entry.key_properties
self.replication_key = catalog_entry.replication_key
if catalog_entry.replication_method:
self.forced_replication_method = catalog_entry.replication_method
stream_metadata: StreamMetadata | None
if stream_metadata := catalog_entry.metadata.get(()): # type: ignore[assignment]
table_key_properties = stream_metadata.table_key_properties
table_replication_key = stream_metadata.replication_key
table_replication_method = stream_metadata.forced_replication_method
else:
table_key_properties = None
table_replication_key = None
table_replication_method = None

self.primary_keys = catalog_entry.key_properties or table_key_properties
self.replication_key = (
catalog_entry.replication_key or table_replication_key
)

replication_method = (
catalog_entry.replication_method or table_replication_method
)
if replication_method:
self.forced_replication_method = replication_method

def _get_state_partition_context(
self,
Expand Down
32 changes: 24 additions & 8 deletions tests/core/test_connector_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ def my_type_to_jsonschema(self, column_type) -> dict: # noqa: ARG002
class TestJSONSchemaToSQL: # noqa: PLR0904
@pytest.fixture
def json_schema_to_sql(self) -> JSONSchemaToSQL:
return JSONSchemaToSQL()
return JSONSchemaToSQL(max_varchar_length=65_535)

def test_register_jsonschema_type_handler(
self,
Expand Down Expand Up @@ -509,14 +509,30 @@ def test_string(self, json_schema_to_sql: JSONSchemaToSQL):
assert isinstance(result, sa.types.VARCHAR)
assert result.length is None

def test_string_max_length(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"type": ["string", "null"], "maxLength": 10}
@pytest.mark.parametrize(
"jsonschema_type,expected_length",
[
pytest.param(
{"type": ["string", "null"], "maxLength": 10},
10,
id="max-length",
),
pytest.param(
{"type": ["string", "null"], "maxLength": 1_000_000},
65_535,
id="max-length-clamped",
),
],
)
def test_string_max_length(
self,
json_schema_to_sql: JSONSchemaToSQL,
jsonschema_type: dict,
expected_length: int,
):
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(
json_schema_to_sql.to_sql_type(jsonschema_type),
sa.types.VARCHAR,
)
assert result.length == 10
assert isinstance(result, sa.types.VARCHAR)
assert result.length == expected_length

def test_integer(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"type": ["integer", "null"]}
Expand Down
37 changes: 37 additions & 0 deletions tests/core/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,43 @@ def test_stream_apply_catalog(stream: Stream):
assert stream.forced_replication_method == REPLICATION_FULL_TABLE


def test_stream_apply_catalog__singer_standard(stream: Stream):
"""Applying a catalog to a stream should overwrite fields."""
assert stream.primary_keys == []
assert stream.replication_key == "updatedAt"
assert stream.replication_method == REPLICATION_INCREMENTAL
assert stream.forced_replication_method is None

stream.apply_catalog(
catalog=Catalog.from_dict(
{
"streams": [
{
"tap_stream_id": stream.name,
"stream": stream.name,
"schema": stream.schema,
"metadata": [
{
"breadcrumb": [],
"metadata": {
"table-key-properties": ["id"],
"replication-key": "newReplicationKey",
"forced-replication-method": REPLICATION_FULL_TABLE,
},
},
],
},
],
},
),
)

assert stream.primary_keys == ["id"]
assert stream.replication_key == "newReplicationKey"
assert stream.replication_method == REPLICATION_FULL_TABLE
assert stream.forced_replication_method == REPLICATION_FULL_TABLE


@pytest.mark.parametrize(
"stream_name,forced_replication_method,bookmark_value,expected_starting_value",
[
Expand Down

0 comments on commit 59a8d34

Please sign in to comment.