From f650498bcbfdf52ef6ae66f4ae22a2af8533fe1f Mon Sep 17 00:00:00 2001
From: Soren Spicknall
Date: Fri, 19 Jul 2024 14:21:40 -0500
Subject: [PATCH 1/4] Add configurable timeout to BigQuery transactions

---
 parsons/google/google_bigquery.py | 26 ++++++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py
index ab47cadce..a16f1b4c1 100644
--- a/parsons/google/google_bigquery.py
+++ b/parsons/google/google_bigquery.py
@@ -361,6 +361,7 @@ def copy_from_gcs(
         compression_type: str = "gzip",
         new_file_extension: str = "csv",
         template_table: Optional[str] = None,
+        max_timeout: int = 30,
         **load_kwargs,
     ):
         """
@@ -421,6 +422,8 @@ def copy_from_gcs(
             template_table: str
                 Table name to be used as the load schema. Load operation wil use the same
                 columns and data types as the template table.
+            max_timeout: int
+                The maximum number of seconds to wait for a request before the job fails.
             **load_kwargs: kwargs
                 Other arguments to pass to the underlying load_table_from_uri call on the
                 BigQuery client.
@@ -463,12 +466,14 @@ def copy_from_gcs(
                     job_config=job_config,
                     compression_type=compression_type,
                     new_file_extension=new_file_extension,
+                    max_timeout=max_timeout,
                 )
             else:
                 return self._load_table_from_uri(
                     source_uris=gcs_blob_uri,
                     destination=table_ref,
                     job_config=job_config,
+                    max_timeout=max_timeout,
                     **load_kwargs,
                 )
         except exceptions.BadRequest as e:
@@ -493,6 +498,7 @@ def copy_from_gcs(
                     job_config=job_config,
                     compression_type=compression_type,
                     new_file_extension=new_file_extension,
+                    max_timeout=max_timeout,
                 )
             elif "Schema has no field" in str(e):
                 logger.debug(f"{gcs_blob_uri.split('/')[-1]} is empty, skipping file")
@@ -500,6 +506,10 @@ def copy_from_gcs(
             else:
                 raise e
 
+
+        except exceptions.DeadlineExceeded as e:
+            logger.error(f"Max timeout exceeded for {gcs_blob_uri.split('/')[-1]}")
+            raise e
 
     def copy_large_compressed_file_from_gcs(
         self,
@@ -519,6 +529,7 @@ def copy_large_compressed_file_from_gcs(
         compression_type: str = "gzip",
         new_file_extension: str = "csv",
         template_table: Optional[str] = None,
+        max_timeout: int = 30,
         **load_kwargs,
     ):
         """
@@ -577,6 +588,8 @@ def copy_large_compressed_file_from_gcs(
             template_table: str
                 Table name to be used as the load schema. Load operation wil use the same
                 columns and data types as the template table.
+            max_timeout: int
+                The maximum number of seconds to wait for a request before the job fails.
             **load_kwargs: kwargs
                 Other arguments to pass to the underlying load_table_from_uri call on the
                 BigQuery client.
@@ -621,6 +634,7 @@ def copy_large_compressed_file_from_gcs(
                 source_uris=uncompressed_gcs_uri,
                 destination=table_ref,
                 job_config=job_config,
+                max_timeout=max_timeout,
                 **load_kwargs,
             )
 
@@ -647,6 +661,7 @@ def copy_s3(
         tmp_gcs_bucket: Optional[str] = None,
         template_table: Optional[str] = None,
         job_config: Optional[LoadJobConfig] = None,
+        max_timeout: int = 30,
         **load_kwargs,
     ):
         """
@@ -692,6 +707,8 @@ def copy_s3(
                 on the BigQuery client. The function will create its own if not provided. Note if
                 there are any conflicts between the job_config and other parameters, the job_config
                 values are preferred.
+            max_timeout: int
+                The maximum number of seconds to wait for a request before the job fails.

         `Returns`
             Parsons Table or ``None``
@@ -724,6 +741,7 @@ def copy_s3(
             nullas=nullas,
             job_config=job_config,
             template_table=template_table,
+            max_timeout=max_timeout,
             **load_kwargs,
         )
     finally:
@@ -745,6 +763,7 @@ def copy(
         allow_jagged_rows: bool = True,
         quote: Optional[str] = None,
         schema: Optional[List[dict]] = None,
+        max_timeout: int = 30,
         **load_kwargs,
     ):
         """
@@ -774,6 +793,8 @@ def copy(
             template_table: str
                 Table name to be used as the load schema. Load operation wil use the same
                 columns and data types as the template table.
+            max_timeout: int
+                The maximum number of seconds to wait for a request before the job fails.
             **load_kwargs: kwargs
                 Arguments to pass to the underlying load_table_from_uri call on the
                 BigQuery client.
@@ -823,6 +844,7 @@ def copy(
                 source_uris=temp_blob_uri,
                 destination=self.get_table_ref(table_name=table_name),
                 job_config=job_config,
+                max_timeout=max_timeout
                 **load_kwargs,
             )
         finally:
@@ -1339,7 +1361,7 @@ def _validate_copy_inputs(self, if_exists: str, data_type: str):
         if data_type not in ["csv", "json"]:
             raise ValueError(f"Only supports csv or json files [data_type = {data_type}]")
 
-    def _load_table_from_uri(self, source_uris, destination, job_config, **load_kwargs):
+    def _load_table_from_uri(self, source_uris, destination, job_config, max_timeout, **load_kwargs):
         load_job = self.client.load_table_from_uri(
             source_uris=source_uris,
             destination=destination,
@@ -1348,7 +1370,7 @@ def _load_table_from_uri(self, source_uris, destination, job_config, **load_kwar
         )
 
         try:
-            load_job.result()
+            load_job.result(timeout=max_timeout)
             return load_job
         except exceptions.BadRequest as e:
             for idx, error_ in enumerate(load_job.errors):

From e1de9fab18e266df819d46aed9217f8e9fea9c47 Mon Sep 17 00:00:00 2001
From: Soren Spicknall
Date: Fri, 19 Jul 2024 14:40:37 -0500
Subject: [PATCH 2/4] Missing comma

---
 parsons/google/google_bigquery.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py
index a16f1b4c1..2b9f7af27 100644
--- a/parsons/google/google_bigquery.py
+++ b/parsons/google/google_bigquery.py
@@ -844,7 +844,7 @@ def copy(
                 source_uris=temp_blob_uri,
                 destination=self.get_table_ref(table_name=table_name),
                 job_config=job_config,
-                max_timeout=max_timeout
+                max_timeout=max_timeout,
                 **load_kwargs,
             )
         finally:

From b7c174a26b03e5a673ea4be91c564cbd8217f924 Mon Sep 17 00:00:00 2001
From: Soren Spicknall
Date: Fri, 19 Jul 2024 14:43:21 -0500
Subject: [PATCH 3/4] Linting fixes

---
 parsons/google/google_bigquery.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py
index 2b9f7af27..7febf6a9a 100644
--- a/parsons/google/google_bigquery.py
+++ b/parsons/google/google_bigquery.py
@@ -506,7 +506,7 @@ def copy_from_gcs(
             else:
                 raise e
 
-
+
         except exceptions.DeadlineExceeded as e:
             logger.error(f"Max timeout exceeded for {gcs_blob_uri.split('/')[-1]}")
             raise e
@@ -1361,7 +1361,8 @@ def _validate_copy_inputs(self, if_exists: str, data_type: str):
         if data_type not in ["csv", "json"]:
             raise ValueError(f"Only supports csv or json files [data_type = {data_type}]")
 
-    def _load_table_from_uri(self, source_uris, destination, job_config, max_timeout, **load_kwargs):
+    def _load_table_from_uri(self, source_uris, destination, job_config, max_timeout,
+        **load_kwargs):
         load_job = self.client.load_table_from_uri(
             source_uris=source_uris,
             destination=destination,

From 8a6cc1009c314809384b48ac75c317d872cb2d25 Mon Sep 17 00:00:00 2001
From: Soren Spicknall
Date: Fri, 19 Jul 2024 14:45:21 -0500
Subject: [PATCH 4/4] Correct indentation depth

---
 parsons/google/google_bigquery.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py
index 7febf6a9a..64f239654 100644
--- a/parsons/google/google_bigquery.py
+++ b/parsons/google/google_bigquery.py
@@ -1362,7 +1362,7 @@ def _validate_copy_inputs(self, if_exists: str, data_type: str):
         raise ValueError(f"Only supports csv or json files [data_type = {data_type}]")
 
     def _load_table_from_uri(self, source_uris, destination, job_config, max_timeout,
-        **load_kwargs):
+                             **load_kwargs):
         load_job = self.client.load_table_from_uri(
             source_uris=source_uris,
             destination=destination,
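
A minimal usage sketch of the new max_timeout argument, assuming the Parsons
GoogleBigQuery connector with Google credentials already configured. The bucket
URI and table value below are illustrative only, table_name is assumed to be the
destination keyword of copy_from_gcs, and the exceptions import is assumed to
resolve to google.api_core.exceptions, which provides DeadlineExceeded:

    from google.api_core import exceptions
    from parsons import GoogleBigQuery

    bq = GoogleBigQuery()  # assumes Google credentials are configured in the environment

    try:
        # Allow the load job up to ten minutes instead of the 30-second default.
        bq.copy_from_gcs(
            gcs_blob_uri="gs://example-bucket/exports/contacts.csv",  # hypothetical URI
            table_name="example_dataset.contacts",  # hypothetical destination table
            max_timeout=600,
        )
    except exceptions.DeadlineExceeded:
        # Per the patches above, copy_from_gcs logs which file timed out and re-raises.
        print("Load job did not finish within max_timeout")

Each public copy method defaults max_timeout to 30 seconds and forwards it to
_load_table_from_uri, which passes it through to load_job.result(timeout=max_timeout).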