diff --git a/tools/python/copy-table/README.md b/tools/python/copy-table/README.md
new file mode 100644
index 00000000..8565db87
--- /dev/null
+++ b/tools/python/copy-table/README.md
@@ -0,0 +1,120 @@
+# Timestream Unload & BatchLoad Automation
+
+## When to use this automation?
+
+This automation migrates a Timestream for LiveAnalytics table to a new table. It is divided into two parts: #1 unloads the data from Timestream using different partitioning choices, and #2 batch loads the data into Timestream; it also covers S3 copy functionality if the unload was run in a different account, or in the same account but a different region. Data modelling changes can be applied as part of the batch load. You can use this automation in the following use cases:
+
+- Migrating a Timestream for LiveAnalytics table to a different AWS Organization.
+- Migrating a Timestream for LiveAnalytics table to a different region or account where data model changes are needed in the destination account/region. If data model changes are not required (and the accounts belong to the same AWS Organization), consider using AWS Backup for Timestream instead: https://docs.aws.amazon.com/aws-backup/latest/devguide/timestream-backup.html and https://docs.aws.amazon.com/aws-backup/latest/devguide/timestream-restore.html.
+- Migrating a Timestream for LiveAnalytics table to a new table with a customer-defined partition key: https://docs.aws.amazon.com/timestream/latest/developerguide/customer-defined-partition-keys.html
+
+
+## Getting started with UNLOAD
+
+### Key considerations/limits and best practices
+- **Recommendation is to have your UNLOAD not exceed 80 GB**; consider splitting the process into multiple UNLOADs using the start/end time parameters (e.g. if you have 1 TB, run 13 UNLOADs) to avoid any interruptions. See the sketch after this list.
+- **Queries containing the UNLOAD statement can export at most 100 partitions per query**, hence consider splitting the process into multiple UNLOADs using the start/end time parameters.
+- **Maximum file size in a batch load task cannot exceed 5 GB.** Unload files produced by this automation will not exceed that size.
+- **Concurrency for queries using the UNLOAD statement is 1 query per second (QPS).** Exceeding the query rate will result in throttling.
+- **Queries containing the UNLOAD statement time out after 60 minutes.**
+- **CSV files with headers (column names) are created in S3 as part of the unload script.** This is a requirement for Timestream batch load.
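+
+For example, a large table can be exported as a series of UNLOADs over adjacent time windows. The commands below are a sketch only: the bucket, database and table names and the monthly window boundaries are illustrative, and each window is written to its own prefix to keep the exports separate.
+
+```bash
+# Window 1: January 2024 (start boundary included, end boundary excluded)
+python3 unload.py --region eu-west-1 --database sourcedb --table myTable \
+  --s3_uri s3://timestream-unload-sourcedb-mytable/unload-2024-01 \
+  --from_time "2024-01-01 00:00:00.000000000" --end_time "2024-02-01 00:00:00.000000000" --partition day
+
+# Window 2: February 2024
+python3 unload.py --region eu-west-1 --database sourcedb --table myTable \
+  --s3_uri s3://timestream-unload-sourcedb-mytable/unload-2024-02 \
+  --from_time "2024-02-01 00:00:00.000000000" --end_time "2024-03-01 00:00:00.000000000" --partition day
+```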
+
+Unload official documentation: https://docs.aws.amazon.com/timestream/latest/developerguide/export-unload.html
+Check the following guide to learn more: [Limits for UNLOAD from Timestream for LiveAnalytics](https://docs.aws.amazon.com/timestream/latest/developerguide/export-unload-limits.html)
+
+### Usage Parameters
+- **region** [OPTIONAL]: AWS region of the Timestream table to be unloaded; if not provided, the current region of your session is used (e.g.: *us-west-1*)
+- **database** [REQUIRED]: Timestream database where the table to be unloaded is located
+- **table** [REQUIRED]: Timestream table to be unloaded
+- **s3_uri** [OPTIONAL]: S3 bucket URI to store unload data; if not provided, a new S3 bucket is created with the name 'timestream-unload-{database}-{table}-{account_id}' (e.g.: *s3://timestream-unload-sourcedb-mytable-account_id/unload*)
+- **from_time** [OPTIONAL]: Timestamp (boundary included) from which you want to select data to unload (e.g.: *2024-02-26 17:24:38.270000000*)
+- **end_time** [OPTIONAL]: Timestamp (boundary excluded) up to which you want to select data to unload (e.g.: *2024-03-15 19:26:31.304000000*)
+- **partition** [REQUIRED]: Time partition you want to use (possible values: *day, month, year*)
+- **iam_role_bucket_policy** [OPTIONAL]: {Applies to cross-account migrations} Grants the destination IAM role access to the S3 bucket (e.g.: *arn:aws:iam::123456789123:role/BatchLoadRole*)
+
+### Examples
+
+Example to unload the Timestream table *myTable* in the database *sourcedb* to the folder *unload* in the *timestream-unload-sourcedb-mytable* S3 bucket.
+It also applies an S3 bucket policy that allows the IAM role *BatchLoadRole* of account *123456789123* to copy the data, and creates day-level partitions.
+ ```bash
+python3 unload.py --region eu-west-1 --s3_uri s3://timestream-unload-sourcedb-mytable/unload --database sourcedb --table myTable --iam_role_bucket_policy arn:aws:iam::123456789123:role/BatchLoadRole --partition day
+```
+
+## Getting started with BATCH LOAD
+
+### Key considerations and best practices
+
+- **A table cannot have more than 5 active batch load tasks and an account cannot have more than 10 active batch load tasks. Timestream for LiveAnalytics will throttle new batch load tasks until more resources are available.** The batch load script therefore allows at most 5 batch load threads (table level). A sketch for monitoring an individual task follows this list.
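+
+If you want to check on a batch load task outside of this script (the script itself polls task status with backoff in `utils/timestream_utils.py`), a minimal boto3 sketch is shown below; the region and task id are placeholders:
+
+```python
+import boto3
+
+REGION = "us-west-2"      # region of the target Timestream table (placeholder)
+TASK_ID = "YOUR_TASK_ID"  # printed by batch_load.py when the task is created (placeholder)
+
+client = boto3.client("timestream-write", region_name=REGION)
+description = client.describe_batch_load_task(TaskId=TASK_ID)["BatchLoadTaskDescription"]
+
+# TaskStatus is e.g. CREATED, IN_PROGRESS, SUCCEEDED, FAILED, PROGRESS_STOPPED or PENDING_RESUME
+print(description["TaskStatus"])
+# ProgressReport contains parsed/ingested/skipped record counters
+print(description["ProgressReport"])
+```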
+
+**Additional details**
+- [Batch load prerequisites](https://docs.aws.amazon.com/timestream/latest/developerguide/batch-load-prerequisites.html)
+- [Batch load best practices](https://docs.aws.amazon.com/timestream/latest/developerguide/batch-load-best-practices.html)
+- [Batch load official documentation](https://docs.aws.amazon.com/timestream/latest/developerguide/batch-load.html)
+- [Batch load quotas](https://docs.aws.amazon.com/timestream/latest/developerguide/ts-limits.html)
+
+### Usage Parameters
+
+- **region** [OPTIONAL]: AWS region of your Timestream table for batch load; if not provided, the current region of your session is used (e.g.: *us-east-1*)
+- **database_name** [OPTIONAL]: Timestream database name for batch load (default: batch_load_test)
+- **create_timestream_resource** [OPTIONAL]: Provide this if the Timestream database and table have to be created (default: False)
+- **table_name** [OPTIONAL]: Timestream table name (default: batch_load_test)
+- **partition_key** [OPTIONAL]: Partition key for the Timestream table; must be provided if create_timestream_resource is set (default: None)
+- **memory_store_retenion_in_hours** [OPTIONAL]: Memory store retention in **hours** for the Timestream table (default: 24)
+- **magnetic_store_retention_in_days** [OPTIONAL]: Magnetic store retention in **days** for the Timestream table (default: 3655)
+- **create_error_logging_bucket** [OPTIONAL]: Provide this option if the error logging bucket for batch load has to be created (default: False)
+- **create_destination_bucket** [OPTIONAL]: Provide this option if the batch load target bucket has to be created (default: False)
+- **copy_s3_bucket** [OPTIONAL]: Provide this option if unload files have to be copied from the source bucket (default: False)
+- **s3_source_bucket_location** [OPTIONAL]: Source S3 bucket, if copy_s3_bucket is set to true (default: None). Example: timestream-unload-sourcedb
+- **data_model_file** [REQUIRED]: Data model JSON file location for batch load; see the [data modelling reference](https://docs.aws.amazon.com/timestream/latest/developerguide/batch-load-data-model-mappings.html) and the sample mapping after this list
+- **s3_target_bucket** [OPTIONAL]: Target bucket for batch load; if not provided, defaults to bucket name: timestream-batchload-{database}-{table}-{account_id}-{region}
+- **s3_target_error_bucket** [OPTIONAL]: Target bucket for batch load errors; if not provided, defaults to bucket name: timestream-batchload-error-{database}-{table}-{account_id}-{region}
+- **source_s3_prefix** [OPTIONAL]: Source bucket prefix if copy_s3_bucket is set to true (default: results/)
+- **destination_s3_prefix** [OPTIONAL]: Destination bucket prefix if copy_s3_bucket is set to true (default: dest/)
+- **sns_topic_arn** [OPTIONAL]: SNS topic ARN for sending any batch load failure notifications (default: None). The SNS topic should be in the same account and region. Example: arn:aws:sns:us-east-2:123456789012:MyTopic
+- **num_of_batchload_threads** [OPTIONAL]: Number of parallel batch load threads (default: 5, maximum: 5)
+- **multi_part_upload_chunk** [OPTIONAL]: Multipart upload chunk size in bytes; default is 500 MB (default: 524288000)
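+
+The repository includes a sample mapping, `data_model_sample.json`, that maps the unloaded CSV columns (including the `nanoseconds` column produced by unload.py) to Timestream dimensions and multi-measure attributes. The `city`, `text`, `cpu_utilization` and `memory` columns are only illustrative; replace them with the columns of your own table:
+
+```json
+{
+    "TimeUnit": "NANOSECONDS",
+    "TimeColumn": "nanoseconds",
+    "DimensionMappings": [
+        { "SourceColumn": "city", "DestinationColumn": "city" },
+        { "SourceColumn": "text", "DestinationColumn": "text" }
+    ],
+    "MultiMeasureMappings": {
+        "MultiMeasureAttributeMappings": [
+            { "SourceColumn": "cpu_utilization", "TargetMultiMeasureAttributeName": "cpu_utilization", "MeasureValueType": "DOUBLE" },
+            { "SourceColumn": "memory", "TargetMultiMeasureAttributeName": "memory", "MeasureValueType": "DOUBLE" }
+        ]
+    },
+    "MeasureNameColumn": "measure_name"
+}
+```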
+
+### Examples
+
+**With S3 Copy**
+Example that executes a batch load into the target Timestream table *myTable* with partition key *city* in the database *targetdb* in the *us-west-2* region.
+Timestream resources are created by this script as per the *create_timestream_resource* parameter.
+Source data is located in the S3 bucket *timestream-unload-sourcedb-mytable* with prefix *unload/results/*.
+S3 batch load target and error buckets (for error logs) are created by this script as per the *create_destination_bucket* and *create_error_logging_bucket* parameters.
+The target and error bucket names are given by the *s3_target_bucket* and *s3_target_error_bucket* parameters. Error logs are stored in the S3 bucket *timestream-batchload-error-logs*.
+The destination prefix dest/ is created as given by *destination_s3_prefix*. The data model file used is *data_model_sample.json* in the current location of the script.
+
+ ```bash
+python3 batch_load.py --region us-west-2 --create_timestream_resource --database_name targetdb --table_name myTable --partition_key city --copy_s3_bucket --s3_source_bucket_location timestream-unload-sourcedb-mytable --source_s3_prefix unload/results/ --create_destination_bucket --s3_target_bucket timestream-batchload-targetdb-mytable --destination_s3_prefix dest/ --create_error_logging_bucket --s3_target_error_bucket timestream-batchload-error-logs --data_model_file "data_model_sample.json"
+```
+
+**Without S3 Copy**
+Example that executes a batch load into the target Timestream table *myTable* with partition key *city* in the database *targetdb* in the *eu-west-1* region.
+Timestream resources are created by this script as per the *create_timestream_resource* parameter. Source data is located in the S3 bucket *timestream-unload-sourcedb-mytable* with prefix *unload/results/*.
+Error logs are stored in the S3 bucket *timestream-batchload-error-logs*. If you need the error log bucket to be created, specify --create_error_logging_bucket.
+ ```bash
+python3 batch_load.py --region eu-west-1 --database_name targetdb --table_name myTable --s3_target_bucket timestream-unload-sourcedb-mytable --destination_s3_prefix unload/results/ --data_model_file "data_model_sample.json" --create_timestream_resource --partition_key city --s3_target_error_bucket timestream-batchload-error-logs
+```
+
+## Usage and Requirements
+
+These are the full steps to execute the scripts in your AWS account.
+
+1. Log into your AWS account and select the AWS Region in which your Timestream table is stored
+
+2. Launch [AWS CloudShell](https://console.aws.amazon.com/cloudshell/home) or your local shell (Python 3.10 or newer is required)
+
+3. Clone this source code project using [git](https://git-scm.com/) or download it manually
+
+4. Make sure you have the latest pip package installed
+   ```bash
+   python3 -m ensurepip --upgrade
+   ```
+5. Install the Python [boto3](https://pypi.org/project/boto3/), [backoff](https://pypi.org/project/backoff/) and [tqdm](https://pypi.org/project/tqdm/) packages
+   ```bash
+   python3 -m pip install boto3
+   python3 -m pip install backoff
+   python3 -m pip install tqdm
+   ```
+6. Run unload.py or batch_load.py as described above.
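+
+After the batch load tasks complete, a quick sanity check is to compare row counts between the source and target tables. The sketch below uses the Timestream query API via boto3; the regions, database and table names are placeholders, and counting every row scans the full table (including magnetic store), so it can take a while and incur query cost:
+
+```python
+import boto3
+
+def count_rows(region, database, table):
+    # COUNT(*) over the whole table; the paginator drains partial result pages
+    client = boto3.client("timestream-query", region_name=region)
+    paginator = client.get_paginator("query")
+    for page in paginator.paginate(QueryString=f'SELECT COUNT(*) FROM "{database}"."{table}"'):
+        rows = page.get("Rows", [])
+        if rows:
+            return int(rows[0]["Data"][0]["ScalarValue"])
+    return 0
+
+print("source:", count_rows("eu-west-1", "sourcedb", "myTable"))
+print("target:", count_rows("us-west-2", "targetdb", "myTable"))
+```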
+ diff --git a/tools/python/copy-table/batch_load.py b/tools/python/copy-table/batch_load.py new file mode 100644 index 00000000..8937152a --- /dev/null +++ b/tools/python/copy-table/batch_load.py @@ -0,0 +1,128 @@ +import json +import boto3 +from backoff import expo +import argparse +from utils.s3_utils import * +from utils.timestream_utils import * +from utils.logger_utils import create_logger +import sys + + +if __name__ == '__main__': + + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + #parser.add_argument("-k", "--kmsId", help="KMS key for updating the database") + parser.add_argument("--region", help="provide the aws region",default=None,required=False) + parser.add_argument("--database_name", help="timestream database name",default='batch_load_test', required=False) + parser.add_argument("--table_name", help="timestream table name",default='batch_load_test', required=False) + parser.add_argument("--partition_key", help="partition key for timestream table",default=None, required=False) + parser.add_argument("--create_timestream_resource", help="provide this if timestream database and table have to be created", action='store_true', required=False, default=False) + parser.add_argument("--memory_store_retenion_in_hours", type=int, help="memory store retention in hours for timestream table", required=False, default=24) + parser.add_argument("--magnetic_store_retention_in_days", type=int, help="magnetic store retention in days for timestream table", required=False, default=3655) + parser.add_argument("--create_error_logging_bucket", help ="provide this option if error logging bucket for batchload has to be created", action='store_true', required=False, default=False) + parser.add_argument("--create_destination_bucket", help ="provide this option if bucket for batchload target bucket has to be created", action='store_true', required=False, default=False) + parser.add_argument("--copy_s3_bucket", help ="provide this option if unload files have to copied from source bucket",action='store_true', required=False, default=False) + parser.add_argument("--s3_source_bucket_location", help ="location of source s3 bucket, if copy_s3_bucket is set to true",default=None, required=False) + parser.add_argument("--data_model_file", help ="data model JSON file location for batchload", required=True) + parser.add_argument("--s3_target_bucket", help ="target bucket for batchload", default=None, required=False) + parser.add_argument("--s3_target_error_bucket", help ="target bucket for batchload errors", default=None, required=False) + parser.add_argument("--source_s3_prefix", help ="source bucket prefix if copy_s3_bucket is set true", default="results/", required=False) + parser.add_argument("--destination_s3_prefix", help ="desctination bucket prefix ifcopy_s3_bucket is set true", default="dest/", required=False) + parser.add_argument("--sns_topic_arn", help="SNS topic ARN for sending any batchload failures", default=None, required=False) + parser.add_argument("--num_of_batchload_threads", type=int,help="number of parallel batchloads threads", default=5, choices=(1,5), required=False) #nargs and const can be added. 
+ parser.add_argument("--multi_part_upload_chunk", type=int, help="multi part upload chunk size in bytes, default is 500MB", default=524288000, required=False) + + #assign arguments to args variable + args = parser.parse_args() + + #create logger + logger = create_logger("migration_logger") + + + #parse region + sts_client = boto3.client('sts') + if args.region is None: + region=sts_client.meta.region_name + else: + region=args.region + + logger.info(f'region {region}') + + #assign few required variable. + account_id = sts_client.get_caller_identity().get('Account') + database = args.database_name + table = args.table_name + + #sns region check + sns_topic_arn=args.sns_topic_arn + if args.sns_topic_arn is not None: + assert sns_topic_arn.startswith('arn:aws:sns:'), "Invalid SNS topic ARN format." + sns_region = sns_topic_arn.split(":")[3] + assert sns_region == region, f"The specified SNS topic ARN does not match the provided region. {region}" + + + #Initiate s3 and timestream utilities + s3_utility = s3Utility(region,args.multi_part_upload_chunk) + timestream_utility = timestreamUtility(region, database, table, sns_topic_arn) + + + #assign default bucket names if not provided + bucket_suffix = f'{database}-{table}-{account_id}-{region}' + s3_target_bucket = args.s3_target_bucket if args.s3_target_bucket is not None else f'timestream-batchload-{bucket_suffix}' + logger.info(f's3_target_bucket_location {s3_target_bucket}') + s3_target_error_bucket = args.s3_target_error_bucket if args.s3_target_error_bucket is not None else f'timestream-batchload-error-{bucket_suffix}' + logger.info(f's3_target_error_bucket_location {s3_target_error_bucket}') + + + #create destination buckets + if args.create_destination_bucket: + s3_utility.create_s3_bucket(s3_target_bucket) + + if args.create_error_logging_bucket: + s3_utility.create_s3_bucket(s3_target_error_bucket) + + #create database and required if required + if args.create_timestream_resource and args.partition_key is None: + raise ValueError("Partition key must be provided if create_timestream_resource is set to true.") + elif args.create_timestream_resource and args.partition_key is not None: + timestream_utility.create_timestream_res(args.partition_key, args.memory_store_retenion_in_hours, args.magnetic_store_retention_in_days) + + + + #append "/" if user misses providing in the end for source and target prefix + if not args.source_s3_prefix.endswith('/'): + args.source_s3_prefix += '/' + + if not args.destination_s3_prefix.endswith('/'): + args.destination_s3_prefix += '/' + + source_s3_prefix = f"{args.source_s3_prefix}" + dest_s3_prefix = f"{args.destination_s3_prefix}" + #final_dest_s3_prefix = f'{dest_s3_prefix}' + + + #copy source S3 content to target only CSV files. 
+ if args.copy_s3_bucket: + if args.s3_source_bucket_location is None: + logger.error(f'Provide the source bucket name with argument s3_source_bucket_location') + sys.exit() + else: + s3_utility.copy_multiple_s3_objects(args.s3_source_bucket_location, s3_target_bucket, source_s3_prefix, dest_s3_prefix) + + + #load data model file + try: + f = open(args.data_model_file) + data_model = json.load(f) + logger.info(f'Using datamodel {data_model}') + except: + logger.error(f'File {args.data_model_file} cannot be loaded, please check files exists and is correct path is provided') + + #batchload + all_csv_files_list,sorted_list_s3_partitions= s3_utility.list_s3_object_custom(s3_target_bucket, dest_s3_prefix) + #make sure you empty the target folder before retrying for any error to write in README. + logger.info(f'all destination CSV files : {all_csv_files_list}') + logger.info(f'sorted partition list for batchload : {sorted_list_s3_partitions}') + timestream_utility.multi_thread_handler(args.num_of_batchload_threads, sorted_list_s3_partitions, data_model, s3_target_bucket, s3_target_error_bucket) + diff --git a/tools/python/copy-table/data_model_sample.json b/tools/python/copy-table/data_model_sample.json new file mode 100644 index 00000000..6d74f60d --- /dev/null +++ b/tools/python/copy-table/data_model_sample.json @@ -0,0 +1,29 @@ +{ + "TimeUnit": "NANOSECONDS", + "TimeColumn": "nanoseconds", + "DimensionMappings": [ + { + "SourceColumn": "city", + "DestinationColumn": "city" + }, + { + "SourceColumn": "text", + "DestinationColumn": "text" + } + ], + "MultiMeasureMappings": { + "MultiMeasureAttributeMappings": [ + { + "SourceColumn": "cpu_utilization", + "TargetMultiMeasureAttributeName": "cpu_utilization", + "MeasureValueType": "DOUBLE" + }, + { + "SourceColumn": "memory", + "TargetMultiMeasureAttributeName": "memory", + "MeasureValueType": "DOUBLE" + } + ] + }, + "MeasureNameColumn": "measure_name" +} diff --git a/tools/python/copy-table/unload.py b/tools/python/copy-table/unload.py new file mode 100644 index 00000000..6721b964 --- /dev/null +++ b/tools/python/copy-table/unload.py @@ -0,0 +1,126 @@ +#!/usr/bin/python + +import argparse +import boto3 +import json +from botocore.config import Config +from utils.logger_utils import create_logger +from utils.s3_utils import s3Utility + +def main(logger, region, database, table, bucket_s3_uri, from_time, end_time, partition, iam_role_bucket_policy): + + session = boto3.Session() + if (region is None or len(region) == 0): + region = session.region_name #set current region + + timestream_client = session.client('timestream-query', config=Config(region_name=region)) + s3_client = boto3.client('s3') + sts_client = boto3.client('sts') + + s3_utility = s3Utility(region, None) + + account_id = sts_client.get_caller_identity().get('Account') + + # Create S3 bucket if S3 URI is not provided + if (bucket_s3_uri is None or len(bucket_s3_uri) == 0): + bucket_name = 'timestream-unload-' + database + '-' + table + '-' + account_id + bucket_name = bucket_name.lower() + + bucket_s3_uri = s3_utility.create_s3_bucket(bucket_name) + + # Create bucket policy for accessing data if IAM Role is provided + if (iam_role_bucket_policy): + bucket_name = bucket_s3_uri.split('s3://')[1] + bucket_name = bucket_name.split('/')[0] + + bucket_policy = { + 'Version': '2012-10-17', + 'Statement': [{ + 'Sid': 'PermissionS3Copy1', + 'Effect': 'Allow', + 'Principal': {'AWS': f'{iam_role_bucket_policy}'}, + 'Action': ['s3:GetObject'], + 'Resource': f'arn:aws:s3:::{bucket_name}/*' + },{ 
+ 'Sid': 'PermissionS3Copy2', + 'Effect': 'Allow', + 'Principal': {'AWS': f'{iam_role_bucket_policy}'}, + 'Action': ['s3:ListBucket'], + 'Resource': f'arn:aws:s3:::{bucket_name}' + } + ] + } + + bucket_policy = json.dumps(bucket_policy) + + s3_client.put_bucket_policy(Bucket=bucket_name, Policy=bucket_policy) + + query = build_query(database, table, bucket_s3_uri, from_time, end_time, partition) + run_query(logger, timestream_client, query) + +def build_query(database, table, bucket_s3_uri, from_time, end_time, partition): + unload_query = "UNLOAD(" + unload_query += " SELECT *, to_nanoseconds(time) as nanoseconds" + if (partition): + match partition: + case "day": + unload_query += ", DATE_FORMAT(time,'%Y-%m-%d') as partition_date" + case "month": + unload_query += ", DATE_FORMAT(time,'%Y-%m') as partition_date" + case "year": + unload_query += ", DATE_FORMAT(time,'%Y') as partition_date" + unload_query += " FROM " + database + "." + table + if (from_time and end_time): + unload_query += " WHERE time >= '" + from_time + "' AND time < '" + end_time + "'" + elif (from_time): + unload_query += " WHERE time >= '" + from_time + "'" + elif (end_time): + unload_query += " WHERE time < '" + end_time + "'" + unload_query += " ORDER BY " + unload_query += " time asc )" + + unload_query += " TO '" + bucket_s3_uri + "'" + unload_query += " WITH (partitioned_by = ARRAY['partition_date']," + unload_query += " format='csv'," + unload_query += " compression='none'," + unload_query += " max_file_size = '4.9GB'," + unload_query += " include_header='true')" + + return unload_query + +def run_query(logger, client, query): + paginator = client.get_paginator('query') + try: + logger.info("QUERY EXECUTED: " + query) + logger.info("UNLOAD IN PROGRESS...") + page_iterator = paginator.paginate(QueryString=query) + for page in page_iterator: + logger.info("Progress Percentage: " + str(page['QueryStatus']['ProgressPercentage']) + "%") + logger.debug(page) + except Exception as err: + logger.error("Exception while running query: ", err) + raise + +if __name__ == '__main__': + + parser = argparse.ArgumentParser() + + parser.add_argument("-r", "--region", help="AWS region of your Timestream table to be unloaded", required=False) + parser.add_argument("-d", "--database", help="Timestream database name where is located the table to be unloaded", required=True) + parser.add_argument("-t", "--table", help="Timestream table name to be unloaded", required=True) + parser.add_argument("-s", "--s3_uri", help="S3 Bucket URI to store unload data", required=False) + parser.add_argument("-f", "--from_time", help="Timestamp from which you want to unload data (included)", required=False) + parser.add_argument("-e", "--end_time", help="Timestamp to which you want to unload data (not included)", required=False) + parser.add_argument("-p", "--partition", help="Partition data by 'day', 'month' or 'year'", required=True, choices=['day', 'month', 'year']) + parser.add_argument("-i", "--iam_role_bucket_policy", help="S3 Bucket policy to apply to the S3 Bucket where unload data is stored", required=False) + + #assign arguments to args variable + args = parser.parse_args() + + #create logger + logger = create_logger("Unload Logger") + + main(logger, args.region, args.database, args.table, args.s3_uri, args.from_time, args.end_time, args.partition, args.iam_role_bucket_policy) + + logger.info("COMPLETED SUCCESSFULLY") + diff --git a/tools/python/copy-table/utils/logger_utils.py b/tools/python/copy-table/utils/logger_utils.py new file mode 
100644 index 00000000..ec0fa8b7 --- /dev/null +++ b/tools/python/copy-table/utils/logger_utils.py @@ -0,0 +1,19 @@ +import logging + +def create_logger(logger_name, log_level=logging.INFO): + # Create logger + logger = logging.getLogger(logger_name) + logger.setLevel(log_level) + + # Create console handler and set level + ch = logging.StreamHandler() + ch.setLevel(log_level) + + # Create formatter and add it to the handler + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + ch.setFormatter(formatter) + + # Add the handler to the logger + logger.addHandler(ch) + + return logger \ No newline at end of file diff --git a/tools/python/copy-table/utils/s3_utils.py b/tools/python/copy-table/utils/s3_utils.py new file mode 100644 index 00000000..a20aeaf7 --- /dev/null +++ b/tools/python/copy-table/utils/s3_utils.py @@ -0,0 +1,138 @@ + +import boto3 +from utils.logger_utils import create_logger +import botocore +import boto3.s3.transfer as s3transfer +import tqdm +import os + +class s3Utility: + def __init__(self, region, multi_part_upload_chunk): + botocore_config = botocore.config.Config( + max_pool_connections=5000, retries={'max_attempts': 10}) + self.s3_client = boto3.client( + 's3', region_name=region, config=botocore_config) + self.logger = create_logger("s3_logger") + self.multi_part_upload_chunk = multi_part_upload_chunk + self.region = region + + + # create s3 bucket and return S3 bucket URI + def create_s3_bucket(self, bucket_name): + try: + if self.region == 'us-east-1': + response_s3 = self.s3_client.create_bucket( + Bucket=bucket_name + ) + else: + response_s3 = self.s3_client.create_bucket( + Bucket=bucket_name, + CreateBucketConfiguration={ + 'LocationConstraint': self.region + } + ) + + bucket_s3_uri = 's3://' + bucket_name + self.logger.info(f'S3 Bucket created successfully {bucket_s3_uri}') + except self.s3_client.exceptions.BucketAlreadyOwnedByYou: + self.logger.warning(f"Bucket {bucket_name} is already created and owned by you") + bucket_s3_uri = 's3://' + bucket_name + except Exception as err: + self.logger.error("Creating bucket {bucket_name} failed :", err) + raise + + return bucket_s3_uri + + # copy s3 objects from source to target + def copy_s3_object_custom(self, source_bucket, source_key_list, destination_bucket, destination_key): + bucket_size = 0 + try: + + transfer_config = s3transfer.TransferConfig( + use_threads=True, + max_concurrency=20, + multipart_chunksize=self.multi_part_upload_chunk + ) + + progress = tqdm.tqdm( + desc='upload', + total=bucket_size, unit='B', unit_scale=1, + position=0, + bar_format='{desc:<10}{percentage:3.0f}%|{bar:10}{r_bar}') + + s3t = s3transfer.create_transfer_manager( + self.s3_client, transfer_config) + + # Not adding only deskkey, as adding sourcekey more meaningful name instead of appending just with index + destination_key_list = [ + destination_key+src for src in source_key_list] + for src, dest in zip(source_key_list, destination_key_list): + + copy_source = { + 'Bucket': source_bucket, + 'Key': src + } + + s3t.copy(copy_source=copy_source, + bucket=destination_bucket, + key=dest, + subscribers=[ + s3transfer.ProgressCallbackInvoker(progress.update),], + ) + + # close transfer job + s3t.shutdown() + progress.close() + + except Exception as err: + self.logger.error(f'multipart upload failed with error : {err}') + raise + + def copy_multiple_s3_objects(self, source_bucket, destination_bucket, source_s3_prefix, dest_s3_prefix): + all_csv_files_list, partition_names = self.list_s3_object_custom( 
+ source_bucket, source_s3_prefix) + self.logger.info(f'I am here {all_csv_files_list}') + self.copy_s3_object_custom( + source_bucket, all_csv_files_list, destination_bucket, dest_s3_prefix) + + def list_s3_object_custom(self, bucket_name, common_prefix=None): + sources_keys_list = [] + all_files_list = [] + paginator = self.s3_client.get_paginator('list_objects_v2') + + try: + operation_parameters = {'Bucket': bucket_name, 'Delimiter': '/'} + + if common_prefix is not None: + operation_parameters = { + 'Bucket': bucket_name, 'Prefix': common_prefix} + # common_prefix_list_count = len(common_prefix.split("/")) - 1 + page_iterator = paginator.paginate(**operation_parameters) + + for page in page_iterator: + if common_prefix is not None: + keys = [obj['Key'] for obj in page.get('Contents', [])] + all_files_list.extend( + [key for key in keys if key.endswith('.csv')]) + parition_names = set(os.path.dirname( + file_path) + '/' for file_path in all_files_list) + sources_keys_list.extend(parition_names) + else: + for folder in page.get('CommonPrefixes', []): + current_folder = folder['Prefix'] + operation_parameters = { + 'Bucket': bucket_name, 'Prefix': current_folder} + page_iterator = paginator.paginate( + **operation_parameters) + for page in page_iterator: + keys = [obj['Key'] + for obj in page.get('Contents', [])] + sources_keys_list.extend( + [key for key in keys if key.endswith('.csv')]) + + except Exception as err: + self.logger.error( + f'Unable to list keys for the {bucket_name}: {err}') + raise + + return all_files_list, sorted(list(set(sources_keys_list))) \ No newline at end of file diff --git a/tools/python/copy-table/utils/timestream_utils.py b/tools/python/copy-table/utils/timestream_utils.py new file mode 100644 index 00000000..3fb8501b --- /dev/null +++ b/tools/python/copy-table/utils/timestream_utils.py @@ -0,0 +1,158 @@ +import boto3 +import backoff +from utils.logger_utils import create_logger +import threading +import queue +import botocore + + +class timestreamUtility: + def __init__(self, region, database, table, sns_topic_arn): + botocore_config = botocore.config.Config( + max_pool_connections=5000, retries={'max_attempts': 10}) + self.timestream_client = boto3.client( + 'timestream-write', region_name=region) + self.sns_client = boto3.client( + 'sns', region_name=region, config=botocore_config) + self.logger = create_logger("timestream_logger") + self.database = database + self.table = table + self.sns_topic_arn = sns_topic_arn + self.partition_queue = queue.Queue() + + def backoff_handler(details): + print( + f"Retrying batch load task status in {details['wait']:.2f} seconds...") + + def sns_publish_message(self, message, subject, message_structure='email'): + response = self.sns_client.publish( + TopicArn=self.sns_topic_arn, Message=message, Subject=subject, MessageStructure=message_structure) + print(response) + + @backoff.on_predicate(backoff.constant, predicate=lambda r: r['BatchLoadTaskDescription']['TaskStatus'] in ['CREATED', 'IN_PROGRESS'], interval=10, jitter=None, max_tries=720, on_backoff=backoff_handler) + def check_task_status(self, batch_load_task_id, s3_target_bucket_location, partition): + response = self.timestream_client.describe_batch_load_task( + TaskId=batch_load_task_id + ) + task_status = response['BatchLoadTaskDescription']['TaskStatus'] + + self.logger.info(f"batch load for partition : {partition} taskid : {batch_load_task_id} status ----> {task_status} \n Progress report : {response['BatchLoadTaskDescription']['ProgressReport']}") + + + if 
task_status in ['PROGRESS_STOPPED', 'FAILED', 'PENDING_RESUME']: + self.logger.error( + response['BatchLoadTaskDescription']['ProgressReport']) + error_message = f"Location of job logging reports {response['BatchLoadTaskDescription']['ReportConfiguration']}" + # sns notification + exception_message = f'Timestream batchload job for source bucket {s3_target_bucket_location} and partition {partition} associated to timestream batchload task_id {batch_load_task_id} is {task_status}. \n{error_message}' + if self.sns_topic_arn is not None: + self.sns_publish_message( + exception_message, f"timestream batchload status") + raise Exception(exception_message) + return response + + def timestream_batch_load(self, folder, data_model, s3_target_bucket_location, s3_error_bucket_location): + try: + result = self.timestream_client.create_batch_load_task(TargetDatabaseName=self.database, TargetTableName=self.table, + DataModelConfiguration={"DataModel": data_model + }, + DataSourceConfiguration={ + "DataSourceS3Configuration": { + "BucketName": s3_target_bucket_location, + "ObjectKeyPrefix": folder + }, + "DataFormat": "CSV" + }, + ReportConfiguration={ + "ReportS3Configuration": { + "BucketName": s3_error_bucket_location, + "ObjectKeyPrefix": "batch_load_errors", + "EncryptionOption": "SSE_S3", + } + } + ) + task_id = result["TaskId"] + self.logger.info( + f" Successfully created batch load task for partition {folder}:{task_id} \n") + self.check_task_status(task_id, s3_target_bucket_location, folder) + except Exception as err: + self.logger.error(f"Create batch load task job failed:{err}") + raise + + def create_timestream_res(self, partition_key, memory_store_retenion_in_hours, magnetic_store_retention_in_days): + + # create database if not exists + self.logger.info("Creating Database") + try: + self.timestream_client.create_database(DatabaseName=self.database) + self.logger.info(f"Database {self.database} created successfully") + except self.timestream_client.exceptions.ConflictException: + self.logger.info( + f"Database {self.database} exists. Skipping database creation") + except Exception as err: + self.logger.info(f"Create database failed with error : {err}") + raise + + # create table + self.logger.info("Creating table") + retention_properties = { + 'MemoryStoreRetentionPeriodInHours': memory_store_retenion_in_hours, + 'MagneticStoreRetentionPeriodInDays': magnetic_store_retention_in_days + } + magnetic_store_write_properties = { + 'EnableMagneticStoreWrites': True + } + + schema = { + "CompositePartitionKey": [ + { + "EnforcementInRecord": "REQUIRED", + "Name": partition_key, + "Type": "DIMENSION" + } + ] + } + + try: + self.timestream_client.create_table(DatabaseName=self.database, TableName=self.table, + RetentionProperties=retention_properties, + MagneticStoreWriteProperties=magnetic_store_write_properties, + Schema=schema + ) + self.logger.info(f"Table {self.table} successfully created") + except self.timestream_client.exceptions.ConflictException: + self.logger.info( + f"Table {self.table} exists on database {self.database}. 
Skipping table creation") + except Exception as err: + self.logger.error(f"Create table failed: {err}") + raise + + def thread_handler(self, data_model, s3_target_bucket_location, s3_error_bucket_location): + while True: + try: + # Get the next partition from the queue + partition = self.partition_queue.get_nowait() + except queue.Empty: + # If the queue is empty, break the loop + break + self.timestream_batch_load( + partition, data_model, s3_target_bucket_location, s3_error_bucket_location) + # can add wait time logic sleep(2 hours) time.sleep(sleep_interval_in_seconds) here based on the volume skipping here as this is just a sample. + self.partition_queue.task_done() + + def multi_thread_handler(self, threads_count, partition_list, data_model, s3_target_bucket_location, s3_error_bucket_location): + + # Fill the queue with partitions + for partition in partition_list: + self.partition_queue.put(partition) + + threads = [] + + for i in range(threads_count): + thread = threading.Thread(target=self.thread_handler, args=( + data_model, s3_target_bucket_location, s3_error_bucket_location)) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join()