Skip to content

Commit

Permalink
bugfix for gh-1734
Browse files Browse the repository at this point in the history
  • Loading branch information
TejasRGitHub authored and trajopadhye committed Dec 12, 2024
1 parent 64da618 commit 7adc3a6
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import logging
from datetime import datetime
from typing import List

from sqlalchemy.sql import and_

from dataall.base.db import exceptions
from dataall.core.activity.db.activity_models import Activity
from dataall.modules.s3_datasets.db.dataset_models import (
DatasetTableColumn,
DatasetTable,
S3Dataset,
DatasetTableDataFilter,
)
from dataall.base.utils import json_utils
from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem
from dataall.modules.shares_base.services.shares_enums import ShareItemStatus

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,11 +66,33 @@ def get_dataset_table_by_uri(session, table_uri):
return table

@staticmethod
def update_existing_tables_status(existing_tables, glue_tables):
def update_existing_tables_status(existing_tables, glue_tables, session):
for existing_table in existing_tables:
if existing_table.GlueTableName not in [t['Name'] for t in glue_tables]:
existing_table.LastGlueTableStatus = 'Deleted'
logger.info(f'Existing Table {existing_table.GlueTableName} status set to Deleted from Glue')
# Once the table has been deleted from Glue it is no longer part of the dataset.
# Find the share items in Share_Succeeded state that reference this table and delete them.
share_item_status_filter = [ShareItemStatus.Share_Succeeded.value]
share_object_items: List[ShareObjectItem] = (
ShareObjectRepository.list_share_object_items_for_item_with_status(
session=session, item_uri=existing_table.tableUri, status=share_item_status_filter
)
)
logger.info(
f'Found {len(share_object_items)} share objects where the table {existing_table.tableUri} is present as a share item in state: {share_item_status_filter}. Deleting those share items'
)
for share_object_item in share_object_items:
activity = Activity(
action='SHARE_OBJECT_ITEM:DELETE',
label='SHARE_OBJECT_ITEM:DELETE',
owner='dataall-automation',
summary=f'dataall-automation deleted share object: {share_object_item.itemName} with uri: {share_object_item.itemUri} since the glue table associated was deleted from source glue db',
targetUri=share_object_item.itemUri,
targetType='share_object_item',
)
session.add(activity)
session.delete(share_object_item)
elif (
existing_table.GlueTableName in [t['Name'] for t in glue_tables]
and existing_table.LastGlueTableStatus == 'Deleted'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ def sync_existing_tables(session, uri, glue_tables=None):
existing_tables = DatasetTableRepository.find_dataset_tables(session, uri)
existing_table_names = [e.GlueTableName for e in existing_tables]
existing_dataset_tables_map = {t.GlueTableName: t for t in existing_tables}

DatasetTableRepository.update_existing_tables_status(existing_tables, glue_tables)
DatasetTableRepository.update_existing_tables_status(existing_tables, glue_tables, session)
log.info(f'existing_tables={glue_tables}')
for table in glue_tables:
if table['Name'] not in existing_table_names:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@
from dataall.modules.datasets_base.db.dataset_models import DatasetBase
from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository
from dataall.modules.notifications.db.notification_models import Notification
from dataall.modules.s3_datasets.db.dataset_models import S3Dataset
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem, ShareObject
from dataall.modules.shares_base.services.shares_enums import (
ShareItemHealthStatus,
PrincipalType,
)
from dataall.modules.shares_base.services.shares_enums import ShareItemHealthStatus, PrincipalType, ShareableType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -108,6 +106,14 @@ def get_share_item_by_uri(session, uri):
def get_share_item_details(session, share_type_model, item_uri):
    """Look up a single shared item by its identifier.

    Builds a query over ``share_type_model`` on ``session`` and returns
    whatever that query's ``get(item_uri)`` call yields.
    """
    item_query = session.query(share_type_model)
    return item_query.get(item_uri)

@staticmethod
def get_shares_for_principal_and_database(session, principal, database):
    """Build the query for share objects of one principal on one Glue database.

    Joins ``ShareObject`` to ``S3Dataset`` on ``datasetUri`` and keeps the rows
    whose dataset ``GlueDatabaseName`` equals ``database`` and whose
    ``principalIAMRoleName`` equals ``principal``. The query object is returned
    as built here; this method does not call ``.all()`` or ``.first()`` on it.
    """
    shares_query = session.query(ShareObject)
    same_dataset = S3Dataset.datasetUri == ShareObject.datasetUri
    shares_query = shares_query.join(S3Dataset, same_dataset)
    matches_target = and_(
        S3Dataset.GlueDatabaseName == database,
        ShareObject.principalIAMRoleName == principal,
    )
    return shares_query.filter(matches_target)

@staticmethod
def remove_share_object_item(session, share_item):
session.delete(share_item)
Expand Down Expand Up @@ -411,6 +417,8 @@ def list_shareable_items_of_type(session, share, type, share_type_model, share_t
)
if status:
query = query.filter(ShareObjectItem.status.in_(status))
if type == ShareableType.Table:
query = query.filter(share_type_model.LastGlueTableStatus == 'InSync')
return query

@staticmethod
Expand Down Expand Up @@ -455,6 +463,14 @@ def list_active_share_object_for_dataset(session, dataset_uri):
)
return share_objects

@staticmethod
def list_share_object_items_for_item_with_status(session, item_uri: str, status: List[str]):
    """List the share items that reference ``item_uri`` in any of the given statuses.

    :param session: database session used to run the query
    :param item_uri: value matched against ``ShareObjectItem.itemUri``
    :param status: status values; an item is kept when its ``status`` is in this list
    :return: the result of ``.all()`` on the filtered ``ShareObjectItem`` query
    """
    items_query = session.query(ShareObjectItem)
    status_matches = ShareObjectItem.status.in_(status)
    uri_matches = ShareObjectItem.itemUri == item_uri
    return items_query.filter(status_matches, uri_matches).all()

@staticmethod
def fetch_submitted_shares_with_notifications(session):
"""
Expand Down

0 comments on commit 7adc3a6

Please sign in to comment.