Add support for container types ARRAY, OBJECT, and FLOAT_VECTOR
amotl committed Dec 15, 2023
1 parent b8e5da3 commit 59f7b71
Showing 9 changed files with 328 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -65,7 +65,7 @@ jobs:
pip install "setuptools>=64" --upgrade
# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[test,develop]
pip install --use-pep517 --prefer-binary --editable=.[all,develop,test]
- name: Run linter and software tests
run: |
1 change: 1 addition & 0 deletions CHANGES.md
@@ -1,6 +1,7 @@
# Changelog for Meltano/Singer Target for CrateDB

## In progress
- Add support for container types `ARRAY`, `OBJECT`, and `FLOAT_VECTOR`.

## 2023-12-08 v0.0.1
- Make it work. It can run the canonical Meltano GitHub -> DB example.
8 changes: 6 additions & 2 deletions pyproject.toml
@@ -94,10 +94,11 @@ dynamic = [
dependencies = [
"crate[sqlalchemy]",
"cratedb-toolkit",
'importlib-resources; python_version < "3.9"',
"meltanolabs-target-postgres==0.0.9",
'importlib-resources; python_version < "3.9"', # "meltanolabs-target-postgres==0.0.9",
"meltanolabs-target-postgres@ git+https://github.com/singer-contrib/meltanolabs-target-postgres.git@pgvector",
]
[project.optional-dependencies]
all = ["meltano-target-cratedb[vector]"]
develop = [
"black<24",
"mypy==1.7.1",
@@ -115,6 +116,9 @@ test = [
"pytest-cov<5",
"pytest-mock<4",
]
vector = [
"numpy",
]
[project.urls]
changelog = "https://github.com/crate-workbench/meltano-target-cratedb/blob/main/CHANGES.md"
documentation = "https://github.com/crate-workbench/meltano-target-cratedb"
2 changes: 1 addition & 1 deletion target_cratedb/__init__.py
@@ -1,4 +1,4 @@
"""Init CrateDB."""
from target_cratedb.patch import patch_sqlalchemy
from target_cratedb.sqlalchemy.patch import patch_sqlalchemy

patch_sqlalchemy()
74 changes: 70 additions & 4 deletions target_cratedb/connector.py
@@ -6,14 +6,18 @@
from datetime import datetime

import sqlalchemy
import sqlalchemy as sa
from crate.client.sqlalchemy.types import ObjectType, ObjectTypeImpl, _ObjectArray
from singer_sdk import typing as th
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT
from singer_sdk.helpers._typing import is_array_type, is_boolean_type, is_integer_type, is_number_type, is_object_type
from sqlalchemy.types import (
ARRAY,
BIGINT,
BOOLEAN,
DATE,
DATETIME,
DECIMAL,
FLOAT,
INTEGER,
TEXT,
TIME,
@@ -22,7 +26,7 @@
)
from target_postgres.connector import NOTYPE, PostgresConnector

from target_cratedb.patch import polyfill_refresh_after_dml_engine
from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine


class CrateDBConnector(PostgresConnector):
@@ -111,8 +115,54 @@ def pick_individual_type(jsonschema_type: dict):
if "object" in jsonschema_type["type"]:
return ObjectType
if "array" in jsonschema_type["type"]:
# TODO: Handle other inner-types as well?
# Select between different kinds of `ARRAY` data types.
#
# This currently leverages an unspecified definition for the Singer SCHEMA,
# using the `additionalProperties` attribute to convey additional type
# information, agnostic of the target database.
#
# In this case, it is about telling different kinds of `ARRAY` types apart:
# Either it is a vanilla `ARRAY`, to be stored into a `jsonb[]` type, or,
# alternatively, it can be a "vector" kind `ARRAY` of floating point
# numbers, effectively what pgvector is storing in its `VECTOR` type.
#
# Still, `type: "vector"` is only a surrogate label here, because other
# database systems may use different types for implementing the same thing,
# and need to translate accordingly.
"""
Schema override rule in `meltano.yml`:
type: "array"
items:
type: "number"
additionalProperties:
storage:
type: "vector"
dim: 4
Produced schema annotation in `catalog.json`:
{"type": "array",
"items": {"type": "number"},
"additionalProperties": {"storage": {"type": "vector", "dim": 4}}}
"""
if "additionalProperties" in jsonschema_type and "storage" in jsonschema_type["additionalProperties"]:
storage_properties = jsonschema_type["additionalProperties"]["storage"]
if "type" in storage_properties and storage_properties["type"] == "vector":
# On PostgreSQL/pgvector, use the corresponding type definition
# from its SQLAlchemy dialect.
from target_cratedb.sqlalchemy.vector import FloatVector

return FloatVector(storage_properties["dim"])

# Discover/translate inner types.
inner_type = resolve_array_inner_type(jsonschema_type)
if inner_type is not None:
return ARRAY(inner_type)

# When type discovery fails, assume `TEXT`.
return ARRAY(TEXT())

if jsonschema_type.get("format") == "date-time":
return TIMESTAMP()
individual_type = th.to_sql_type(jsonschema_type)
@@ -139,6 +189,7 @@ def pick_best_sql_type(sql_type_array: list):
DATE,
TIME,
DECIMAL,
FLOAT,
BIGINT,
INTEGER,
BOOLEAN,
@@ -151,7 +202,7 @@ def pick_best_sql_type(sql_type_array: list):
for obj in sql_type_array:
# FIXME: Workaround. Currently, ObjectType can not be resolved back to a type?
# TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
if isinstance(sql_type, ObjectTypeImpl):
if isinstance(obj, ObjectTypeImpl):
return ObjectType
if isinstance(obj, sql_type):
return obj
@@ -245,3 +296,18 @@ def prepare_schema(self, schema_name: str) -> None:
Don't emit `CREATE SCHEMA` statements to CrateDB.
"""
pass


def resolve_array_inner_type(jsonschema_type: dict) -> t.Union[sa.types.TypeEngine, None]:
if "items" in jsonschema_type:
if is_boolean_type(jsonschema_type["items"]):
return BOOLEAN()
if is_number_type(jsonschema_type["items"]):
return FLOAT()
if is_integer_type(jsonschema_type["items"]):
return BIGINT()
if is_object_type(jsonschema_type["items"]):
return ObjectType()
if is_array_type(jsonschema_type["items"]):
return resolve_array_inner_type(jsonschema_type["items"]["type"])
return None
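
A minimal sketch (not part of the commit) of how the new type resolution in connector.py maps Singer schema fragments, assuming the package from this commit is installed and that `pick_individual_type` remains a static method as it is in upstream target-postgres; the sample schemas mirror the docstring above.

# Hypothetical sketch, not part of the commit. Assumes the package from this
# commit is installed and that `pick_individual_type` is a static method,
# as in upstream target-postgres.
from target_cratedb.connector import CrateDBConnector, resolve_array_inner_type

# A vanilla array of numbers resolves its inner type to FLOAT,
# so the column becomes ARRAY(FLOAT).
plain_array = {"type": ["array"], "items": {"type": "number"}}
print(resolve_array_inner_type(plain_array))  # expected: an SQLAlchemy FLOAT instance

# The `additionalProperties.storage` annotation with `type: "vector"` selects
# the FLOAT_VECTOR type instead, dimensioned via `dim` (mirrors the docstring above).
vector_array = {
    "type": ["array"],
    "items": {"type": "number"},
    "additionalProperties": {"storage": {"type": "vector", "dim": 4}},
}
print(CrateDBConnector.pick_individual_type(vector_array))  # expected: FLOAT_VECTOR(4)
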
Empty file.
38 changes: 35 additions & 3 deletions target_cratedb/patch.py → target_cratedb/sqlalchemy/patch.py
@@ -1,21 +1,36 @@
from datetime import datetime

import sqlalchemy as sa
from crate.client.sqlalchemy.dialect import TYPES_MAP, DateTime
from _decimal import Decimal
from crate.client.http import CrateJsonEncoder
from crate.client.sqlalchemy.dialect import ARRAY, TYPES_MAP, DateTime
from crate.client.sqlalchemy.types import _ObjectArray
from sqlalchemy.sql import sqltypes


def patch_sqlalchemy():
patch_types()
patch_json_encoder()


def patch_types():
"""
Register missing timestamp data type.
Register missing data types, and fix erroneous ones.
TODO: Upstream to crate-python.
"""
# TODO: Submit patch to `crate-python`.
TYPES_MAP["bigint"] = sqltypes.BIGINT
TYPES_MAP["bigint_array"] = ARRAY(sqltypes.BIGINT)
TYPES_MAP["long"] = sqltypes.BIGINT
TYPES_MAP["long_array"] = ARRAY(sqltypes.BIGINT)
TYPES_MAP["real"] = sqltypes.DOUBLE
TYPES_MAP["real_array"] = ARRAY(sqltypes.DOUBLE)
TYPES_MAP["timestamp without time zone"] = sqltypes.TIMESTAMP
TYPES_MAP["timestamp with time zone"] = sqltypes.TIMESTAMP

# TODO: Can `ARRAY` be inherited from PostgreSQL's
# `ARRAY`, to make type checking work?

def as_generic(self):
return sqltypes.ARRAY

@@ -36,6 +51,23 @@ def process(value):
DateTime.bind_processor = bind_processor


def patch_json_encoder():
"""
`Decimal` types have been rendered as strings.
TODO: Upstream to crate-python.
"""

json_encoder_default = CrateJsonEncoder.default

def default(self, o):
if isinstance(o, Decimal):
return float(o)
return json_encoder_default(o)

CrateJsonEncoder.default = default


def polyfill_refresh_after_dml_engine(engine: sa.Engine):
def receive_after_execute(
conn: sa.engine.Connection, clauseelement, multiparams, params, execution_options, result
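
A minimal sketch (not part of the commit) of the JSON encoder patch's effect, assuming crate-python's `CrateJsonEncoder` behaves as imported in the patch above.

# Hypothetical sketch, not part of the commit. Assumes crate-python's
# `CrateJsonEncoder` is importable as shown in the patch above.
from decimal import Decimal

from crate.client.http import CrateJsonEncoder
from target_cratedb.sqlalchemy.patch import patch_sqlalchemy

patch_sqlalchemy()

# Unpatched, Decimal values are serialized as JSON strings; with the patched
# `default()`, they are emitted as JSON numbers.
print(CrateJsonEncoder().encode({"total": Decimal("7.99")}))  # e.g. {"total": 7.99}
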
82 changes: 82 additions & 0 deletions target_cratedb/sqlalchemy/vector.py
@@ -0,0 +1,82 @@
# TODO: Refactor to CrateDB SQLAlchemy dialect.
import typing as t

import numpy as np
import numpy.typing as npt
import sqlalchemy as sa
from sqlalchemy.types import UserDefinedType

__all__ = ["FloatVector"]


def from_db(value: t.Iterable) -> t.Optional[npt.ArrayLike]:
# from `pgvector.utils`
# could be ndarray if already cast by lower-level driver
if value is None or isinstance(value, np.ndarray):
return value

return np.array(value, dtype=np.float32)


def to_db(value: t.Any, dim: t.Optional[int] = None) -> t.Optional[t.List]:
# from `pgvector.utils`
if value is None:
return value

if isinstance(value, np.ndarray):
if value.ndim != 1:
raise ValueError("expected ndim to be 1")

if not np.issubdtype(value.dtype, np.integer) and not np.issubdtype(value.dtype, np.floating):
raise ValueError("dtype must be numeric")

value = value.tolist()

if dim is not None and len(value) != dim:
raise ValueError("expected %d dimensions, not %d" % (dim, len(value)))

return value


class FloatVector(UserDefinedType):
"""
https://crate.io/docs/crate/reference/en/master/general/ddl/data-types.html#float-vector
https://crate.io/docs/crate/reference/en/master/general/builtins/scalar-functions.html#scalar-knn-match
"""

cache_ok = True

def __init__(self, dim: t.Optional[int] = None) -> None:
super(UserDefinedType, self).__init__()
self.dim = dim

def get_col_spec(self, **kw: t.Any) -> str:
if self.dim is None:
return "FLOAT_VECTOR"
return "FLOAT_VECTOR(%d)" % self.dim

def bind_processor(self, dialect: sa.Dialect) -> t.Callable:
def process(value: t.Iterable) -> t.Optional[t.List]:
return to_db(value, self.dim)

return process

def result_processor(self, dialect: sa.Dialect, coltype: t.Any) -> t.Callable:
def process(value: t.Any) -> t.Optional[npt.ArrayLike]:
return from_db(value)

return process

"""
CrateDB currently only supports similarity function `VectorSimilarityFunction.EUCLIDEAN`.
-- https://github.com/crate/crate/blob/1ca5c6dbb2/server/src/main/java/io/crate/types/FloatVectorType.java#L55
On the other hand, pgvector uses a comparator to apply different similarity functions as operators,
see `pgvector.sqlalchemy.Vector.comparator_factory`.
<->: l2/euclidean_distance
<#>: max_inner_product
<=>: cosine_distance
TODO: Discuss.
""" # noqa: E501
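
A hypothetical usage sketch for the new FloatVector type; the table, column names, and connection URL are illustrative only, and executing the commented statements would require a reachable CrateDB instance.

# Hypothetical usage sketch, not part of the commit. Table/column names and the
# connection URL are illustrative; executing the commented statements requires
# a running CrateDB server.
import sqlalchemy as sa

from target_cratedb.sqlalchemy.vector import FloatVector

metadata = sa.MetaData()
documents = sa.Table(
    "documents",
    metadata,
    sa.Column("id", sa.String, primary_key=True),
    sa.Column("embedding", FloatVector(4)),  # DDL renders as FLOAT_VECTOR(4)
)

engine = sa.create_engine("crate://localhost:4200")
# metadata.create_all(engine)
# with engine.begin() as conn:
#     conn.execute(documents.insert().values(id="a", embedding=[0.1, 0.2, 0.3, 0.4]))
#     # `knn_match(column, query_vector, k)` per the CrateDB scalar function
#     # referenced in the class docstring above (assumed syntax).
#     conn.execute(sa.text("SELECT id FROM documents WHERE knn_match(embedding, [0.1, 0.2, 0.3, 0.4], 2)"))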