Skip to content

Commit

Permalink
updated class usage, expanded sync() sample (#11)
Browse files Browse the repository at this point in the history
* leverage Catalog class methods

* get_selected_streams()

* revert vs code

* use CatalogEntry class

* use the Schema class

* expanded sync() function example

* replace batch data functions with inline lambda

* improved bookmark handling

* rename schema_name->stream_id

* remove extra/unused variable, replace within loop iterator

* unused const
  • Loading branch information
aaronsteers authored Feb 24, 2020
1 parent 3434337 commit 4375130
Showing 1 changed file with 64 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,93 +3,108 @@
import json
import singer
from singer import utils, metadata
from singer.catalog import Catalog, CatalogEntry
from singer.schema import Schema


REQUIRED_CONFIG_KEYS = ["start_date", "username", "password"]
LOGGER = singer.get_logger()


def get_abs_path(path):
    """Return *path* resolved as an absolute path relative to this file's directory."""
    base_dir = os.path.dirname(os.path.realpath(__file__))
    return os.path.join(base_dir, path)

def load_schemas():
    """Load all JSON schema files from the 'schemas' folder.

    Returns a dict mapping the file name (without the '.json' suffix)
    to a singer Schema object built from the file's contents.
    """
    schemas = {}
    for filename in os.listdir(get_abs_path('schemas')):
        path = get_abs_path('schemas') + '/' + filename
        # strip the extension so streams are keyed by bare name
        file_raw = filename.replace('.json', '')
        with open(path) as file:
            schemas[file_raw] = Schema.from_dict(json.load(file))
    return schemas


def discover():
    """Run discovery mode: build a Catalog describing every available stream.

    Returns a singer Catalog whose entries are one CatalogEntry per schema
    file found by load_schemas().
    """
    raw_schemas = load_schemas()
    streams = []
    for stream_id, schema in raw_schemas.items():
        # TODO: populate any metadata and stream's key properties here..
        stream_metadata = []
        key_properties = []
        streams.append(
            CatalogEntry(
                tap_stream_id=stream_id,
                stream=stream_id,
                schema=schema,
                key_properties=key_properties,
                metadata=stream_metadata,
                # remaining fields are unused by this template tap
                replication_key=None,
                is_view=None,
                database=None,
                table=None,
                row_count=None,
                stream_alias=None,
                replication_method=None,
            )
        )
    return Catalog(streams)

def sync(config, state, catalog):
    """Sync data from tap source.

    Iterates the streams selected in *catalog*, writes each stream's schema,
    then emits records and bookmark state via the singer library.
    """
    # Loop over selected streams in catalog
    for stream in catalog.get_selected_streams(state):
        LOGGER.info("Syncing stream:" + stream.tap_stream_id)

        bookmark_column = stream.replication_key
        is_sorted = True  # TODO: indicate whether data is sorted ascending on bookmark value

        singer.write_schema(
            stream_name=stream.tap_stream_id,
            schema=stream.schema,
            key_properties=stream.key_properties,
        )

        # TODO: delete and replace this inline function with your own data retrieval process:
        # (original sample used "row${x}", a template-literal leftover that never
        # interpolated; an f-string gives the clearly intended per-row name)
        tap_data = lambda: [{"id": x, "name": f"row{x}"} for x in range(1000)]

        max_bookmark = None
        for row in tap_data():
            # TODO: place type conversions or transformations here

            # write one or more rows to the stream:
            singer.write_records(stream.tap_stream_id, [row])
            if bookmark_column:
                if is_sorted:
                    # update bookmark to latest value
                    singer.write_state({stream.tap_stream_id: row[bookmark_column]})
                elif max_bookmark is None or row[bookmark_column] > max_bookmark:
                    # if data unsorted, save max value until end of writes
                    # (guarding None avoids max(None, x) raising on the first row)
                    max_bookmark = row[bookmark_column]
        if bookmark_column and not is_sorted:
            singer.write_state({stream.tap_stream_id: max_bookmark})


@utils.handle_top_exception(LOGGER)
def main():
    """Tap entry point: parse args, then run discovery or sync mode."""
    # Parse command line arguments
    args = utils.parse_args(REQUIRED_CONFIG_KEYS)

    # If discover flag was passed, run discovery mode and dump output to stdout
    if args.discover:
        catalog = discover()
        catalog.dump()
    # Otherwise run in sync mode
    else:
        # prefer a user-supplied catalog; fall back to fresh discovery
        if args.catalog:
            catalog = args.catalog
        else:
            catalog = discover()
        sync(args.config, args.state, catalog)


# Standard script guard: run the tap only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()

0 comments on commit 4375130

Please sign in to comment.