Skip to content

Commit

Permalink
test: catching as OS Error
Browse files Browse the repository at this point in the history
  • Loading branch information
cka-y committed Oct 7, 2023
1 parent ce73e28 commit 78df467
Showing 1 changed file with 19 additions and 14 deletions.
33 changes: 19 additions & 14 deletions infra/batch/datasets/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from urllib3.util.retry import Retry

import requests
from google.api_core.retry import Retry as RetryPolicy
# from google.api_core.retry import Retry as RetryPolicy
from google.cloud import storage
from datetime import datetime
from hashlib import sha256
Expand Down Expand Up @@ -49,14 +49,19 @@ def upload_dataset(url, bucket_name, stable_id, latest_hash):
}
session = requests.Session()
retry = Retry(
total=5,
total=2,
backoff_factor=0.1,
status_forcelist=[500, 502, 503, 504]
status_forcelist=[500, 502, 503, 504],
raise_on_status=False
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
response = session.get(url, headers=headers, verify=False, timeout=120, stream=True)
try:
response = session.get(url, headers=headers, verify=False, timeout=120, stream=True)
except OSError as e:
print(e)
raise Exception("OS Exception -- Connection timeout error")
response.raise_for_status()

content = response.content
Expand Down Expand Up @@ -254,15 +259,15 @@ def batch_datasets(request):
print(f"Retrieved {len(results)} active feeds.")

# Publish to topic for processing
custom_retry_policy = RetryPolicy(
initial=60.0, # at least 1 min between retries
maximum=540.0, # at most 9 minutes between retries (pub/sub timeout)
deadline=1800.0, # Retry for 30 minutes
predicate=api_core.retry.if_exception_type(
psycopg2.OperationalError,
ServiceUnavailable
),
)
# custom_retry_policy = RetryPolicy(
# initial=60.0, # at least 1 min between retries
# maximum=540.0, # at most 9 minutes between retries (pub/sub timeout)
# deadline=1800.0, # Retry for 30 minutes
# predicate=api_core.retry.if_exception_type(
# psycopg2.OperationalError,
# ServiceUnavailable
# ),
# )
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, pubsub_topic_name)
for stable_id, producer_url, feed_id in results:
Expand All @@ -281,6 +286,6 @@ def batch_datasets(request):
}
data_str = json.dumps(payload)
data_bytes = data_str.encode('utf-8')
publisher.publish(topic_path, data=data_bytes, retry=custom_retry_policy)
publisher.publish(topic_path, data=data_bytes)

return f'Publish completed. Published {len(results)} feeds to {pubsub_topic_name}.'

0 comments on commit 78df467

Please sign in to comment.