Enabling S3 bucket share (#848)
### Feature or Bugfix
- Feature


### Detail
- We want to enable S3 bucket sharing alongside the access point sharing that
already exists in data.all today.
- A user will be able to request shares at the bucket level and at the
folder level via access points (see the sketch after this list).
- Please NOTE: there is some common code between the access point share
managers and processors and the S3 bucket share managers and processors. We will
send out a separate PR for that refactoring work at a later time.
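
For orientation, a minimal sketch of the new shareable item type this PR introduces. The enum values mirror the diff below; the surrounding usage is illustrative only and is not the actual data.all request flow.

```python
from enum import Enum


# Values taken from the ShareableType enum in this diff; only S3Bucket is new.
class ShareableType(Enum):
    Table = 'DatasetTable'
    StorageLocation = 'DatasetStorageLocation'  # folder shares via access points
    View = 'View'
    S3Bucket = 'S3Bucket'                       # new: whole-bucket shares


# A share request can now target either a folder or the bucket itself.
requested = ShareableType.S3Bucket
print(requested.value)  # -> 'S3Bucket'
```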


### Relates
- #284
- #823
-
https://github.com/awslabs/aws-dataall/pull/846/files#diff-c1f522a1f50d8bcf7b6e5b2e586e40a8de784caa80345f4e05a6329ae2a372d0

### Contributors:
- Contents of this PR have been contributed by @anushka-singh,
@blitzmohit, @rbernotas, @TejasRGitHub

### Security
Please answer the questions below briefly where applicable, or write
`N/A`. Based on
[OWASP 10](https://owasp.org/Top10/en/).

- Does this PR introduce or modify any input fields or queries - this
includes
fetching data from storage outside the application (e.g. a database, an
S3 bucket)?
  - Is the input sanitized?
- What precautions are you taking before deserializing the data you
consume?
  - Is injection prevented by parametrizing queries?
  - Have you ensured no `eval` or similar functions are used?
- Does this PR introduce any functionality or component that requires
authorization?
  - How have you ensured it respects the existing AuthN/AuthZ mechanisms?
  - Are you logging failed auth attempts?
- Are you using or adding any cryptographic features?
  - Do you use standard, proven implementations?
  - Are the used keys controlled by the customer? Where are they stored?
- Are you introducing any new policies/roles/users?
  - Have you used the least-privilege principle? How?


By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: Noah Paige <[email protected]>
Co-authored-by: dlpzx <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: jaidisido <[email protected]>
Co-authored-by: dlpzx <[email protected]>
Co-authored-by: mourya-33 <[email protected]>
Co-authored-by: nikpodsh <[email protected]>
Co-authored-by: MK <[email protected]>
Co-authored-by: Manjula <[email protected]>
Co-authored-by: Zilvinas Saltys <[email protected]>
Co-authored-by: Zilvinas Saltys <[email protected]>
Co-authored-by: Daniel Lorch <[email protected]>
Co-authored-by: Anushka Singh <[email protected]>
Co-authored-by: Tejas Rajopadhye <[email protected]>
Co-authored-by: trajopadhye <[email protected]>
16 people authored Nov 20, 2023
1 parent 5dd451f commit a9bc139
Showing 29 changed files with 3,104 additions and 119 deletions.
1 change: 1 addition & 0 deletions backend/dataall/modules/dataset_sharing/api/enums.py
@@ -5,6 +5,7 @@ class ShareableType(GraphQLEnumMapper):
Table = 'DatasetTable'
StorageLocation = 'DatasetStorageLocation'
View = 'View'
S3Bucket = 'S3Bucket'


class ShareObjectPermission(GraphQLEnumMapper):
1 change: 1 addition & 0 deletions backend/dataall/modules/dataset_sharing/api/resolvers.py
@@ -191,6 +191,7 @@ def resolve_consumption_data(context: Context, source: ShareObject, **kwargs):
return {
's3AccessPointName': S3AccessPointName,
'sharedGlueDatabase': (ds.GlueDatabaseName + '_shared_' + source.shareUri)[:254] if ds else 'Not created',
's3bucketName': ds.S3BucketName,
}


1 change: 1 addition & 0 deletions backend/dataall/modules/dataset_sharing/api/types.py
@@ -107,6 +107,7 @@
fields=[
gql.Field(name='s3AccessPointName', type=gql.String),
gql.Field(name='sharedGlueDatabase', type=gql.String),
gql.Field(name='s3bucketName', type=gql.String),
],
)

45 changes: 45 additions & 0 deletions backend/dataall/modules/dataset_sharing/aws/s3_client.py
@@ -121,6 +121,51 @@ def generate_access_point_policy_template(
}
return policy

@staticmethod
def generate_default_bucket_policy(
s3_bucket_name: str,
owner_roleId: list,
allow_owner_sid: str,
):
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": allow_owner_sid,
"Effect": "Allow",
"Principal": "*",
"Action": "s3:*",
"Resource": [
f"arn:aws:s3:::{s3_bucket_name}",
f"arn:aws:s3:::{s3_bucket_name}/*"
],
"Condition": {
"StringLike": {
"aws:userId": owner_roleId
}
}
},
{
"Effect": "Deny",
"Principal": {
"AWS": "*"
},
"Sid": "RequiredSecureTransport",
"Action": "s3:*",
"Resource": [
f"arn:aws:s3:::{s3_bucket_name}",
f"arn:aws:s3:::{s3_bucket_name}/*"
],
"Condition": {
"Bool": {
"aws:SecureTransport": "false"
}
}
}
]
}
return policy


class S3Client:
def __init__(self, account_id, region):
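For reference, a hedged usage sketch of the new `generate_default_bucket_policy` helper added above. The enclosing class is assumed to be `S3ControlClient` (the class in this file that already defines `generate_access_point_policy_template`); the argument values are illustrative.

```python
import json

# Assumption: the static method added above lives on S3ControlClient in
# dataall/modules/dataset_sharing/aws/s3_client.py. Argument values are examples.
from dataall.modules.dataset_sharing.aws.s3_client import S3ControlClient

policy = S3ControlClient.generate_default_bucket_policy(
    s3_bucket_name='my-dataset-bucket',
    owner_roleId=['AROAEXAMPLEROLEID:*'],   # role IDs granted full owner access
    allow_owner_sid='AllowAllToAdmin',
)
# Prints the default policy: an owner-access statement plus a deny-non-TLS statement.
print(json.dumps(policy, indent=2))
```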
1 change: 1 addition & 0 deletions backend/dataall/modules/dataset_sharing/db/enums.py
@@ -57,6 +57,7 @@ class ShareableType(Enum):
Table = 'DatasetTable'
StorageLocation = 'DatasetStorageLocation'
View = 'View'
S3Bucket = 'S3Bucket'


class PrincipalType(Enum):
@@ -12,7 +12,7 @@
ShareItemStatus, ShareableType, PrincipalType
from dataall.modules.dataset_sharing.db.share_object_models import ShareObjectItem, ShareObject
from dataall.modules.datasets_base.db.dataset_repositories import DatasetRepository
from dataall.modules.datasets_base.db.dataset_models import DatasetStorageLocation, DatasetTable, Dataset
from dataall.modules.datasets_base.db.dataset_models import DatasetStorageLocation, DatasetTable, Dataset, DatasetBucket

logger = logging.getLogger(__name__)

@@ -356,6 +356,8 @@ def get_share_item(session, item_type, item_uri):
return session.query(DatasetTable).get(item_uri)
if item_type == ShareableType.StorageLocation.value:
return session.query(DatasetStorageLocation).get(item_uri)
if item_type == ShareableType.S3Bucket.value:
return session.query(DatasetBucket).get(item_uri)

@staticmethod
def get_share_by_uri(session, uri):
@@ -525,7 +527,33 @@ def list_shareable_items(session, share, states, data):
if states:
locations = locations.filter(ShareObjectItem.status.in_(states))

shareable_objects = tables.union(locations).subquery('shareable_objects')
s3_buckets = (
session.query(
DatasetBucket.bucketUri.label('itemUri'),
func.coalesce('S3Bucket').label('itemType'),
DatasetBucket.S3BucketName.label('itemName'),
DatasetBucket.description.label('description'),
ShareObjectItem.shareItemUri.label('shareItemUri'),
ShareObjectItem.status.label('status'),
case(
[(ShareObjectItem.shareItemUri.isnot(None), True)],
else_=False,
).label('isShared'),
)
.outerjoin(
ShareObjectItem,
and_(
ShareObjectItem.shareUri == share.shareUri,
DatasetBucket.bucketUri
== ShareObjectItem.itemUri,
),
)
.filter(DatasetBucket.datasetUri == share.datasetUri)
)
if states:
s3_buckets = s3_buckets.filter(ShareObjectItem.status.in_(states))

shareable_objects = tables.union(locations, s3_buckets).subquery('shareable_objects')
query = session.query(shareable_objects)

if data:
@@ -732,9 +760,14 @@ def get_share_data_items(session, share_uri, status):
session, share, status, DatasetStorageLocation, DatasetStorageLocation.locationUri
)

s3_buckets = ShareObjectRepository._find_all_share_item(
session, share, status, DatasetBucket, DatasetBucket.bucketUri
)

return (
tables,
folders,
s3_buckets,
)

@staticmethod
@@ -1,12 +1,18 @@
import logging

from dataall.modules.dataset_sharing.services.share_processors.lf_process_cross_account_share import ProcessLFCrossAccountShare
from dataall.modules.dataset_sharing.services.share_processors.lf_process_same_account_share import ProcessLFSameAccountShare
from dataall.modules.dataset_sharing.services.share_processors.s3_process_share import ProcessS3Share
from dataall.modules.dataset_sharing.services.share_processors.lf_process_cross_account_share import \
ProcessLFCrossAccountShare
from dataall.modules.dataset_sharing.services.share_processors.lf_process_same_account_share import \
ProcessLFSameAccountShare
from dataall.modules.dataset_sharing.services.share_processors.s3_access_point_process_share import \
ProcessS3AccessPointShare
from dataall.modules.dataset_sharing.services.share_processors.s3_bucket_process_share import ProcessS3BucketShare

from dataall.base.db import Engine
from dataall.modules.dataset_sharing.db.enums import ShareObjectActions, ShareItemStatus, ShareableType
from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectSM, ShareObjectRepository, ShareItemSM
from dataall.modules.dataset_sharing.db.enums import (ShareObjectActions, ShareItemStatus, ShareableType,
ShareItemActions)
from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectSM, ShareObjectRepository, \
ShareItemSM

log = logging.getLogger(__name__)

@@ -21,8 +27,9 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool:
1) Updates share object State Machine with the Action: Start
2) Retrieves share data and items in Share_Approved state
3) Calls sharing folders processor to grant share
4) Calls sharing tables processor for same or cross account sharing to grant share
5) Updates share object State Machine with the Action: Finish
4) Calls sharing buckets processor to grant share
5) Calls sharing tables processor for same or cross account sharing to grant share
6) Updates share object State Machine with the Action: Finish
Parameters
----------
@@ -50,12 +57,13 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool:

(
shared_tables,
shared_folders
shared_folders,
shared_buckets
) = ShareObjectRepository.get_share_data_items(session, share_uri, ShareItemStatus.Share_Approved.value)

log.info(f'Granting permissions to folders: {shared_folders}')

approved_folders_succeed = ProcessS3Share.process_approved_shares(
approved_folders_succeed = ProcessS3AccessPointShare.process_approved_shares(
session,
dataset,
share,
@@ -67,6 +75,20 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool:
)
log.info(f'sharing folders succeeded = {approved_folders_succeed}')

log.info('Granting permissions to S3 buckets')

approved_s3_buckets_succeed = ProcessS3BucketShare.process_approved_shares(
session,
dataset,
share,
shared_buckets,
source_environment,
target_environment,
source_env_group,
env_group
)
log.info(f'sharing s3 buckets succeeded = {approved_s3_buckets_succeed}')

if source_environment.AwsAccountId != target_environment.AwsAccountId:
processor = ProcessLFCrossAccountShare(
session,
@@ -97,7 +119,7 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool:
new_share_state = share_sm.run_transition(ShareObjectActions.Finish.value)
share_sm.update_state(session, share, new_share_state)

return approved_tables_succeed if approved_folders_succeed else False
return approved_folders_succeed and approved_s3_buckets_succeed and approved_tables_succeed

@classmethod
def revoke_share(cls, engine: Engine, share_uri: str):
@@ -108,7 +130,8 @@ def revoke_share(cls, engine: Engine, share_uri: str):
4) Checks if remaining folders are shared and effectuates clean up with folders processor
5) Calls sharing tables processor for same or cross account sharing to revoke share
6) Checks if remaining tables are shared and effectuates clean up with tables processor
7) Updates share object State Machine with the Action: Finish
7) Calls sharing buckets processor to revoke share
8) Updates share object State Machine with the Action: Finish
Parameters
----------
@@ -139,15 +162,16 @@ def revoke_share(cls, engine: Engine, share_uri: str):

(
revoked_tables,
revoked_folders
revoked_folders,
revoked_buckets
) = ShareObjectRepository.get_share_data_items(session, share_uri, ShareItemStatus.Revoke_Approved.value)

new_state = revoked_item_sm.run_transition(ShareObjectActions.Start.value)
revoked_item_sm.update_state(session, share_uri, new_state)

log.info(f'Revoking permissions to folders: {revoked_folders}')

revoked_folders_succeed = ProcessS3Share.process_revoked_shares(
revoked_folders_succeed = ProcessS3AccessPointShare.process_revoked_shares(
session,
dataset,
share,
@@ -158,21 +182,48 @@ def revoke_share(cls, engine: Engine, share_uri: str):
env_group,
)
log.info(f'revoking folders succeeded = {revoked_folders_succeed}')
existing_shared_items = ShareObjectRepository.check_existing_shared_items_of_type(
existing_shared_folders = ShareObjectRepository.check_existing_shared_items_of_type(
session,
share_uri,
ShareableType.StorageLocation.value
)
existing_shared_buckets = ShareObjectRepository.check_existing_shared_items_of_type(
session,
share_uri,
ShareableType.S3Bucket.value
)
existing_shared_items = existing_shared_folders or existing_shared_buckets
log.info(f'Still remaining S3 resources shared = {existing_shared_items}')
if not existing_shared_items and revoked_folders:
if not existing_shared_folders and revoked_folders:
log.info("Clean up S3 access points...")
clean_up_folders = ProcessS3Share.clean_up_share(
clean_up_folders = ProcessS3AccessPointShare.clean_up_share(
session,
dataset=dataset,
share=share,
target_environment=target_environment
folder=revoked_folders[0],
source_environment=source_environment,
target_environment=target_environment,
source_env_group=source_env_group,
env_group=env_group,
existing_shared_buckets=existing_shared_buckets
)
log.info(f"Clean up S3 successful = {clean_up_folders}")

log.info('Revoking permissions to S3 buckets')

revoked_s3_buckets_succeed = ProcessS3BucketShare.process_revoked_shares(
session,
dataset,
share,
revoked_buckets,
source_environment,
target_environment,
source_env_group,
env_group,
existing_shared_folders
)
log.info(f'revoking s3 buckets succeeded = {revoked_s3_buckets_succeed}')

if source_environment.AwsAccountId != target_environment.AwsAccountId:
processor = ProcessLFCrossAccountShare(
session,
@@ -217,4 +268,4 @@ def revoke_share(cls, engine: Engine, share_uri: str):
new_share_state = share_sm.run_transition(ShareObjectActions.Finish.value)
share_sm.update_state(session, share, new_share_state)

return revoked_tables_succeed and revoked_folders_succeed
return revoked_folders_succeed and revoked_s3_buckets_succeed and revoked_tables_succeed
@@ -3,7 +3,7 @@

from dataall.core.environment.db.environment_models import Environment
from dataall.modules.dataset_sharing.db.share_object_models import ShareObject
from dataall.modules.datasets_base.db.dataset_models import DatasetTable, Dataset, DatasetStorageLocation
from dataall.modules.datasets_base.db.dataset_models import DatasetTable, Dataset, DatasetStorageLocation, DatasetBucket
from dataall.base.utils.alarm_service import AlarmService

log = logging.getLogger(__name__)
@@ -147,5 +147,48 @@ def trigger_revoke_folder_sharing_failure_alarm(
Share Target
- AWS Account: {target_environment.AwsAccountId}
- Region: {target_environment.region}
"""
return self.publish_message_to_alarms_topic(subject, message)

def trigger_s3_bucket_sharing_failure_alarm(
self,
bucket: DatasetBucket,
share: ShareObject,
target_environment: Environment,
):
alarm_type = "Share"
return self.handle_bucket_sharing_failure(bucket, share, target_environment, alarm_type)

def trigger_revoke_s3_bucket_sharing_failure_alarm(
self,
bucket: DatasetBucket,
share: ShareObject,
target_environment: Environment,
):
alarm_type = "Sharing Revoke"
return self.handle_bucket_sharing_failure(bucket, share, target_environment, alarm_type)

def handle_bucket_sharing_failure(self, bucket: DatasetBucket,
share: ShareObject,
target_environment: Environment,
alarm_type: str):
log.info(f'Triggering {alarm_type} failure alarm...')
subject = (
f'ALARM: DATAALL S3 Bucket {bucket.S3BucketName} {alarm_type} Failure Notification'
)
message = f"""
You are receiving this email because your DATAALL {self.envname} environment in the {self.region} region has entered the ALARM state, because it failed to {alarm_type} the S3 Bucket {bucket.S3BucketName}.
Alarm Details:
- State Change: OK -> ALARM
- Reason for State Change: S3 Bucket {alarm_type} failure
- Timestamp: {datetime.now()}
Share Source
- Dataset URI: {share.datasetUri}
- AWS Account: {bucket.AwsAccountId}
- Region: {bucket.region}
- S3 Bucket: {bucket.S3BucketName}
Share Target
- AWS Account: {target_environment.AwsAccountId}
- Region: {target_environment.region}
"""
return self.publish_message_to_alarms_topic(subject, message)
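
A hedged sketch of how a bucket-share processor might call the new failure-alarm helpers added above. Only `trigger_s3_bucket_sharing_failure_alarm` comes from this diff; the surrounding processor code, the `grant_bucket_access` call, and the idea of passing the alarm service in as a parameter are assumptions for illustration.

```python
import logging

log = logging.getLogger(__name__)


def share_bucket_with_alarm(client, bucket, share, target_environment, alarm_service):
    """Attempt to grant bucket access; on failure, raise the new bucket-sharing alarm.

    All parameters are supplied by the caller; client.grant_bucket_access is a
    hypothetical sharing step, not an API from this diff.
    """
    try:
        client.grant_bucket_access(bucket, share, target_environment)
        return True
    except Exception as e:
        log.error(f'Failed to share bucket {bucket.S3BucketName} due to: {e}')
        # Method added in this diff on the alarm service class.
        alarm_service.trigger_s3_bucket_sharing_failure_alarm(bucket, share, target_environment)
        return False
```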
@@ -1,2 +1,3 @@
from .s3_share_manager import S3ShareManager
from .s3_access_point_share_manager import S3AccessPointShareManager
from .lf_share_manager import LFShareManager
from .s3_bucket_share_manager import S3BucketShareManager