feat: Handle Pre-filtering of tables (#811)
### Feature or Bugfix

- Feature

### Detail
- For a dataset to make sense, all tables within it should have their
location pointing to the same place as the dataset's S3 bucket. However,
a Glue database can contain tables that point to other buckets, which is
perfectly legal in Lake Formation. We therefore propose that data.all
automatically list only the tables whose S3 location matches the
dataset's bucket. This solves a problem for Yahoo, where we want to
import a database that contains many tables spread across different
buckets. The Catalog UI should also list only the pre-filtered tables.
A minimal sketch of the filter predicate is shown below.
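
Concretely, the filter keeps only those Glue tables whose `StorageDescriptor.Location` falls under the dataset's S3 bucket. The sketch below shows the predicate as a standalone helper for illustration only; in the PR the same check is applied inline in `list_glue_database_tables` (see the diff):

```python
# Illustrative helper; the actual change applies this predicate inline over
# the paginated Glue get_tables results in list_glue_database_tables.
def filter_tables_by_dataset_bucket(glue_tables, dataset_s3_bucket_name):
    dataset_s3_prefix = f's3://{dataset_s3_bucket_name}/'
    return [
        table
        for table in glue_tables
        if table.get('StorageDescriptor', {}).get('Location', '').startswith(dataset_s3_prefix)
    ]
```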

### Testing
- Tested this in a local environment: I was able to create and share datasets
even after the pre-filtering takes place.
- Unit tests will follow in a separate PR; a rough sketch of what such a test
could assert is shown below.
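
As an illustration of what such a unit test could assert (pytest-style; the stubbing approach, fixture values, and test name are assumptions for this sketch, not the contents of the follow-up PR):

```python
from unittest.mock import MagicMock, patch

from dataall.modules.datasets.aws.glue_dataset_client import DatasetCrawler


def test_list_glue_database_tables_filters_foreign_buckets():
    # Build the crawler without running __init__ so no real AWS session is opened;
    # the dataset stub only needs the attributes the method reads.
    crawler = DatasetCrawler.__new__(DatasetCrawler)
    crawler._dataset = MagicMock(
        GlueDatabaseName='db', AwsAccountId='111111111111', S3BucketName='dataset-bucket'
    )

    # Fake Glue get_tables pages: one table in the dataset bucket, one elsewhere.
    pages = [{'TableList': [
        {'Name': 'kept', 'StorageDescriptor': {'Location': 's3://dataset-bucket/kept/'}},
        {'Name': 'dropped', 'StorageDescriptor': {'Location': 's3://other-bucket/dropped/'}},
    ]}]

    # Bypass the real Glue calls and feed in the fake pages.
    with patch.object(DatasetCrawler, 'database_exists', return_value=True), \
         patch.object(DatasetCrawler, 'get_pages', return_value=pages):
        tables = crawler.list_glue_database_tables('dataset-bucket')

    assert [table['Name'] for table in tables] == ['kept']
```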

### Security
Please answer the questions below briefly where applicable, or write
`N/A`. Based on
[OWASP Top 10](https://owasp.org/Top10/en/).

- Does this PR introduce or modify any input fields or queries - this
includes
fetching data from storage outside the application (e.g. a database, an
S3 bucket)?
  - Is the input sanitized?
- What precautions are you taking before deserializing the data you
consume?
  - Is injection prevented by parametrizing queries?
  - Have you ensured no `eval` or similar functions are used?
- Does this PR introduce any functionality or component that requires
authorization?
  - How have you ensured it respects the existing AuthN/AuthZ mechanisms?
  - Are you logging failed auth attempts?
- Are you using or adding any cryptographic features?
  - Do you use standard, proven implementations?
  - Are the used keys controlled by the customer? Where are they stored?
- Are you introducing any new policies/roles/users?
  - Have you used the least-privilege principle? How?


By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.

Co-authored-by: Anushka Singh <[email protected]>
anushka-singh and Anushka Singh authored Oct 18, 2023
1 parent 66b9a08 commit c833c26
Showing 4 changed files with 31 additions and 14 deletions.
4 changes: 3 additions & 1 deletion backend/dataall/modules/datasets/api/table/mutations.py
@@ -23,7 +23,9 @@

 syncTables = gql.MutationField(
     name='syncTables',
-    args=[gql.Argument(name='datasetUri', type=gql.NonNullableType(gql.String))],
+    args=[
+        gql.Argument(name='datasetUri', type=gql.NonNullableType(gql.String))
+    ],
     type=gql.Ref('DatasetTableSearchResult'),
     resolver=sync_tables,
 )
35 changes: 25 additions & 10 deletions backend/dataall/modules/datasets/aws/glue_dataset_client.py
@@ -65,7 +65,7 @@ def update_crawler(self, targets):
             else:
                 raise e

-    def list_glue_database_tables(self):
+    def list_glue_database_tables(self, dataset_s3_bucket_name):
         dataset = self._dataset
         database = dataset.GlueDatabaseName
         account_id = dataset.AwsAccountId
@@ -76,15 +76,14 @@ def list_glue_database_tables(self):
             if not self.database_exists():
                 return found_tables

-            paginator = self._client.get_paginator('get_tables')
-
-            pages = paginator.paginate(
-                DatabaseName=database,
-                CatalogId=account_id,
-            )
-            for page in pages:
-                found_tables.extend(page['TableList'])
-
+            pages = self.get_pages(database, account_id)
+            dataset_s3_bucket = f"s3://{dataset_s3_bucket_name}/"
+            found_tables = [
+                table
+                for page in pages
+                for table in page['TableList']
+                if table.get('StorageDescriptor', {}).get('Location', '').startswith(dataset_s3_bucket)
+            ]
             log.debug(f'Retrieved all database {database} tables: {found_tables}')

         except ClientError as e:
@@ -102,3 +101,19 @@ def database_exists(self):
         except ClientError:
             log.info(f'Database {dataset.GlueDatabaseName} does not exist on account {dataset.AwsAccountId}...')
             return False
+
+    def get_pages(self, database, account_id):
+        pages = []
+        try:
+            paginator = self._client.get_paginator('get_tables')
+
+            pages = paginator.paginate(
+                DatabaseName=database,
+                CatalogId=account_id,
+            )
+        except ClientError as e:
+            log.error(
+                f'Failed to retrieve pages for database {account_id}|{database}: {e}',
+                exc_info=True,
+            )
+        return pages
@@ -121,8 +121,8 @@ def sync_tables_for_dataset(cls, uri):
         context = get_context()
         with context.db_engine.scoped_session() as session:
             dataset = DatasetRepository.get_dataset_by_uri(session, uri)
-
-            tables = DatasetCrawler(dataset).list_glue_database_tables()
+            S3Prefix = dataset.S3BucketName
+            tables = DatasetCrawler(dataset).list_glue_database_tables(S3Prefix)
             cls.sync_existing_tables(session, dataset.datasetUri, glue_tables=tables)
             DatasetTableIndexer.upsert_all(
                 session=session, dataset_uri=dataset.datasetUri
2 changes: 1 addition & 1 deletion backend/dataall/modules/datasets/tasks/tables_syncer.py
@@ -56,7 +56,7 @@ def sync_tables(engine):
                 )
             else:

-                tables = DatasetCrawler(dataset).list_glue_database_tables()
+                tables = DatasetCrawler(dataset).list_glue_database_tables(dataset.S3BucketName)

                 log.info(
                     f'Found {len(tables)} tables on Glue database {dataset.GlueDatabaseName}'
