diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index f2f9e42..79f63b7 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -119,6 +119,25 @@ def generate_temp_table_name(self): # in postgres, used a guid just in case we are using the same session return f"{str(uuid.uuid4()).replace('-', '_')}" + def generate_copy_statement( + self, + full_table_name: str | FullyQualifiedName, + columns: list[sa.Column], # type: ignore[override] + ) -> str: + """Generate a copy statement for bulk copy. + + Args: + full_table_name: the target table name. + columns: the target table columns. + + Returns: + A copy statement. + """ + columns_list = ", ".join(f'"{column.name}"' for column in columns) + sql: str = f'COPY "{full_table_name}" ({columns_list}) FROM STDIN' + + return sql + def bulk_insert_records( # type: ignore[override] self, table: sa.Table, @@ -145,19 +164,14 @@ def bulk_insert_records( # type: ignore[override] True if table exists, False if not, None if unsure or undetectable. """ columns = self.column_representation(schema) - insert: str = t.cast( - str, - self.generate_insert_statement( - table.name, - columns, - ), - ) - self.logger.info("Inserting with SQL: %s", insert) - # Only one record per PK, we want to take the last one - data_to_insert: list[dict[str, t.Any]] = [] + copy_statement: str = self.generate_copy_statement(table.name, columns) + self.logger.info("Inserting with SQL: %s", copy_statement) + data_to_copy: list[dict[str, t.Any]] = [] + + # If append only is False, we only take the latest record one per primary key if self.append_only is False: - insert_records: dict[tuple, dict] = {} # pk tuple: record + unique_copy_records: dict[tuple, dict] = {} # pk tuple: values for record in records: insert_record = { column.name: record.get(column.name) for column in columns @@ -165,15 +179,40 @@ def bulk_insert_records( # type: ignore[override] # No need to check for a KeyError here because the SDK already # guarantees that all key properties exist in the record. primary_key_tuple = tuple(record[key] for key in primary_keys) - insert_records[primary_key_tuple] = insert_record - data_to_insert = list(insert_records.values()) + unique_copy_records[primary_key_tuple] = insert_record + data_to_copy = list(unique_copy_records.values()) else: for record in records: insert_record = { column.name: record.get(column.name) for column in columns } - data_to_insert.append(insert_record) - connection.execute(insert, data_to_insert) + data_to_copy.append(insert_record) + + # Prepare to process the rows into csv. Use each column's bind_processor to do + # most of the work, then do the final construction of the csv rows ourselves + # to control exactly how values are converted and which ones are quoted. + column_bind_processors = { + column.name: column.type.bind_processor(connection.dialect) + for column in columns + } + + # Use copy to run the copy statement. + # https://www.psycopg.org/psycopg3/docs/basic/copy.html + with connection.connection.cursor().copy(copy_statement) as copy: # type: ignore[attr-defined] + for row in data_to_copy: + processed_row = [] + for row_column_name in row: + if column_bind_processors[row_column_name] is not None: + processed_row.append( + column_bind_processors[row_column_name]( + row[row_column_name] + ) + ) + else: + processed_row.append(row[row_column_name]) + + copy.write_row(processed_row) + return True def upsert( diff --git a/target_postgres/tests/test_sdk.py b/target_postgres/tests/test_sdk.py index 34e17da..07ceb49 100644 --- a/target_postgres/tests/test_sdk.py +++ b/target_postgres/tests/test_sdk.py @@ -10,22 +10,18 @@ TargetCamelcaseTest, TargetCliPrintsTest, TargetDuplicateRecords, - TargetEncodedStringData, TargetInvalidSchemaTest, - TargetMultipleStateMessages, TargetNoPrimaryKeys, TargetOptionalAttributes, TargetRecordBeforeSchemaTest, TargetRecordMissingKeyProperty, TargetRecordMissingOptionalFields, - TargetRecordMissingRequiredProperty, TargetSchemaNoProperties, TargetSchemaUpdates, TargetSpecialCharsInAttributes, ) from target_postgres.target import TargetPostgres - from .core import create_engine, postgres_config target_tests = TestSuite( diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index 8e4a79e..ebeccce 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -158,7 +158,7 @@ def test_sqlalchemy_url_config(postgres_config_no_ssl): port = postgres_config_no_ssl["port"] config = { - "sqlalchemy_url": f"postgresql://{user}:{password}@{host}:{port}/{database}" + "sqlalchemy_url": f"postgresql+psycopg://{user}:{password}@{host}:{port}/{database}" } tap = SampleTapCountries(config={}, state=None) target = TargetPostgres(config=config)