Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional --discover capability #8

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
# CSV output files

exchange_rate-*.csv

# Byte-compiled / optimized / DLL files
.mypy_cache
__pycache__/
*.py[cod]
*$py.class

# IDE settings:
.vscode

# C extensions
*.so

# Distribution / packaging
.output
.Python
env/
build/
Expand Down
2 changes: 2 additions & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
include LICENSE
include tap_exchangeratesapi/schemas/*.json
9 changes: 5 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
from setuptools import setup, find_packages

setup(name='tap-exchangeratesapi',
version='0.1.1',
version='0.2.0',
description='Singer.io tap for extracting currency exchange rate data from the exchangeratesapi.io API',
author='Stitch',
url='http://github.com/singer-io/tap-exchangeratesapi',
classifiers=['Programming Language :: Python :: 3 :: Only'],
py_modules=['tap_exchangeratesapi'],
install_requires=['singer-python==5.3.3',
'backoff==1.3.2',
'requests==2.21.0'],
install_requires=['singer-python==5.8.0',
'backoff==1.8.0',
'requests==2.22.0'],
extras_require={
'dev': [
'ipdb==0.11'
Expand All @@ -22,5 +22,6 @@
tap-exchangeratesapi=tap_exchangeratesapi:main
''',
packages=['tap_exchangeratesapi'],
package_data={"schemas": ["tap_exchangeratesapi/schemas/*.json"]},
include_package_data=True
)
74 changes: 63 additions & 11 deletions tap_exchangeratesapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,23 @@
import argparse
import time
import requests
import singer
import backoff
import copy

from datetime import date, datetime, timedelta
from datetime import datetime, timedelta
from typing import List, Dict, Any

import singer
from .discovery import discover


base_url = 'https://api.exchangeratesapi.io/'

logger = singer.get_logger()
session = requests.Session()

DATE_FORMAT='%Y-%m-%d'
DATE_FORMAT: str = '%Y-%m-%d'
REQUIRED_CONFIG_KEYS: List[str] = [] # All config keys are optional
dmosorast marked this conversation as resolved.
Show resolved Hide resolved

def parse_response(r):
flattened = r['rates']
Expand Down Expand Up @@ -45,12 +50,12 @@ def request(url, params):
response = requests.get(url=url, params=params)
response.raise_for_status()
return response
def do_sync(base, start_date):

def do_sync(base, start_date, catalog_override: Dict[str, Any] = None):
state = {'start_date': start_date}
next_date = start_date
prev_schema = {}
prev_schema: Dict[str, Any] = {}

try:
while datetime.strptime(next_date, DATE_FORMAT) <= datetime.utcnow():
logger.info('Replicating exchange rate data from %s using base %s',
Expand All @@ -70,7 +75,38 @@ def do_sync(base, start_date):
singer.write_schema('exchange_rate', schema, 'date')

if payload['date'] == next_date:
singer.write_records('exchange_rate', [parse_response(payload)])
if catalog_override:
catalog_stream_override = [
x for x in catalog_override["streams"]
if x["tap_stream_id"] == "exchange_rate"
]
if not catalog_stream_override:
raise ValueError(
"Stream 'exchange_rate' not found in "
f"json: {catalog_override}"
)
# else:
# logger.info(catalog_stream_override)
Comment on lines +88 to +89
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Todo: remove these after testing.

metadata_override = catalog_stream_override[0]["metadata"]
if not metadata_override:
raise ValueError(
"Metadata not found in "
f"json: {catalog_override}"
)
# else:
# logger.info(metadata_override)
Comment on lines +96 to +97
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. Remove after testing.

logger.info("Replicating with provided catalog file override")
for record in [parse_response(payload)]:
singer.write_record(
'exchange_rate',
singer.Transformer().transform(
data=record,
schema=catalog_stream_override[0]["schema"],
metadata=catalog_stream_override[0],
)
)
else:
singer.write_records('exchange_rate', [parse_response(payload)])

state = {'start_date': next_date}
next_date = (datetime.strptime(next_date, DATE_FORMAT) + timedelta(days=1)).strftime(DATE_FORMAT)
Expand All @@ -91,12 +127,28 @@ def main():
parser = argparse.ArgumentParser()

parser.add_argument(
'-c', '--config', help='Config file', required=False)
'-d', '--discover', help='Do schema discovery (optional).', required=False, action='store_true')
parser.add_argument(
'-c', '--config', help='Optional config file.', required=False)
parser.add_argument(
'-s', '--state', help='Optional state file.', required=False)
parser.add_argument(
'-s', '--state', help='State file', required=False)
'--catalog', help='Optional catalog file.', required=False)

args = parser.parse_args()

# If discover flag was passed, run discovery mode and dump output to stdout
if args.discover:
catalog = discover()
singer.catalog.write_catalog(catalog)
return

catalog_override: Dict = None
if args.catalog:
with open(args.catalog) as file:
catalog_override = json.load(file)

# Otherwise run in sync mode
if args.config:
with open(args.config) as file:
config = json.load(file)
Expand All @@ -112,7 +164,7 @@ def main():
start_date = state.get('start_date') or config.get('start_date') or datetime.utcnow().strftime(DATE_FORMAT)
start_date = singer.utils.strptime_with_tz(start_date).date().strftime(DATE_FORMAT)

do_sync(config.get('base', 'USD'), start_date)
do_sync(config.get('base', 'USD'), start_date, catalog_override=catalog_override)


if __name__ == '__main__':
Expand Down
55 changes: 55 additions & 0 deletions tap_exchangeratesapi/discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Discovery functions for the Singer.io tap"""

import json
import os

import singer
from singer import Catalog, metadata

from typing import Dict, List, Any

LOGGER = singer.get_logger()
REPLICATION_METHOD = "INCREMENTAL"


def _get_abs_path(path):
    """Return *path* joined onto the directory that contains this module.

    The module's location is taken from ``__file__`` and passed through
    ``os.path.realpath`` before its directory part is extracted.
    """
    module_dir = os.path.dirname(os.path.realpath(__file__))
    return os.path.join(module_dir, path)


def _load_stream_schemas():
    """Load the stream definitions stored in the package's ``schemas`` folder.

    Returns
    -------
    dict
        Maps each file's base name (the file name without its ``.json``
        extension) to the parsed JSON content of that file.
    """
    schemas_dir = _get_abs_path("schemas")
    schemas = {}
    # os.listdir returns entries in arbitrary order; sort so the resulting
    # stream order is the same on every run.
    for filename in sorted(os.listdir(schemas_dir)):
        basename, extension = os.path.splitext(filename)
        # Ignore anything that is not a schema file (caches, editor backups...)
        # instead of failing while trying to parse it as JSON.
        if extension != ".json":
            continue
        with open(os.path.join(schemas_dir, filename), encoding="utf-8") as file:
            schemas[basename] = json.load(file)
    return schemas


def discover() -> Catalog:
    """
    Run discovery

    Builds one catalog entry per file returned by ``_load_stream_schemas``.
    Only the ``"schema"`` member of each loaded file is used; the metadata
    and key properties are generated here, with ``date`` as both the key
    property and the valid replication key.

    Returns
    -------
    Catalog
        The object produced by ``Catalog.from_dict`` for the list of
        streams with their associated metadata
    """
    LOGGER.info("Starting discovery mode")
    streams = [
        {
            "stream": stream_name,
            "tap_stream_id": stream_name,
            "schema": stream["schema"],
            "metadata": metadata.get_standard_metadata(
                schema=stream["schema"],
                key_properties=["date"],
                valid_replication_keys=["date"],
                replication_method=REPLICATION_METHOD,
            ),
            "key_properties": ["date"],
        }
        for stream_name, stream in _load_stream_schemas().items()
    ]
    return Catalog.from_dict({"streams": streams})
56 changes: 56 additions & 0 deletions tap_exchangeratesapi/schemas/exchange_rate.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
{
"stream": "exchange_rate",
"tap_stream_id": "exchange_rate",
"schema": {
"type": "object",
"properties": {
"date": { "type": "string", "format": "date-time" },
"CAD": { "type": ["null", "number"] },
"HKD": { "type": ["null", "number"] },
"ISK": { "type": ["null", "number"] },
"PHP": { "type": ["null", "number"] },
"DKK": { "type": ["null", "number"] },
"HUF": { "type": ["null", "number"] },
"CZK": { "type": ["null", "number"] },
"GBP": { "type": ["null", "number"] },
"RON": { "type": ["null", "number"] },
"SEK": { "type": ["null", "number"] },
"IDR": { "type": ["null", "number"] },
"INR": { "type": ["null", "number"] },
"BRL": { "type": ["null", "number"] },
"RUB": { "type": ["null", "number"] },
"HRK": { "type": ["null", "number"] },
"JPY": { "type": ["null", "number"] },
"THB": { "type": ["null", "number"] },
"CHF": { "type": ["null", "number"] },
"EUR": { "type": ["null", "number"] },
"MYR": { "type": ["null", "number"] },
"BGN": { "type": ["null", "number"] },
"TRY": { "type": ["null", "number"] },
"CNY": { "type": ["null", "number"] },
"NOK": { "type": ["null", "number"] },
"NZD": { "type": ["null", "number"] },
"ZAR": { "type": ["null", "number"] },
"USD": { "type": ["null", "number"] },
"MXN": { "type": ["null", "number"] },
"SGD": { "type": ["null", "number"] },
"AUD": { "type": ["null", "number"] },
"ILS": { "type": ["null", "number"] },
"KRW": { "type": ["null", "number"] },
"PLN": { "type": ["null", "number"] }
}
},
"metadata": [
{
"metadata": {
"table-key-properties": ["date"],
"valid-replication-keys": ["date"],
"is-view": false,
"selected": true,
"replication-method": "INCREMENTAL"
},
"breadcrumb": []
}
],
"key_properties": ["date"]
}