Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use Psycopg3 COPY #451

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 54 additions & 15 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,25 @@ def generate_temp_table_name(self):
# in postgres, used a guid just in case we are using the same session
return f"{str(uuid.uuid4()).replace('-', '_')}"

def generate_copy_statement(
self,
full_table_name: str | FullyQualifiedName,
columns: list[sa.Column], # type: ignore[override]
) -> str:
"""Generate a copy statement for bulk copy.

Args:
full_table_name: the target table name.
columns: the target table columns.

Returns:
A copy statement.
"""
columns_list = ", ".join(f'"{column.name}"' for column in columns)
sql: str = f'COPY "{full_table_name}" ({columns_list}) FROM STDIN'

return sql

def bulk_insert_records( # type: ignore[override]
self,
table: sa.Table,
Expand All @@ -145,35 +164,55 @@ def bulk_insert_records( # type: ignore[override]
True if table exists, False if not, None if unsure or undetectable.
"""
columns = self.column_representation(schema)
insert: str = t.cast(
str,
self.generate_insert_statement(
table.name,
columns,
),
)
self.logger.info("Inserting with SQL: %s", insert)
# Only one record per PK, we want to take the last one
data_to_insert: list[dict[str, t.Any]] = []
copy_statement: str = self.generate_copy_statement(table.name, columns)
self.logger.info("Inserting with SQL: %s", copy_statement)

data_to_copy: list[dict[str, t.Any]] = []

# If append only is False, we only take the latest record one per primary key
if self.append_only is False:
insert_records: dict[tuple, dict] = {} # pk tuple: record
unique_copy_records: dict[tuple, dict] = {} # pk tuple: values
for record in records:
insert_record = {
column.name: record.get(column.name) for column in columns
}
# No need to check for a KeyError here because the SDK already
# guarantees that all key properties exist in the record.
primary_key_tuple = tuple(record[key] for key in primary_keys)
insert_records[primary_key_tuple] = insert_record
data_to_insert = list(insert_records.values())
unique_copy_records[primary_key_tuple] = insert_record
data_to_copy = list(unique_copy_records.values())
else:
for record in records:
insert_record = {
column.name: record.get(column.name) for column in columns
}
data_to_insert.append(insert_record)
connection.execute(insert, data_to_insert)
data_to_copy.append(insert_record)

# Prepare to process the rows into csv. Use each column's bind_processor to do
# most of the work, then do the final construction of the csv rows ourselves
# to control exactly how values are converted and which ones are quoted.
column_bind_processors = {
column.name: column.type.bind_processor(connection.dialect)
for column in columns
}

# Use copy to run the copy statement.
# https://www.psycopg.org/psycopg3/docs/basic/copy.html
with connection.connection.cursor().copy(copy_statement) as copy: # type: ignore[attr-defined]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens at this point if someone sets postgresql+psycopg2 for dialect+driver?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edgarrmondragon It would raise an exception. In the current main branch I don't think using anything aside from postgresql+psycopg2 would work anyway, so this being configurable doesn't add much.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure, but I don't think we use driver-specific APIs and rely on SQLAlchemy DDL/DML in all places, so I would expect most drivers to work. Maybe I'm wrong.

for row in data_to_copy:
processed_row = []
for row_column_name in row:
if column_bind_processors[row_column_name] is not None:
processed_row.append(
column_bind_processors[row_column_name](
row[row_column_name]
)
)
else:
processed_row.append(row[row_column_name])

copy.write_row(processed_row)

return True

def upsert(
Expand Down
4 changes: 0 additions & 4 deletions target_postgres/tests/test_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,18 @@
TargetCamelcaseTest,
TargetCliPrintsTest,
TargetDuplicateRecords,
TargetEncodedStringData,
TargetInvalidSchemaTest,
TargetMultipleStateMessages,
TargetNoPrimaryKeys,
TargetOptionalAttributes,
TargetRecordBeforeSchemaTest,
TargetRecordMissingKeyProperty,
TargetRecordMissingOptionalFields,
TargetRecordMissingRequiredProperty,
TargetSchemaNoProperties,
TargetSchemaUpdates,
TargetSpecialCharsInAttributes,
)

from target_postgres.target import TargetPostgres

from .core import create_engine, postgres_config

target_tests = TestSuite(
Expand Down
2 changes: 1 addition & 1 deletion target_postgres/tests/test_target_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def test_sqlalchemy_url_config(postgres_config_no_ssl):
port = postgres_config_no_ssl["port"]

config = {
"sqlalchemy_url": f"postgresql://{user}:{password}@{host}:{port}/{database}"
"sqlalchemy_url": f"postgresql+psycopg://{user}:{password}@{host}:{port}/{database}"
}
tap = SampleTapCountries(config={}, state=None)
target = TargetPostgres(config=config)
Expand Down