suppliers.py
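
"""Look up supplier names from the scraped spending tables against the
OpenCorporates reconciliation service and record the matches in the
'suppliers' table."""
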
from itertools import chain
import sqlaload as sl
import logging
import sys
import time
import json
import urllib
import urllib2
import multiprocessing
from contextlib import closing
from common import *
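

# Open the shared database and fetch the 'condensed' table, which lists
# every scraped spending table by (resource_id, table_id).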
def connect():
    engine = db_connect()
    table = sl.get_table(engine, 'condensed')
    return engine, table
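

# Progress counters, plus a per-run cache of supplier names already handled.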
rows_count = 0
tables_count = 0
suppliers_visited = {}
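

# Walk one scraped spending table and yield supplier names not yet seen in
# this run and not already reconciled into the 'suppliers' table.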
def supplier_names_from_table(engine, resource_id, table_id):
    global tables_count, rows_count
    tables_count += 1
    print "# checking table %d" % tables_count
    table_suffix = '%s_table%s' % (resource_id, table_id)
    table = sl.get_table(engine, 'spending_%s' % table_suffix)
    supplier_table = sl.get_table(engine, 'suppliers')
    for row in sl.all(engine, table):
        rows_count += 1
        if 'SupplierName' not in row:
            # One of the junk tables that contain no real data, usually Excel noise.
            continue
        supplier = row['SupplierName']
        if supplier is None or supplier == '':
            continue
        if supplier in suppliers_visited:
            continue
        suppliers_visited[supplier] = True
        if sl.find_one(engine, supplier_table, original=supplier) is not None:
            continue
        yield supplier
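

# Yield one generator of new supplier names per table listed in 'condensed'.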
def supplier_tables(engine, table):
    for row in sl.all(engine, table):
        yield supplier_names_from_table(engine, row['resource_id'], row['table_id'])
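

# Flatten the per-table generators into a single stream of supplier names.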
def supplier_names(engine, table):
    return chain.from_iterable(supplier_tables(engine, table))
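

# Ask the OpenCorporates reconciliation endpoint for the best match for a
# single name. Judging by the fields read below, a response presumably looks
# like {'result': [{'name': ..., 'uri': ..., 'score': ...}, ...]}; any
# network or parse error is treated as a miss.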
def lookup_supplier_name(name):
    try:
        query = {'query': name, 'limit': 1}
        url = "http://opencorporates.com/reconcile?%s" % urllib.urlencode({'query': json.dumps(query)})
        # The timeout argument belongs to urlopen(), not to closing().
        with closing(urllib2.urlopen(url, None, 30)) as f:
            data = json.loads(f.read())
            if len(data['result']) > 0:
                return {'original': name,
                        'result': data['result'][0],
                        }
    except Exception:
        return None
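

# Reconcile all unseen supplier names against OpenCorporates in parallel
# (the lookups are network-bound, hence the wide worker pool) and upsert
# each match into the 'suppliers' table, keyed on the original name.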
if __name__ == '__main__':
    logging.basicConfig()
    logging.getLogger(__name__).setLevel(logging.DEBUG)
    logging.getLogger('sqlalchemy.engine').setLevel(logging.WARN)
    engine, table = connect()
    supplier_table = sl.get_table(engine, 'suppliers')
    pool = multiprocessing.Pool(40)
    fails = 0
    for r in pool.imap_unordered(lookup_supplier_name, supplier_names(engine, table)):
        if r is not None:
            print "%s ==> %s" % (r['original'], r['result']['name'])
            sl.upsert(engine, supplier_table,
                      {'original': r['original'],
                       'name': r['result']['name'],
                       'uri': r['result']['uri'],
                       'score': r['result']['score'],
                       },
                      ['original'])
            print "# %d rows and %d tables visited" % (rows_count, tables_count)
        else:
            fails += 1
            if fails % 100 == 0:
                print "# %d requests failed" % fails