Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add code comments for better readability #170

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
43 changes: 40 additions & 3 deletions tap_facebook/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ def transform_datetime_string(dts):
return singer.strftime(parsed_dt)

def iter_delivery_info_filter(stream_type):
"""
Prepare filter paramaters for ads, adsets and campaigns
"""
filt = {
"field": stream_type + ".delivery_info",
"operator": "IN",
Expand Down Expand Up @@ -178,6 +181,7 @@ class Stream(object):
replication_method = 'FULL_TABLE'

def automatic_fields(self):
# Prepare set of fields with automatic inclusion
fields = set()
if self.catalog_entry:
props = metadata.to_map(self.catalog_entry.metadata)
Expand All @@ -191,6 +195,7 @@ def automatic_fields(self):


def fields(self):
# prepare set of selected and automatic fields
fields = set()
if self.catalog_entry:
props = metadata.to_map(self.catalog_entry.metadata)
Expand Down Expand Up @@ -306,6 +311,7 @@ def _call_get_ads(self, params):

def __iter__(self):
def do_request():
# Set a params for ads without deleted objects
params = {'limit': RESULT_RETURN_LIMIT}
if self.current_bookmark:
params.update({'filtering': [{'field': 'ad.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp}]})
Expand All @@ -314,8 +320,10 @@ def do_request():
def do_request_multiple():
params = {'limit': RESULT_RETURN_LIMIT}
bookmark_params = []
# Set a params for date filter based on bookmark
if self.current_bookmark:
bookmark_params.append({'field': 'ad.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp})
# Set a params for ads with deleted objects also when include_deleted is set true
for del_info_filt in iter_delivery_info_filter('ad'):
params.update({'filtering': [del_info_filt] + bookmark_params})
filt_ads = self._call_get_ads(params)
Expand All @@ -326,6 +334,7 @@ def do_request_multiple():
def prepare_record(ad):
return ad.api_get(fields=self.fields()).export_all_data()

# Call a request function based on include_deleted parameter
if CONFIG.get('include_deleted', 'false').lower() == 'true':
ads = do_request_multiple()
else:
Expand All @@ -352,6 +361,7 @@ def _call_get_ad_sets(self, params):

def __iter__(self):
def do_request():
# Set a params for ads without deleted objects
params = {'limit': RESULT_RETURN_LIMIT}
if self.current_bookmark:
params.update({'filtering': [{'field': 'adset.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp}]})
Expand All @@ -360,8 +370,10 @@ def do_request():
def do_request_multiple():
params = {'limit': RESULT_RETURN_LIMIT}
bookmark_params = []
# Set a params for date filter based on bookmark
if self.current_bookmark:
bookmark_params.append({'field': 'adset.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp})
# Set a params for ads with deleted objects also when include_deleted is set true
for del_info_filt in iter_delivery_info_filter('adset'):
params.update({'filtering': [del_info_filt] + bookmark_params})
filt_adsets = self._call_get_ad_sets(params)
Expand All @@ -372,6 +384,7 @@ def do_request_multiple():
def prepare_record(ad_set):
return ad_set.api_get(fields=self.fields()).export_all_data()

# Call a request function based on include_deleted parameter
if CONFIG.get('include_deleted', 'false').lower() == 'true':
ad_sets = do_request_multiple()
else:
Expand Down Expand Up @@ -400,6 +413,7 @@ def __iter__(self):
pull_ads = 'ads' in props

def do_request():
# Set a params for ads without deleted objects
params = {'limit': RESULT_RETURN_LIMIT}
if self.current_bookmark:
params.update({'filtering': [{'field': 'campaign.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp}]})
Expand All @@ -408,8 +422,10 @@ def do_request():
def do_request_multiple():
params = {'limit': RESULT_RETURN_LIMIT}
bookmark_params = []
# Set a params for date filter based on bookmark
if self.current_bookmark:
bookmark_params.append({'field': 'campaign.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp})
# Set a params for ads with deleted objects also when include_deleted is set true
for del_info_filt in iter_delivery_info_filter('campaign'):
params.update({'filtering': [del_info_filt] + bookmark_params})
filt_campaigns = self._call_get_campaigns(params)
Expand All @@ -427,6 +443,7 @@ def prepare_record(campaign):
campaign_out['ads']['data'].append({'id': ad_id})
return campaign_out

# Call a request function based on include_deleted parameter
if CONFIG.get('include_deleted', 'false').lower() == 'true':
campaigns = do_request_multiple()
else:
Expand All @@ -444,6 +461,7 @@ class Leads(Stream):
replication_method = 'INCREMENTAL'

def compare_lead_created_times(self, leadA, leadB):
# Compare two leads and return lead with max replication key
if leadA is None:
return leadB
timestampA = pendulum.parse(leadA[self.replication_key])
Expand Down Expand Up @@ -491,12 +509,14 @@ def sync_batches(self, stream_objects):
# Added retry_pattern to handle AttributeError raised from account.get_ads() below
@retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
def get_ads(self):
# Set a parameters and get ads
params = {'limit': RESULT_RETURN_LIMIT}
yield from self.account.get_ads(params=params)

# Added retry_pattern to handle AttributeError raised from ad.get_leads() below
@retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
def get_leads(self, ads, start_time, previous_start_time):
# Set a parameters and get leads
start_time = int(start_time.timestamp()) # Get unix timestamp
params = {'limit': RESULT_RETURN_LIMIT,
'filtering': [{'field': 'time_created',
Expand All @@ -509,6 +529,7 @@ def get_leads(self, ads, start_time, previous_start_time):
yield from ad.get_leads(params=params)

def sync(self):
# Get a bookmark or start date for leads
start_time = pendulum.utcnow()
previous_start_time = self.state.get("bookmarks", {}).get("leads", {}).get(self.replication_key, CONFIG.get('start_date'))

Expand All @@ -517,6 +538,7 @@ def sync(self):
leads = self.get_leads(ads, start_time, int(previous_start_time.timestamp()))
latest_lead_time = self.sync_batches(leads)

# Write bookmark and update state if latest lead time found from data
if not latest_lead_time is None:
singer.write_bookmark(self.state, 'leads', self.replication_key, latest_lead_time)
singer.write_state(self.state)
Expand All @@ -538,6 +560,7 @@ def sync(self):
]

def get_start(stream, bookmark_key):
# Get start time by bookmark of state or start date
tap_stream_id = stream.name
state = stream.state or {}
current_bookmark = singer.get_bookmark(state, tap_stream_id, bookmark_key)
Expand Down Expand Up @@ -611,6 +634,7 @@ def __attrs_post_init__(self):
def job_params(self):
start_date = get_start(self, self.bookmark_key)

# Set start date and end date using buffer days and facebook retention period
buffered_start_date = start_date.subtract(days=self.buffer_days)
min_start_date = pendulum.today().subtract(months=self.FACEBOOK_INSIGHTS_RETENTION_PERIOD)
if buffered_start_date < min_start_date:
Expand Down Expand Up @@ -755,27 +779,32 @@ def initialize_stream(account, catalog_entry, state): # pylint: disable=too-many


def get_streams_to_sync(account, catalog, state):
# Initialize and return streams selected in catalog
streams = []
for stream in STREAMS:
catalog_entry = next((s for s in catalog.streams if s.tap_stream_id == stream), None)
if catalog_entry and catalog_entry.is_selected():
if catalog_entry and catalog_entry.is_selected(): # Check if stream is selected in catalog
# TODO: Don't need name and stream_alias since it's on catalog_entry
name = catalog_entry.stream
stream_alias = catalog_entry.stream_alias
streams.append(initialize_stream(account, catalog_entry, state))
return streams

def transform_date_hook(data, typ, schema):
# Transform provided data into date format if type is 'date-time'
if typ == 'string' and schema.get('format') == 'date-time' and isinstance(data, str):
transformed = transform_datetime_string(data)
return transformed
return data

def do_sync(account, catalog, state):
# Get streams selected in catalog
streams_to_sync = get_streams_to_sync(account, catalog, state)
refs = load_shared_schema_refs()
# Loop over streams selected in catalog
for stream in streams_to_sync:
LOGGER.info('Syncing %s, fields %s', stream.name, stream.fields())
# Resolve schema references and write schema in output
schema = singer.resolve_schema_references(load_schema(stream), refs)
metadata_map = metadata.to_map(stream.catalog_entry.metadata)
bookmark_key = BOOKMARK_KEYS.get(stream.name)
Expand All @@ -790,6 +819,7 @@ def do_sync(account, catalog, state):
with Transformer(pre_hook=transform_date_hook) as transformer:
with metrics.record_counter(stream.name) as counter:
for message in stream:
# Transform data as per field selection and write records
if 'record' in message:
counter.increment()
time_extracted = utils.now()
Expand All @@ -806,6 +836,7 @@ def get_abs_path(path):


def load_schema(stream):
# Load schema of all streams from schemas
path = get_abs_path('schemas/{}.json'.format(stream.name))
schema = utils.load_json(path)

Expand All @@ -824,6 +855,7 @@ def discover_schemas():
streams = initialize_streams_for_discovery()
for stream in streams:
LOGGER.info('Loading schema for %s', stream.name)
# Resolve references and make final schema
schema = singer.resolve_schema_references(load_schema(stream), refs)

bookmark_key = BOOKMARK_KEYS.get(stream.name)
Expand All @@ -833,6 +865,7 @@ def discover_schemas():
replication_method=stream.replication_method,
valid_replication_keys=[bookmark_key] if bookmark_key else None))

# Set automatic inclusion for bookmark keys
if bookmark_key == UPDATED_TIME_KEY or bookmark_key == CREATED_TIME_KEY :
mdata = metadata.write(mdata, ('properties', bookmark_key), 'inclusion', 'automatic')

Expand All @@ -843,6 +876,7 @@ def discover_schemas():
return result

def load_shared_schema_refs():
# Load dictionary with references(shared) schemas
shared_schemas_path = get_abs_path('schemas/shared')

shared_file_names = [f for f in os.listdir(shared_schemas_path)
Expand All @@ -856,6 +890,7 @@ def load_shared_schema_refs():
return shared_schema_refs

def do_discover():
# Discover schemas and dump in STDOUT
LOGGER.info('Loading schemas')
json.dump(discover_schemas(), sys.stdout, indent=4)

Expand All @@ -871,10 +906,12 @@ def main_impl():
global RESULT_RETURN_LIMIT
RESULT_RETURN_LIMIT = CONFIG.get('result_return_limit', RESULT_RETURN_LIMIT)

# Set FacebookAdsApi with provided access token
global API
API = FacebookAdsApi.init(access_token=access_token)
user = fb_user.User(fbid='me')

# Get accounts for provided creds and validate account
accounts = user.get_ad_accounts()
account = None
for acc in accounts:
Expand All @@ -885,12 +922,12 @@ def main_impl():
except FacebookError as fb_error:
raise_from(SingerConfigurationError, fb_error)

if args.discover:
if args.discover: # Discover mode
try:
do_discover()
except FacebookError as fb_error:
raise_from(SingerDiscoveryError, fb_error)
elif args.properties:
elif args.properties: # Sync mode
catalog = Catalog.from_dict(args.properties)
try:
do_sync(account, catalog, args.state)
Expand Down