From 59ebcdbef43a342ef3e7c1590a78a06c705356d5 Mon Sep 17 00:00:00 2001 From: AJ Steers Date: Wed, 26 Aug 2020 18:47:11 -0700 Subject: [PATCH 01/14] ignore mypy cache --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 8376e3a..6bb765c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ # Byte-compiled / optimized / DLL files +.mypy_cache __pycache__/ *.py[cod] *$py.class From 8b1d3e39c0e746bc949db0233a5aafcad4915925 Mon Sep 17 00:00:00 2001 From: AJ Steers Date: Wed, 26 Aug 2020 18:47:33 -0700 Subject: [PATCH 02/14] prevent auto-format in vs code --- .vscode/settings.json | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..11e5bd3 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "[ptyhon]": { + // Prevent auto-format until `black` auto-formatter is adopted: + "editor.formatOnSave": false + }, + "editor.formatOnSave": false, +} \ No newline at end of file From 1bcbb336c227d39508bc4a61c84f96908d17d7c1 Mon Sep 17 00:00:00 2001 From: AJ Steers Date: Wed, 26 Aug 2020 18:59:09 -0700 Subject: [PATCH 03/14] add schema json --- .../schemas/exchange_rate.json | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 tap_exchangeratesapi/schemas/exchange_rate.json diff --git a/tap_exchangeratesapi/schemas/exchange_rate.json b/tap_exchangeratesapi/schemas/exchange_rate.json new file mode 100644 index 0000000..c38e9af --- /dev/null +++ b/tap_exchangeratesapi/schemas/exchange_rate.json @@ -0,0 +1,56 @@ +{ + "stream": "exchange_rate", + "tap_stream_id": "exchange_rate", + "schema": { + "type": "object", + "properties": { + "date": { "type": "string", "format": "date-time" }, + "CAD": { "type": ["null", "number"] }, + "HKD": { "type": ["null", "number"] }, + "ISK": { "type": ["null", "number"] }, + "PHP": { "type": ["null", "number"] }, + "DKK": { "type": ["null", "number"] }, + "HUF": { "type": ["null", "number"] }, + "CZK": { "type": ["null", "number"] }, + "GBP": { "type": ["null", "number"] }, + "RON": { "type": ["null", "number"] }, + "SEK": { "type": ["null", "number"] }, + "IDR": { "type": ["null", "number"] }, + "INR": { "type": ["null", "number"] }, + "BRL": { "type": ["null", "number"] }, + "RUB": { "type": ["null", "number"] }, + "HRK": { "type": ["null", "number"] }, + "JPY": { "type": ["null", "number"] }, + "THB": { "type": ["null", "number"] }, + "CHF": { "type": ["null", "number"] }, + "EUR": { "type": ["null", "number"] }, + "MYR": { "type": ["null", "number"] }, + "BGN": { "type": ["null", "number"] }, + "TRY": { "type": ["null", "number"] }, + "CNY": { "type": ["null", "number"] }, + "NOK": { "type": ["null", "number"] }, + "NZD": { "type": ["null", "number"] }, + "ZAR": { "type": ["null", "number"] }, + "USD": { "type": ["null", "number"] }, + "MXN": { "type": ["null", "number"] }, + "SGD": { "type": ["null", "number"] }, + "AUD": { "type": ["null", "number"] }, + "ILS": { "type": ["null", "number"] }, + "KRW": { "type": ["null", "number"] }, + "PLN": { "type": ["null", "number"] } + } + }, + "metadata": [ + { + "metadata": { + "table-key-properties": ["date"], + "valid-replication-keys": ["date"], + "is-view": false, + "selected": true, + "replication-method": "INCREMENTAL" + }, + "breadcrumb": [] + } + ], + "key_properties": ["date"] +} From 8ce2a0a00f9a901ab18a0b2a8f4e4d2ef46d1428 Mon Sep 17 00:00:00 2001 From: AJ Steers Date: Wed, 26 Aug 2020 18:59:20 -0700 Subject: [PATCH 04/14] add discover method --- tap_exchangeratesapi/__init__.py | 24 +++++++++++--- tap_exchangeratesapi/discovery.py | 55 +++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 5 deletions(-) create mode 100644 tap_exchangeratesapi/discovery.py diff --git a/tap_exchangeratesapi/__init__.py b/tap_exchangeratesapi/__init__.py index 875e647..3ada4c9 100644 --- a/tap_exchangeratesapi/__init__.py +++ b/tap_exchangeratesapi/__init__.py @@ -5,18 +5,23 @@ import argparse import time import requests -import singer import backoff import copy -from datetime import date, datetime, timedelta +from datetime import datetime, timedelta +from typing import List + +import singer +from .discovery import discover + base_url = 'https://api.exchangeratesapi.io/' logger = singer.get_logger() session = requests.Session() -DATE_FORMAT='%Y-%m-%d' +DATE_FORMAT: str = '%Y-%m-%d' +REQUIRED_CONFIG_KEYS: List[str] = [] # All config keys are optional def parse_response(r): flattened = r['rates'] @@ -45,12 +50,12 @@ def request(url, params): response = requests.get(url=url, params=params) response.raise_for_status() return response - + def do_sync(base, start_date): state = {'start_date': start_date} next_date = start_date prev_schema = {} - + try: while datetime.strptime(next_date, DATE_FORMAT) <= datetime.utcnow(): logger.info('Replicating exchange rate data from %s using base %s', @@ -90,6 +95,8 @@ def do_sync(base, start_date): def main(): parser = argparse.ArgumentParser() + parser.add_argument( + '-d', '--discover', help='Run discovery', required=False, action='store_true') parser.add_argument( '-c', '--config', help='Config file', required=False) parser.add_argument( @@ -97,6 +104,13 @@ def main(): args = parser.parse_args() + # If discover flag was passed, run discovery mode and dump output to stdout + if args.discover: + catalog = discover() + singer.catalog.write_catalog(catalog) + return + + # Otherwise run in sync mode if args.config: with open(args.config) as file: config = json.load(file) diff --git a/tap_exchangeratesapi/discovery.py b/tap_exchangeratesapi/discovery.py new file mode 100644 index 0000000..974bc6a --- /dev/null +++ b/tap_exchangeratesapi/discovery.py @@ -0,0 +1,55 @@ +"""Discovery functions for the Singer.io tap""" + +import json +import os + +import singer +from singer import Catalog, metadata + +from typing import Dict, List, Any + +LOGGER = singer.get_logger() +REPLICATION_METHOD = "INCREMENTAL" + + +def _get_abs_path(path): + return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) + + +# Load schemas from schemas folder +def _load_schemas(): + schemas = {} + for filename in os.listdir(_get_abs_path("schemas")): + path = _get_abs_path("schemas") + "/" + filename + basename = filename.replace(".json", "") + with open(path) as file: + schemas[basename] = json.load(file) + return schemas + + +def discover() -> Dict[str, Dict[str, Any]]: + """ + Run discovery + + Returns + ------- + Dict[str, Dict[str, Any]] + The list of streams with their associated metadata + """ + LOGGER.info("Starting discovery mode") + streams = [] + for stream_name, schema in _load_schemas().items(): + catalog_entry = { + "stream": stream_name, + "tap_stream_id": stream_name, + "schema": schema, + "metadata": metadata.get_standard_metadata( + schema=schema["schema"], + key_properties=["date"], + valid_replication_keys=["date"], + replication_method=REPLICATION_METHOD, + ), + "key_properties": ["date"], + } + streams.append(catalog_entry) + return Catalog.from_dict({"streams": streams}) From 9f869025457eef9067072da77d4dc896729dda98 Mon Sep 17 00:00:00 2001 From: AJ Steers Date: Wed, 26 Aug 2020 20:13:55 -0700 Subject: [PATCH 05/14] hide test csv outputs --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index 6bb765c..5069ba4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ +# CSV output files + +exchange_rate-*.csv + # Byte-compiled / optimized / DLL files .mypy_cache __pycache__/ From 7e72d58bc0fbd02321ddf8892f72aa222196f796 Mon Sep 17 00:00:00 2001 From: AJ Steers Date: Wed, 26 Aug 2020 20:14:31 -0700 Subject: [PATCH 06/14] add support for optional catalog --- tap_exchangeratesapi/__init__.py | 50 ++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/tap_exchangeratesapi/__init__.py b/tap_exchangeratesapi/__init__.py index 3ada4c9..8a79084 100644 --- a/tap_exchangeratesapi/__init__.py +++ b/tap_exchangeratesapi/__init__.py @@ -9,7 +9,7 @@ import copy from datetime import datetime, timedelta -from typing import List +from typing import List, Dict, Any import singer from .discovery import discover @@ -51,10 +51,10 @@ def request(url, params): response.raise_for_status() return response -def do_sync(base, start_date): +def do_sync(base, start_date, catalog_override: Dict[str, Any] = None): state = {'start_date': start_date} next_date = start_date - prev_schema = {} + prev_schema: Dict[str, Any] = {} try: while datetime.strptime(next_date, DATE_FORMAT) <= datetime.utcnow(): @@ -75,7 +75,38 @@ def do_sync(base, start_date): singer.write_schema('exchange_rate', schema, 'date') if payload['date'] == next_date: - singer.write_records('exchange_rate', [parse_response(payload)]) + if catalog_override: + catalog_stream_override = [ + x for x in catalog_override["streams"] + if x["tap_stream_id"] == "exchange_rate" + ] + if not catalog_stream_override: + raise ValueError( + "Stream 'exchange_rate' not found in " + f"json: {catalog_override}" + ) + # else: + # logger.info(catalog_stream_override) + metadata_override = catalog_stream_override[0]["metadata"] + if not metadata_override: + raise ValueError( + "Metadata not found in " + f"json: {catalog_override}" + ) + # else: + # logger.info(metadata_override) + logger.info("Replicating with provided catalog file override") + for record in [parse_response(payload)]: + singer.write_record( + 'exchange_rate', + singer.Transformer().transform( + data=record, + schema=catalog_stream_override[0]["schema"], + # metadata=metadata_override, # TODO: debug (not working) + ) + ) + else: + singer.write_records('exchange_rate', [parse_response(payload)]) state = {'start_date': next_date} next_date = (datetime.strptime(next_date, DATE_FORMAT) + timedelta(days=1)).strftime(DATE_FORMAT) @@ -96,11 +127,13 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument( - '-d', '--discover', help='Run discovery', required=False, action='store_true') + '-d', '--discover', help='Do schema discovery', required=False, action='store_true') parser.add_argument( '-c', '--config', help='Config file', required=False) parser.add_argument( '-s', '--state', help='State file', required=False) + parser.add_argument( + '--catalog', help='Catalog file', required=False) args = parser.parse_args() @@ -110,6 +143,11 @@ def main(): singer.catalog.write_catalog(catalog) return + catalog_override: Dict = None + if args.catalog: + with open(args.catalog) as file: + catalog_override = json.load(file) + # Otherwise run in sync mode if args.config: with open(args.config) as file: @@ -126,7 +164,7 @@ def main(): start_date = state.get('start_date') or config.get('start_date') or datetime.utcnow().strftime(DATE_FORMAT) start_date = singer.utils.strptime_with_tz(start_date).date().strftime(DATE_FORMAT) - do_sync(config.get('base', 'USD'), start_date) + do_sync(config.get('base', 'USD'), start_date, catalog_override=catalog_override) if __name__ == '__main__': From 80135dd870fd69accade15a6caa1cef131af9b48 Mon Sep 17 00:00:00 2001 From: AJ Steers Date: Wed, 26 Aug 2020 23:01:56 -0700 Subject: [PATCH 07/14] include schemas --- MANIFEST.in | 2 ++ setup.py | 1 + 2 files changed, 3 insertions(+) create mode 100644 MANIFEST.in diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..2de85c4 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,2 @@ +include LICENSE +include tap_exchangeratesapi/schemas/*.json \ No newline at end of file diff --git a/setup.py b/setup.py index 900005c..f7908ff 100644 --- a/setup.py +++ b/setup.py @@ -22,5 +22,6 @@ tap-exchangeratesapi=tap_exchangeratesapi:main ''', packages=['tap_exchangeratesapi'], + package_data={"schemas": ["tap_exchangeratesapi/schemas/*.json"]}, include_package_data=True ) From 2d4e4bf629e6781c4d1c1ebf8cf95d99ddf2e750 Mon Sep 17 00:00:00 2001 From: AJ Steers Date: Wed, 26 Aug 2020 23:08:52 -0700 Subject: [PATCH 08/14] bump versions --- setup.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index f7908ff..b268865 100644 --- a/setup.py +++ b/setup.py @@ -9,9 +9,9 @@ url='http://github.com/singer-io/tap-exchangeratesapi', classifiers=['Programming Language :: Python :: 3 :: Only'], py_modules=['tap_exchangeratesapi'], - install_requires=['singer-python==5.3.3', - 'backoff==1.3.2', - 'requests==2.21.0'], + install_requires=['singer-python==5.8.0', + 'backoff==1.8.0', + 'requests==2.22.0'], extras_require={ 'dev': [ 'ipdb==0.11' From bc3f3b539ca83c6f636604d01e7d64b4762d7095 Mon Sep 17 00:00:00 2001 From: AJ Steers Date: Wed, 26 Aug 2020 23:09:23 -0700 Subject: [PATCH 09/14] bump version to 0.2.0 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index b268865..dab2172 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup, find_packages setup(name='tap-exchangeratesapi', - version='0.1.1', + version='0.2.0', description='Singer.io tap for extracting currency exchange rate data from the exchangeratesapi.io API', author='Stitch', url='http://github.com/singer-io/tap-exchangeratesapi', From e1d212be2af80b8334743a80a8fa0d37ff01f9b1 Mon Sep 17 00:00:00 2001 From: AJ Steers Date: Thu, 27 Aug 2020 00:21:05 -0700 Subject: [PATCH 10/14] fix missing schema columns --- tap_exchangeratesapi/discovery.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tap_exchangeratesapi/discovery.py b/tap_exchangeratesapi/discovery.py index 974bc6a..b53ec1d 100644 --- a/tap_exchangeratesapi/discovery.py +++ b/tap_exchangeratesapi/discovery.py @@ -17,7 +17,7 @@ def _get_abs_path(path): # Load schemas from schemas folder -def _load_schemas(): +def _load_stream_schemas(): schemas = {} for filename in os.listdir(_get_abs_path("schemas")): path = _get_abs_path("schemas") + "/" + filename @@ -38,13 +38,13 @@ def discover() -> Dict[str, Dict[str, Any]]: """ LOGGER.info("Starting discovery mode") streams = [] - for stream_name, schema in _load_schemas().items(): + for stream_name, stream in _load_stream_schemas().items(): catalog_entry = { "stream": stream_name, "tap_stream_id": stream_name, - "schema": schema, + "schema": stream["schema"], "metadata": metadata.get_standard_metadata( - schema=schema["schema"], + schema=stream["schema"], key_properties=["date"], valid_replication_keys=["date"], replication_method=REPLICATION_METHOD, From 717c2b7388d6dfa240c44e42579065e652e7e9c8 Mon Sep 17 00:00:00 2001 From: AJ Steers Date: Thu, 27 Aug 2020 09:57:55 -0700 Subject: [PATCH 11/14] all cli inputs optional --- tap_exchangeratesapi/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tap_exchangeratesapi/__init__.py b/tap_exchangeratesapi/__init__.py index 8a79084..8243800 100644 --- a/tap_exchangeratesapi/__init__.py +++ b/tap_exchangeratesapi/__init__.py @@ -127,13 +127,13 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument( - '-d', '--discover', help='Do schema discovery', required=False, action='store_true') + '-d', '--discover', help='Do schema discovery (optional).', required=False, action='store_true') parser.add_argument( - '-c', '--config', help='Config file', required=False) + '-c', '--config', help='Optional config file.', required=False) parser.add_argument( - '-s', '--state', help='State file', required=False) + '-s', '--state', help='Optional state file.', required=False) parser.add_argument( - '--catalog', help='Catalog file', required=False) + '--catalog', help='Optional catalog file.', required=False) args = parser.parse_args() From b3ffce7722ca85548a29898a48fb163e9351b68d Mon Sep 17 00:00:00 2001 From: AJ Steers Date: Thu, 27 Aug 2020 10:01:23 -0700 Subject: [PATCH 12/14] hide .vscode files --- .gitignore | 3 +++ .vscode/settings.json | 7 ------- 2 files changed, 3 insertions(+), 7 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.gitignore b/.gitignore index 5069ba4..13ba32e 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,9 @@ __pycache__/ *.py[cod] *$py.class +# IDE settings: +.vscode + # C extensions *.so diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 11e5bd3..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "[ptyhon]": { - // Prevent auto-format until `black` auto-formatter is adopted: - "editor.formatOnSave": false - }, - "editor.formatOnSave": false, -} \ No newline at end of file From e19143c5dc71aefe296a17691f3ff3c58a49b4ee Mon Sep 17 00:00:00 2001 From: AJ Steers Date: Thu, 27 Aug 2020 10:13:48 -0700 Subject: [PATCH 13/14] hide debug output --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 13ba32e..609b889 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ __pycache__/ *.so # Distribution / packaging +.output .Python env/ build/ From bf1fbddc9f0b3d7c01f7e3522faac1228dc0b805 Mon Sep 17 00:00:00 2001 From: AJ Steers Date: Thu, 27 Aug 2020 10:14:08 -0700 Subject: [PATCH 14/14] resolve metadata failure --- tap_exchangeratesapi/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_exchangeratesapi/__init__.py b/tap_exchangeratesapi/__init__.py index 8243800..fa5c770 100644 --- a/tap_exchangeratesapi/__init__.py +++ b/tap_exchangeratesapi/__init__.py @@ -102,7 +102,7 @@ def do_sync(base, start_date, catalog_override: Dict[str, Any] = None): singer.Transformer().transform( data=record, schema=catalog_stream_override[0]["schema"], - # metadata=metadata_override, # TODO: debug (not working) + metadata=catalog_stream_override[0], ) ) else: