From c833c26d8b58c807c4eedfd27f54bffa7152ac72 Mon Sep 17 00:00:00 2001
From: Anushka Singh
Date: Wed, 18 Oct 2023 09:07:53 -0400
Subject: [PATCH] feat: Handle Pre-filtering of tables (#811)

### Feature or Bugfix
- Feature

### Detail
- For a dataset to be coherent, all tables within it should have locations that point to the same place as the dataset's S3 bucket. However, a Glue database can contain tables that point to other buckets, which is perfectly legal in Lake Formation. We therefore propose that data.all automatically lists only the tables whose S3 location matches the dataset's bucket. This solves a problem for Yahoo, where we want to import a database that contains many tables pointing to different buckets. A short sketch of the filtering rule follows below.
- Additionally, the Catalog UI should also list only the pre-filtered tables.
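The sketch below illustrates the filtering rule only; the helper name and the sample pages are invented for this example, and the actual implementation is the list comprehension added to `list_glue_database_tables` in the diff further down.

```python
def filter_tables_by_dataset_bucket(pages, dataset_s3_bucket_name):
    """Keep only the Glue tables stored under the dataset's S3 bucket.

    `pages` is an iterable of Glue `get_tables` response pages (as yielded
    by the boto3 paginator); `dataset_s3_bucket_name` is the bucket name
    without the `s3://` scheme.
    """
    dataset_s3_bucket = f's3://{dataset_s3_bucket_name}/'
    return [
        table
        for page in pages
        for table in page['TableList']
        # Tables with no StorageDescriptor or no Location are dropped as well.
        if table.get('StorageDescriptor', {}).get('Location', '').startswith(dataset_s3_bucket)
    ]


# Example: only 'orders' points at the dataset bucket and survives the filter.
pages = [{'TableList': [
    {'Name': 'orders', 'StorageDescriptor': {'Location': 's3://my-dataset-bucket/orders/'}},
    {'Name': 'external', 'StorageDescriptor': {'Location': 's3://other-bucket/data/'}},
]}]
assert [t['Name'] for t in filter_tables_by_dataset_bucket(pages, 'my-dataset-bucket')] == ['orders']
```

Note the trailing slash on `s3://{bucket}/`: it keeps a table stored in a similarly named bucket (e.g. `my-dataset-bucket-2`) from slipping past the `startswith` check.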
### Testing
- Tested in a local environment: I was able to create and share datasets even after the pre-filtering takes place.
- A separate PR will follow with unit tests.

### Security
Please answer the questions below briefly where applicable, or write `N/A`. Based on [OWASP 10](https://owasp.org/Top10/en/).

- Does this PR introduce or modify any input fields or queries - this includes fetching data from storage outside the application (e.g. a database, an S3 bucket)?
  - Is the input sanitized?
  - What precautions are you taking before deserializing the data you consume?
  - Is injection prevented by parametrizing queries?
  - Have you ensured no `eval` or similar functions are used?
- Does this PR introduce any functionality or component that requires authorization?
  - How have you ensured it respects the existing AuthN/AuthZ mechanisms?
  - Are you logging failed auth attempts?
- Are you using or adding any cryptographic features?
  - Do you use standard, proven implementations?
  - Are the used keys controlled by the customer? Where are they stored?
- Are you introducing any new policies/roles/users?
  - Have you used the least-privilege principle? How?

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

Co-authored-by: Anushka Singh
---
 .../modules/datasets/api/table/mutations.py   |  4 ++-
 .../datasets/aws/glue_dataset_client.py       | 35 +++++++++++++------
 .../services/dataset_table_service.py         |  4 +--
 .../modules/datasets/tasks/tables_syncer.py   |  2 +-
 4 files changed, 31 insertions(+), 14 deletions(-)

diff --git a/backend/dataall/modules/datasets/api/table/mutations.py b/backend/dataall/modules/datasets/api/table/mutations.py
index 245bdc0b4..912b9906d 100644
--- a/backend/dataall/modules/datasets/api/table/mutations.py
+++ b/backend/dataall/modules/datasets/api/table/mutations.py
@@ -23,7 +23,9 @@
 
 syncTables = gql.MutationField(
     name='syncTables',
-    args=[gql.Argument(name='datasetUri', type=gql.NonNullableType(gql.String))],
+    args=[
+        gql.Argument(name='datasetUri', type=gql.NonNullableType(gql.String))
+    ],
     type=gql.Ref('DatasetTableSearchResult'),
     resolver=sync_tables,
 )
diff --git a/backend/dataall/modules/datasets/aws/glue_dataset_client.py b/backend/dataall/modules/datasets/aws/glue_dataset_client.py
index aecd5b555..82b7f52a1 100644
--- a/backend/dataall/modules/datasets/aws/glue_dataset_client.py
+++ b/backend/dataall/modules/datasets/aws/glue_dataset_client.py
@@ -65,7 +65,7 @@ def update_crawler(self, targets):
         else:
             raise e
 
-    def list_glue_database_tables(self):
+    def list_glue_database_tables(self, dataset_s3_bucket_name):
         dataset = self._dataset
         database = dataset.GlueDatabaseName
         account_id = dataset.AwsAccountId
@@ -76,15 +76,14 @@ def list_glue_database_tables(self):
             if not self.database_exists():
                 return found_tables
 
-            paginator = self._client.get_paginator('get_tables')
-
-            pages = paginator.paginate(
-                DatabaseName=database,
-                CatalogId=account_id,
-            )
-            for page in pages:
-                found_tables.extend(page['TableList'])
-
+            pages = self.get_pages(database, account_id)
+            dataset_s3_bucket = f"s3://{dataset_s3_bucket_name}/"
+            found_tables = [
+                table
+                for page in pages
+                for table in page['TableList']
+                if table.get('StorageDescriptor', {}).get('Location', '').startswith(dataset_s3_bucket)
+            ]
             log.debug(f'Retrieved all database {database} tables: {found_tables}')
 
         except ClientError as e:
@@ -102,3 +101,19 @@ def database_exists(self):
         except ClientError:
             log.info(f'Database {dataset.GlueDatabaseName} does not exist on account {dataset.AwsAccountId}...')
             return False
+
+    def get_pages(self, database, account_id):
+        pages = []
+        try:
+            paginator = self._client.get_paginator('get_tables')
+
+            pages = paginator.paginate(
+                DatabaseName=database,
+                CatalogId=account_id,
+            )
+        except ClientError as e:
+            log.error(
+                f'Failed to retrieve pages for database {account_id}|{database}: {e}',
+                exc_info=True,
+            )
+        return pages
\ No newline at end of file
diff --git a/backend/dataall/modules/datasets/services/dataset_table_service.py b/backend/dataall/modules/datasets/services/dataset_table_service.py
index c9be4ed27..130397ee4 100644
--- a/backend/dataall/modules/datasets/services/dataset_table_service.py
+++ b/backend/dataall/modules/datasets/services/dataset_table_service.py
@@ -121,8 +121,8 @@ def sync_tables_for_dataset(cls, uri):
         context = get_context()
         with context.db_engine.scoped_session() as session:
             dataset = DatasetRepository.get_dataset_by_uri(session, uri)
-
-            tables = DatasetCrawler(dataset).list_glue_database_tables()
+            S3Prefix = dataset.S3BucketName
+            tables = DatasetCrawler(dataset).list_glue_database_tables(S3Prefix)
             cls.sync_existing_tables(session, dataset.datasetUri, glue_tables=tables)
             DatasetTableIndexer.upsert_all(
                 session=session, dataset_uri=dataset.datasetUri
diff --git a/backend/dataall/modules/datasets/tasks/tables_syncer.py b/backend/dataall/modules/datasets/tasks/tables_syncer.py
index 5c89d1351..5b888f4aa 100644
--- a/backend/dataall/modules/datasets/tasks/tables_syncer.py
+++ b/backend/dataall/modules/datasets/tasks/tables_syncer.py
@@ -56,7 +56,7 @@ def sync_tables(engine):
                 )
 
             else:
-                tables = DatasetCrawler(dataset).list_glue_database_tables()
+                tables = DatasetCrawler(dataset).list_glue_database_tables(dataset.S3BucketName)
 
                 log.info(
                     f'Found {len(tables)} tables on Glue database {dataset.GlueDatabaseName}'