diff --git a/pyproject.toml b/pyproject.toml index 2f11781c..b586f22a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,6 +97,8 @@ select = [ "RET", # flake8-return "SIM", # flake8-simplify "TCH", # flake8-type-checking + "ERA", # eradicate + "PGH", # pygrep-hooks "PL", # Pylint "PERF", # Perflint "RUF", # ruff diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 6ba09b9e..7a3fa3bf 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -183,10 +183,12 @@ def copy_table_structure( raise RuntimeError("Table already exists") columns = [column._copy() for column in from_table.columns] + if as_temp_table: new_table = sa.Table(table_name, meta, *columns, prefixes=["TEMPORARY"]) new_table.create(bind=connection) return new_table + new_table = sa.Table(table_name, meta, *columns) new_table.create(bind=connection) return new_table @@ -200,7 +202,7 @@ def drop_table(self, table: sa.Table, connection: sa.engine.Connection): """Drop table data.""" table.drop(bind=connection) - def clone_table( + def clone_table( # noqa: PLR0913 self, new_table_name, table, metadata, connection, temp_table ) -> sa.Table: """Clone a table.""" @@ -321,6 +323,7 @@ def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911 ): return HexByteString() individual_type = th.to_sql_type(jsonschema_type) + return TEXT() if isinstance(individual_type, VARCHAR) else individual_type @staticmethod @@ -412,7 +415,7 @@ def create_empty_table( # type: ignore[override] # noqa: PLR0913 new_table.create(bind=connection) return new_table - def prepare_column( + def prepare_column( # noqa: PLR0913 self, full_table_name: str | FullyQualifiedName, column_name: str, @@ -460,7 +463,7 @@ def prepare_column( column_object=column_object, ) - def _create_empty_column( # type: ignore[override] + def _create_empty_column( # type: ignore[override] # noqa: PLR0913 self, schema_name: str, table_name: str, diff --git a/target_postgres/sinks.py 
b/target_postgres/sinks.py index f52d6c78..fd3cbc6f 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -112,14 +112,14 @@ def process_batch(self, context: dict) -> None: def generate_temp_table_name(self): """Uuid temp table name.""" - # sa.exc.IdentifierError: Identifier + # sa.exc.IdentifierError: Identifier # noqa: ERA001 # 'temp_test_optional_attributes_388470e9_fbd0_47b7_a52f_d32a2ee3f5f6' # exceeds maximum length of 63 characters # Is hit if we have a long table name, there is no limit on Temporary tables # in postgres, used a guid just in case we are using the same session return f"{str(uuid.uuid4()).replace('-', '_')}" - def bulk_insert_records( # type: ignore[override] + def bulk_insert_records( # type: ignore[override] # noqa: PLR0913 self, table: sa.Table, schema: dict, @@ -176,7 +176,7 @@ def bulk_insert_records( # type: ignore[override] connection.execute(insert, data_to_insert) return True - def upsert( + def upsert( # noqa: PLR0913 self, from_table: sa.Table, to_table: sa.Table, @@ -293,7 +293,7 @@ def schema_name(self) -> str | None: Returns: The target schema name. """ - # Look for a default_target_scheme in the configuration fle + # Look for a default_target_schema in the configuration file default_target_schema: str = self.config.get("default_target_schema", None) parts = self.stream_name.split("-") diff --git a/target_postgres/target.py b/target_postgres/target.py index 2cc0b714..19e6fa21 100644 --- a/target_postgres/target.py +++ b/target_postgres/target.py @@ -178,7 +178,7 @@ def __init__( th.BooleanType, default=False, description=( - "When activate version is sent from a tap this specefies " + "When activate version is sent from a tap this specifies " + "if we should delete the records that don't match, or mark " + "them with a date in the `_sdc_deleted_at` column. This config " + "option is ignored if `activate_version` is set to false." 
diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index 7f474005..65b06611 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -78,11 +78,7 @@ def singer_file_to_target(file_name, target) -> None: def remove_metadata_columns(row: dict) -> dict: - new_row = {} - for column in row.keys(): - if not column.startswith("_sdc"): - new_row[column] = row[column] - return new_row + return {column: row[column] for column in row if not column.startswith("_sdc")} def verify_data( @@ -106,43 +102,43 @@ def verify_data( engine = create_engine(target) full_table_name = f"{target.config['default_target_schema']}.{table_name}" with engine.connect() as connection: - if primary_key is not None and check_data is not None: - if isinstance(check_data, dict): - result = connection.execute( - sqlalchemy.text( - f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" - ) - ) - assert result.rowcount == number_of_rows - result_dict = remove_metadata_columns(result.first()._asdict()) - assert result_dict == check_data - elif isinstance(check_data, list): - result = connection.execute( - sqlalchemy.text( - f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" - ) - ) - assert result.rowcount == number_of_rows - result_dict = [ - remove_metadata_columns(row._asdict()) for row in result.all() - ] - - # bytea columns are returned as memoryview objects - # we need to convert them to bytes to allow comparison with check_data - for row in result_dict: - for col in row: - if isinstance(row[col], memoryview): - row[col] = bytes(row[col]) - - assert result_dict == check_data - else: - raise ValueError("Invalid check_data - not dict or list of dicts") - else: + if primary_key is None or check_data is None: result = connection.execute( sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") ) assert result.first()[0] == number_of_rows + elif isinstance(check_data, dict): + 
result = connection.execute( + sqlalchemy.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) + ) + assert result.rowcount == number_of_rows + result_dict = remove_metadata_columns(result.first()._asdict()) + assert result_dict == check_data + elif isinstance(check_data, list): + result = connection.execute( + sqlalchemy.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) + ) + assert result.rowcount == number_of_rows + result_dict = [ + remove_metadata_columns(row) for row in result.mappings().all() + ] + + # bytea columns are returned as memoryview objects + # we need to convert them to bytes to allow comparison with check_data + for row in result_dict: + for col in row: + if isinstance(row[col], memoryview): + row[col] = bytes(row[col]) + + assert result_dict == check_data + else: + raise ValueError("Invalid check_data - not dict or list of dicts") + def test_sqlalchemy_url_config(postgres_config_no_ssl): """Be sure that passing a sqlalchemy_url works @@ -439,7 +435,7 @@ def test_encoded_string_data(postgres_target): https://www.postgresql.org/docs/current/functions-string.html#:~:text=chr(0)%20is%20disallowed%20because%20text%20data%20types%20cannot%20store%20that%20character. chr(0) is disallowed because text data types cannot store that character. - Note you will recieve a ValueError: A string literal cannot contain NUL (0x00) characters. Which seems like a reasonable error. + Note you will receive a ValueError: A string literal cannot contain NUL (0x00) characters. Which seems like a reasonable error. See issue https://github.com/MeltanoLabs/target-postgres/issues/60 for more details. """ @@ -499,18 +495,11 @@ def test_anyof(postgres_target): # Any of nullable array of strings or single string. # {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]} - if column.name == "parent_ids": - assert isinstance(column.type, ARRAY) - - # Any of nullable string. 
- # {"anyOf":[{"type":"string"},{"type":"null"}]} - if column.name == "commit_message": + if column.name in ["commit_message", "legacy_id"]: assert isinstance(column.type, TEXT) - # Any of nullable string or integer. - # {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]} - if column.name == "legacy_id": - assert isinstance(column.type, TEXT) + elif column.name == "parent_ids": + assert isinstance(column.type, ARRAY) def test_new_array_column(postgres_target): @@ -747,7 +736,7 @@ def test_activate_version_deletes_data_properly(postgres_target): def test_reserved_keywords(postgres_target): """Target should work regardless of column names - Postgres has a number of resereved keywords listed here https://www.postgresql.org/docs/current/sql-keywords-appendix.html. + Postgres has a number of reserved keywords listed here https://www.postgresql.org/docs/current/sql-keywords-appendix.html. """ file_name = "reserved_keywords.singer" singer_file_to_target(file_name, postgres_target)