-
Notifications
You must be signed in to change notification settings - Fork 1
/
app.py
executable file
·140 lines (117 loc) · 3.85 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""[application description here]"""
import re
import io
import csv
import sqlite3
import itertools
from pathlib import Path
from shutil import copyfile
from bottle import response, route, run
from bottle import jinja2_template as template
# Custom tools
import utils
# Application metadata
__appname__ = "AmoebaServer"
__author__ = "Antoni Kaniowski"
__version__ = "0.1"
__license__ = "wtfpl"

# Cached number of columns of table t1.  Stays None until insert() looks it
# up or init() creates a new table.
no_cols = None

# Make sure a database file is present: when "db.sqlite" is missing, seed it
# from the bundled sample before opening the connection.
db_file = Path("db.sqlite")
if not db_file.is_file():
    copyfile("db.sqlite.sample", "db.sqlite")

# Single module-wide connection used by every route handler below.
db = sqlite3.connect("db.sqlite")
# Renders the table
@route('/')
def print_items():
    """Render the whole of table ``t1`` through the ``table.html`` template.

    Returns
    -------
    The rendered template, which receives the column names as
    ``headings`` and every row of the table as ``items``.
    """
    # FIXME handle no database case with grace
    # FIXME Set number of rows to be x
    result = db.execute('SELECT * FROM t1')
    # The first entry of each description tuple is the column name.
    column_names = [column[0] for column in result.description]
    all_rows = result.fetchall()
    return template('table.html', headings=column_names, items=all_rows)
# Sends a dump of the database
@route('/download')
def get_sqlite():
    """Offer the database contents as a download.

    The response is flagged as an attachment named ``database.sqlite``.
    Its body is the text yielded by ``db.iterdump()``, one entry per
    line.

    Returns
    -------
    str
        The concatenated dump lines, each terminated by a newline.
    """
    disposition = 'attachment; filename="database.sqlite"'
    response.headers['Content-Disposition'] = disposition
    return ''.join('%s\n' % statement for statement in db.iterdump())
# Sends the csv of the db
@route('/csv')
def get_csv():
    """Show table ``t1`` as CSV text inside a ``<pre>`` element.

    The first CSV line holds the column names; every following line is
    one row of the table.

    Returns
    -------
    The rendered ``<pre>`` template containing the CSV text.
    """
    result = db.execute('SELECT * FROM t1')
    column_names = [column[0] for column in result.description]
    table_rows = result.fetchall()

    csv_text = io.StringIO()
    csv_writer = csv.writer(csv_text, quoting=csv.QUOTE_MINIMAL)
    csv_writer.writerow(column_names)
    csv_writer.writerows(table_rows)

    return template('<pre>{{o}}</pre>', o=csv_text.getvalue())
def get_one():
    """Fetch at most one row of table ``t1``.

    Returns
    -------
    list
        A list holding a single row tuple, or an empty list when the
        table has no rows.
    """
    return db.execute("SELECT * FROM t1 LIMIT 1").fetchall()
@route('/insert/<rows>')
def insert(rows):
    """Insert/Bulk insert values into the table.

    Parameters
    ----------
    rows : str
        Comma separated values taken from the URL.  Only the first line
        of the string is parsed.  Rows may be wrapped in brackets, e.g.
        ``(a,b),(c,d)``; the brackets and the spaces in front of a value
        are stripped.  The flat list of values is then cut into rows of
        ``no_cols`` values each.

    Returns
    -------
    str
        A confirmation message echoing ``rows``.

    Raises
    ------
    sqlite3.Error
        When the insert fails; pending changes are rolled back first.
    """
    # TODO Try to handle special characters that are difficult
    global no_cols
    if no_cols is None:
        # Take the column count from the cursor metadata instead of from a
        # fetched row: description is filled in even when the table has no
        # rows, whereas indexing the first row fails on an empty table.
        no_cols = len(db.execute("SELECT * FROM t1 LIMIT 0").description)

    rd = csv.DictReader(io.StringIO(rows))
    dta = [item.rstrip(")").lstrip(" (") for item in rd.fieldnames]
    # NOTE(review): utils.grouper is presumably the itertools "grouper"
    # recipe (chunks of no_cols values) -- confirm in utils.
    data = list(utils.grouper(no_cols, dta))
    fields = ", ".join(["?"] * no_cols)
    command = "INSERT INTO t1 VALUES (%s)" % fields

    try:
        db.executemany(command, data)
    except sqlite3.Error:
        # Do not leave a half-applied batch pending on the shared
        # connection, where a later commit would persist it.
        db.rollback()
        raise
    db.commit()
    return "Successfully inserted %s" % rows
# Creates a new table
@route('/init/<rows>')
def init(rows):
    """Initialize a new table with 'n' columns.

    Any existing table ``t1`` is dropped and replaced by a new one whose
    columns are all declared as ``varchar``.

    Parameters
    ----------
    rows : str
        The column names of the new table, separated by commas.  Only
        the first line of the string is parsed.

    Returns
    -------
    str
        The text ``"Ran: \\r"`` followed by the executed CREATE statement.

    Raises
    ------
    sqlite3.OperationalError
        When the table cannot be dropped or created (for example on
        duplicate column names).
    """
    global no_cols
    reader_list = csv.DictReader(io.StringIO(rows))
    header = reader_list.fieldnames

    # The names come straight from the URL, so emit each one as a
    # double-quoted identifier (embedded quotes doubled).  This keeps a name
    # from injecting SQL into the statement and also lets SQL keywords be
    # used as column names.  Surrounding whitespace is stripped because it
    # would otherwise become part of the quoted name.
    columns = ", ".join(
        '"%s" varchar' % str(name).strip().replace('"', '""')
        for name in header
    )
    statement = 'CREATE TABLE t1 (%s)' % columns
    print(statement)

    try:
        db.execute('DROP TABLE IF EXISTS t1')
        db.execute(statement)
        db.commit()
    except sqlite3.OperationalError:
        print("Check your init statement.")
        raise

    # Only remember the new column count once the table really exists.
    no_cols = len(header)
    return "Ran: \r" + statement
# Start serving the routes defined above on every interface, port 8888.
run(port=8888, host='0.0.0.0')