Skip to content

Commit

Permalink
Use Psycopg3 COPY
Browse files Browse the repository at this point in the history
  • Loading branch information
SpaceCondor committed Oct 1, 2024
1 parent fb2888f commit 2354f39
Show file tree
Hide file tree
Showing 9 changed files with 488 additions and 348 deletions.
56 changes: 28 additions & 28 deletions README.md

Large diffs are not rendered by default.

672 changes: 383 additions & 289 deletions poetry.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ packages = [

[tool.poetry.dependencies]
python = ">=3.8"
faker = {version = "~=30.0", optional = true}
psycopg2-binary = "2.9.9"
faker = {version = "~=29.0", optional = true}
sqlalchemy = "~=2.0"
sshtunnel = "0.4.0"
psycopg = "^3.2.3"
psycopg-binary = "^3.2.3"

[tool.poetry.dependencies.singer-sdk]
version = "~=0.40.0a1"
Expand Down Expand Up @@ -109,4 +110,4 @@ banned-from = ["sqlalchemy"]
sqlalchemy = "sa"

[tool.ruff.lint.pydocstyle]
convention = "google"
convention = "google"
69 changes: 54 additions & 15 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,25 @@ def generate_temp_table_name(self):
# in postgres, used a guid just in case we are using the same session
return f"{str(uuid.uuid4()).replace('-', '_')}"

def generate_copy_statement(
    self,
    full_table_name: str | FullyQualifiedName,
    columns: list[sa.Column],  # type: ignore[override]
) -> str:
    """Generate a copy statement for bulk copy.

    The table name and every column name are emitted as double-quoted
    identifiers. Any double quote embedded in a name is doubled, which is
    how a quoted identifier represents a literal quote, so unusual names
    cannot terminate the identifier early and corrupt the statement.

    Args:
        full_table_name: the target table name. It is quoted as a single
            identifier (NOTE(review): a schema-qualified "schema.table"
            value would therefore be treated as one name — confirm callers
            only pass the bare table name).
        columns: the target table columns; only each column's ``name`` is
            used.

    Returns:
        A copy statement of the form
        ``COPY "<table>" ("<col>", ...) FROM STDIN``.
    """

    def quote_identifier(identifier: object) -> str:
        # Double embedded quotes so the identifier stays a single token.
        escaped = str(identifier).replace('"', '""')
        return f'"{escaped}"'

    columns_list = ", ".join(quote_identifier(column.name) for column in columns)
    return f"COPY {quote_identifier(full_table_name)} ({columns_list}) FROM STDIN"

def bulk_insert_records( # type: ignore[override]
self,
table: sa.Table,
Expand All @@ -145,35 +164,55 @@ def bulk_insert_records( # type: ignore[override]
True if table exists, False if not, None if unsure or undetectable.
"""
columns = self.column_representation(schema)
insert: str = t.cast(
str,
self.generate_insert_statement(
table.name,
columns,
),
)
self.logger.info("Inserting with SQL: %s", insert)
# Only one record per PK, we want to take the last one
data_to_insert: list[dict[str, t.Any]] = []
copy_statement: str = self.generate_copy_statement(table.name, columns)
self.logger.info("Inserting with SQL: %s", copy_statement)

data_to_copy: list[dict[str, t.Any]] = []

# If append_only is False, keep only the latest record for each primary key
if self.append_only is False:
insert_records: dict[tuple, dict] = {} # pk tuple: record
unique_copy_records: dict[tuple, dict] = {} # pk tuple: values
for record in records:
insert_record = {
column.name: record.get(column.name) for column in columns
}
# No need to check for a KeyError here because the SDK already
# guarantees that all key properties exist in the record.
primary_key_tuple = tuple(record[key] for key in primary_keys)
insert_records[primary_key_tuple] = insert_record
data_to_insert = list(insert_records.values())
unique_copy_records[primary_key_tuple] = insert_record
data_to_copy = list(unique_copy_records.values())
else:
for record in records:
insert_record = {
column.name: record.get(column.name) for column in columns
}
data_to_insert.append(insert_record)
connection.execute(insert, data_to_insert)
data_to_copy.append(insert_record)

# Run each value through its column's bind_processor (when the type defines one)
# so it is converted the same way SQLAlchemy would convert a bound parameter;
# the processed rows are then handed to copy.write_row, so no CSV is built here.
column_bind_processors = {
column.name: column.type.bind_processor(connection.dialect)
for column in columns
}

# Use copy to run the copy statement.
# https://www.psycopg.org/psycopg3/docs/basic/copy.html
with connection.connection.cursor().copy(copy_statement) as copy: # type: ignore[attr-defined]
for row in data_to_copy:
processed_row = []
for row_column_name in row:
if column_bind_processors[row_column_name] is not None:
processed_row.append(
column_bind_processors[row_column_name](
row[row_column_name]
)
)
else:
processed_row.append(row[row_column_name])

copy.write_row(processed_row)

return True

def upsert(
Expand Down
2 changes: 1 addition & 1 deletion target_postgres/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def __init__(
th.Property(
"dialect+driver",
th.StringType,
default="postgresql+psycopg2",
default="postgresql+psycopg",
description=(
"Dialect+driver see "
+ "https://docs.sqlalchemy.org/en/20/core/engines.html. "
Expand Down
6 changes: 3 additions & 3 deletions target_postgres/tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

def postgres_config():
return {
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"user": "postgres",
"password": "postgres",
Expand All @@ -29,7 +29,7 @@ def postgres_config():

def postgres_config_no_ssl():
return {
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"user": "postgres",
"password": "postgres",
Expand All @@ -43,7 +43,7 @@ def postgres_config_no_ssl():

def postgres_config_ssh_tunnel():
return {
"sqlalchemy_url": "postgresql://postgres:[email protected]:5432/main",
"sqlalchemy_url": "postgresql+psycopg://postgres:[email protected]:5432/main",
"ssh_tunnel": {
"enable": True,
"host": "127.0.0.1",
Expand Down
8 changes: 3 additions & 5 deletions target_postgres/tests/test_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,18 @@
TargetCamelcaseTest,
TargetCliPrintsTest,
TargetDuplicateRecords,
TargetEncodedStringData,
TargetInvalidSchemaTest,
TargetMultipleStateMessages,
TargetNoPrimaryKeys,
TargetOptionalAttributes,
TargetRecordBeforeSchemaTest,
TargetRecordMissingKeyProperty,
TargetRecordMissingOptionalFields,
TargetRecordMissingRequiredProperty,
TargetSchemaNoProperties,
TargetSchemaUpdates,
TargetSpecialCharsInAttributes,
)

from target_postgres.target import TargetPostgres

from .core import create_engine, postgres_config

target_tests = TestSuite(
Expand Down Expand Up @@ -62,7 +58,9 @@ class BasePostgresSDKTests:
@pytest.fixture()
def connection(self, runner):
engine = create_engine(runner)
return engine.connect()
with engine.connect() as conn:
yield conn
engine.dispose()


SDKTests = get_target_test_class(
Expand Down
14 changes: 11 additions & 3 deletions target_postgres/tests/test_target_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def verify_data(
sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}")
)
assert result.first()[0] == number_of_rows
engine.dispose()


def test_sqlalchemy_url_config(postgres_config_no_ssl):
Expand All @@ -157,7 +158,7 @@ def test_sqlalchemy_url_config(postgres_config_no_ssl):
port = postgres_config_no_ssl["port"]

config = {
"sqlalchemy_url": f"postgresql://{user}:{password}@{host}:{port}/{database}"
"sqlalchemy_url": f"postgresql+psycopg://{user}:{password}@{host}:{port}/{database}"
}
tap = SampleTapCountries(config={}, state=None)
target = TargetPostgres(config=config)
Expand All @@ -167,7 +168,7 @@ def test_sqlalchemy_url_config(postgres_config_no_ssl):
def test_port_default_config():
"""Test that the default config is passed into the engine when the config doesn't provide it"""
config = {
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"user": "postgres",
"password": "postgres",
Expand All @@ -186,12 +187,13 @@ def test_port_default_config():
engine.url.render_as_string(hide_password=False)
== f"{dialect_driver}://{user}:{password}@{host}:5432/{database}"
)
engine.dispose()


def test_port_config():
"""Test that the port config works"""
config = {
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"user": "postgres",
"password": "postgres",
Expand All @@ -211,6 +213,7 @@ def test_port_config():
engine.url.render_as_string(hide_password=False)
== f"{dialect_driver}://{user}:{password}@{host}:5433/{database}"
)
engine.dispose()


# Test name would work well
Expand Down Expand Up @@ -402,6 +405,7 @@ def test_no_primary_keys(postgres_target):
singer_file_to_target(file_name, postgres_target)

verify_data(postgres_target, table_name, 16)
engine.dispose()


def test_no_type(postgres_target):
Expand Down Expand Up @@ -511,6 +515,7 @@ def test_anyof(postgres_target):
# {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]}
if column.name == "legacy_id":
assert isinstance(column.type, TEXT)
engine.dispose()


def test_new_array_column(postgres_target):
Expand Down Expand Up @@ -621,6 +626,7 @@ def test_activate_version_hard_delete(postgres_config_no_ssl):
assert result.rowcount == 9

singer_file_to_target(file_name, pg_hard_delete_true)
engine.dispose()

# Should remove the 2 records we added manually
with engine.connect() as connection:
Expand Down Expand Up @@ -692,6 +698,7 @@ def test_activate_version_soft_delete(postgres_config_no_ssl):
# South America row should not have been modified, but it would have been prior
# to the fix mentioned in #204 and implemented in #240.
assert south_america == result.first()._asdict()
engine.dispose()


def test_activate_version_no_metadata(postgres_config_no_ssl):
Expand Down Expand Up @@ -742,6 +749,7 @@ def test_activate_version_deletes_data_properly(postgres_target):
with engine.connect() as connection:
result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}"))
assert result.rowcount == 0
engine.dispose()


def test_reserved_keywords(postgres_target):
Expand Down
2 changes: 1 addition & 1 deletion target_postgres/tests/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def connector():
"""Create a PostgresConnector instance."""
return PostgresConnector(
config={
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"port": "5432",
"user": "postgres",
Expand Down

0 comments on commit 2354f39

Please sign in to comment.