Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional --discover capability #8

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
# CSV output files

exchange_rate-*.csv

# Byte-compiled / optimized / DLL files
.mypy_cache
__pycache__/
*.py[cod]
*$py.class
Expand Down
7 changes: 7 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"[ptyhon]": {
// Prevent auto-format until `black` auto-formatter is adopted:
"editor.formatOnSave": false
},
"editor.formatOnSave": false,
}
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
include LICENSE
include tap_exchangeratesapi/schemas/*.json
9 changes: 5 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
from setuptools import setup, find_packages

setup(name='tap-exchangeratesapi',
version='0.1.1',
version='0.2.0',
description='Singer.io tap for extracting currency exchange rate data from the exchangeratesapi.io API',
author='Stitch',
url='http://github.com/singer-io/tap-exchangeratesapi',
classifiers=['Programming Language :: Python :: 3 :: Only'],
py_modules=['tap_exchangeratesapi'],
install_requires=['singer-python==5.3.3',
'backoff==1.3.2',
'requests==2.21.0'],
install_requires=['singer-python==5.8.0',
'backoff==1.8.0',
'requests==2.22.0'],
extras_require={
'dev': [
'ipdb==0.11'
Expand All @@ -22,5 +22,6 @@
tap-exchangeratesapi=tap_exchangeratesapi:main
''',
packages=['tap_exchangeratesapi'],
package_data={"schemas": ["tap_exchangeratesapi/schemas/*.json"]},
include_package_data=True
)
70 changes: 61 additions & 9 deletions tap_exchangeratesapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,23 @@
import argparse
import time
import requests
import singer
import backoff
import copy

from datetime import date, datetime, timedelta
from datetime import datetime, timedelta
from typing import List, Dict, Any

import singer
from .discovery import discover


base_url = 'https://api.exchangeratesapi.io/'

logger = singer.get_logger()
session = requests.Session()

DATE_FORMAT='%Y-%m-%d'
DATE_FORMAT: str = '%Y-%m-%d'
REQUIRED_CONFIG_KEYS: List[str] = [] # All config keys are optional
dmosorast marked this conversation as resolved.
Show resolved Hide resolved

def parse_response(r):
flattened = r['rates']
Expand Down Expand Up @@ -45,12 +50,12 @@ def request(url, params):
response = requests.get(url=url, params=params)
response.raise_for_status()
return response
def do_sync(base, start_date):

def do_sync(base, start_date, catalog_override: Dict[str, Any] = None):
state = {'start_date': start_date}
next_date = start_date
prev_schema = {}
prev_schema: Dict[str, Any] = {}

try:
while datetime.strptime(next_date, DATE_FORMAT) <= datetime.utcnow():
logger.info('Replicating exchange rate data from %s using base %s',
Expand All @@ -70,7 +75,38 @@ def do_sync(base, start_date):
singer.write_schema('exchange_rate', schema, 'date')

if payload['date'] == next_date:
singer.write_records('exchange_rate', [parse_response(payload)])
if catalog_override:
catalog_stream_override = [
x for x in catalog_override["streams"]
if x["tap_stream_id"] == "exchange_rate"
]
if not catalog_stream_override:
raise ValueError(
"Stream 'exchange_rate' not found in "
f"json: {catalog_override}"
)
# else:
# logger.info(catalog_stream_override)
Comment on lines +88 to +89
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Todo: remove these after testing.

metadata_override = catalog_stream_override[0]["metadata"]
if not metadata_override:
raise ValueError(
"Metadata not found in "
f"json: {catalog_override}"
)
# else:
# logger.info(metadata_override)
Comment on lines +96 to +97
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. Remove after testing.

logger.info("Replicating with provided catalog file override")
for record in [parse_response(payload)]:
singer.write_record(
'exchange_rate',
singer.Transformer().transform(
data=record,
schema=catalog_stream_override[0]["schema"],
# metadata=metadata_override, # TODO: debug (not working)
Copy link
Author

@aaronsteers aaronsteers Aug 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get errors when I try to pass the metadata arg.

  • Any ideas?? ❓

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be that the metadata is in list format and it's expecting dict or vice-versa. My gut says it needs to be a dict, but haven't written this code in awhile.

Copy link
Author

@aaronsteers aaronsteers Aug 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dmosorast

I think you are correct. The below seems to resolve the error, now pulling in the full dict which is one level up from the "metadata" element (which is itself of type list).

                            singer.Transformer().transform(
                                data=record,
                                schema=catalog_stream_override[0]["schema"],
                                metadata=catalog_stream_override[0],
                            )

Note: It's still possible that I have this wrong and the failure just went away because I fixed the argument type.

  • More testing needed to confirm whether those metadata arguments (especially selected status) are correctly getting piped through.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More to follow (and more code cleanup to do) once I confirm the results are as expected.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the behavior might be incorrect. Without 100% tracing the code, I would expect it to be something like metadata.to_map(metadata_override) using the metadata module in singer-python.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API for the metadata is a bit strange since it uses a module/"functional" pattern, so I can chat over DM if anything needs cleared up. I'm trying to figure out how to explain it myself.

)
)
else:
singer.write_records('exchange_rate', [parse_response(payload)])

state = {'start_date': next_date}
next_date = (datetime.strptime(next_date, DATE_FORMAT) + timedelta(days=1)).strftime(DATE_FORMAT)
Expand All @@ -90,13 +126,29 @@ def do_sync(base, start_date):
def main():
parser = argparse.ArgumentParser()

parser.add_argument(
'-d', '--discover', help='Do schema discovery', required=False, action='store_true')
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved
parser.add_argument(
'-c', '--config', help='Config file', required=False)
parser.add_argument(
'-s', '--state', help='State file', required=False)
parser.add_argument(
'--catalog', help='Catalog file', required=False)

args = parser.parse_args()

# If discover flag was passed, run discovery mode and dump output to stdout
if args.discover:
catalog = discover()
singer.catalog.write_catalog(catalog)
return

catalog_override: Dict = None
if args.catalog:
with open(args.catalog) as file:
catalog_override = json.load(file)

# Otherwise run in sync mode
if args.config:
with open(args.config) as file:
config = json.load(file)
Expand All @@ -112,7 +164,7 @@ def main():
start_date = state.get('start_date') or config.get('start_date') or datetime.utcnow().strftime(DATE_FORMAT)
start_date = singer.utils.strptime_with_tz(start_date).date().strftime(DATE_FORMAT)

do_sync(config.get('base', 'USD'), start_date)
do_sync(config.get('base', 'USD'), start_date, catalog_override=catalog_override)


if __name__ == '__main__':
Expand Down
55 changes: 55 additions & 0 deletions tap_exchangeratesapi/discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Discovery functions for the Singer.io tap"""

import json
import os

import singer
from singer import Catalog, metadata

from typing import Dict, List, Any

LOGGER = singer.get_logger()
REPLICATION_METHOD = "INCREMENTAL"


def _get_abs_path(path):
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)


# Load schemas from schemas folder
def _load_stream_schemas():
schemas = {}
for filename in os.listdir(_get_abs_path("schemas")):
path = _get_abs_path("schemas") + "/" + filename
basename = filename.replace(".json", "")
with open(path) as file:
schemas[basename] = json.load(file)
return schemas


def discover() -> Dict[str, Dict[str, Any]]:
"""
Run discovery

Returns
-------
Dict[str, Dict[str, Any]]
The list of streams with their associated metadata
"""
LOGGER.info("Starting discovery mode")
streams = []
for stream_name, stream in _load_stream_schemas().items():
catalog_entry = {
"stream": stream_name,
"tap_stream_id": stream_name,
"schema": stream["schema"],
"metadata": metadata.get_standard_metadata(
schema=stream["schema"],
key_properties=["date"],
valid_replication_keys=["date"],
replication_method=REPLICATION_METHOD,
),
"key_properties": ["date"],
}
streams.append(catalog_entry)
return Catalog.from_dict({"streams": streams})
56 changes: 56 additions & 0 deletions tap_exchangeratesapi/schemas/exchange_rate.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
{
"stream": "exchange_rate",
"tap_stream_id": "exchange_rate",
"schema": {
"type": "object",
"properties": {
"date": { "type": "string", "format": "date-time" },
"CAD": { "type": ["null", "number"] },
"HKD": { "type": ["null", "number"] },
"ISK": { "type": ["null", "number"] },
"PHP": { "type": ["null", "number"] },
"DKK": { "type": ["null", "number"] },
"HUF": { "type": ["null", "number"] },
"CZK": { "type": ["null", "number"] },
"GBP": { "type": ["null", "number"] },
"RON": { "type": ["null", "number"] },
"SEK": { "type": ["null", "number"] },
"IDR": { "type": ["null", "number"] },
"INR": { "type": ["null", "number"] },
"BRL": { "type": ["null", "number"] },
"RUB": { "type": ["null", "number"] },
"HRK": { "type": ["null", "number"] },
"JPY": { "type": ["null", "number"] },
"THB": { "type": ["null", "number"] },
"CHF": { "type": ["null", "number"] },
"EUR": { "type": ["null", "number"] },
"MYR": { "type": ["null", "number"] },
"BGN": { "type": ["null", "number"] },
"TRY": { "type": ["null", "number"] },
"CNY": { "type": ["null", "number"] },
"NOK": { "type": ["null", "number"] },
"NZD": { "type": ["null", "number"] },
"ZAR": { "type": ["null", "number"] },
"USD": { "type": ["null", "number"] },
"MXN": { "type": ["null", "number"] },
"SGD": { "type": ["null", "number"] },
"AUD": { "type": ["null", "number"] },
"ILS": { "type": ["null", "number"] },
"KRW": { "type": ["null", "number"] },
"PLN": { "type": ["null", "number"] }
}
},
"metadata": [
{
"metadata": {
"table-key-properties": ["date"],
"valid-replication-keys": ["date"],
"is-view": false,
"selected": true,
"replication-method": "INCREMENTAL"
},
"breadcrumb": []
}
],
"key_properties": ["date"]
}