-
Notifications
You must be signed in to change notification settings - Fork 13
/
data_access_token.py
201 lines (151 loc) · 6.41 KB
/
data_access_token.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""Functions for token management."""
__copyright__ = 'Copyright (c) 2021-2024, Utrecht University'
__license__ = 'GPLv3, see LICENSE'
import os
import secrets
from datetime import datetime, timedelta
from traceback import print_exc
from typing import List
from pysqlcipher3 import dbapi2 as sqlite3
from util import *
# Public API of this module: the names exported by a star-import.
__all__ = ['api_token_generate',
'api_token_load',
'api_token_delete',
'api_token_delete_expired']
@api.make()
def api_token_generate(ctx: rule.Context, label: str = "") -> api.Result:
    """Generates a token for user authentication.

    The token is stored in the token database together with the user name,
    the label, the generation time and the expiration time
    (generation time plus config.token_lifetime hours).

    :param ctx:   Combined type of a callback and rei struct
    :param label: Optional label of the token

    :returns: Generated token or API error
    """
    def generate_token() -> str:
        """Return a random URL-safe token of config.token_length characters."""
        length = int(config.token_length)
        # token_urlsafe() takes a byte count; its Base64 text is at least that
        # many characters long, so cut it down to the configured length.
        return secrets.token_urlsafe(length)[:length]

    if not token_database_initialized():
        return api.Error('DatabaseError', 'Internal error: token database unavailable')

    user_id = user.name(ctx)
    token = generate_token()
    gen_time = datetime.now()
    exp_time = gen_time + timedelta(hours=config.token_lifetime)

    conn = sqlite3.connect(config.token_database)
    result = None
    try:
        # Connection object used as context manager only commits or rollbacks transactions.
        with conn:
            # Supply the SQLCipher key before any other statement touches the database.
            # NOTE(review): the password is interpolated as-is; a password containing
            # a single quote would break this statement - confirm configuration rules this out.
            conn.execute("PRAGMA key='%s'" % (config.token_database_password))
            conn.execute('''INSERT INTO tokens VALUES (?, ?, ?, ?, ?)''', (user_id, label, token, gen_time, exp_time))
        result = token
    except sqlite3.IntegrityError:
        result = api.Error('TokenExistsError', 'Token with this label already exists')
    except Exception:
        print_exc()
        result = api.Error('DatabaseError', 'Error occurred while writing to database')
    finally:
        # The context manager does not close the connection, so always close it
        # here, also when an exception that is not handled above propagates.
        conn.close()

    return result
@api.make()
def api_token_load(ctx: rule.Context) -> api.Result:
    """Loads valid tokens of user.

    Only tokens of the calling user whose expiration time lies in the future
    are returned.

    :param ctx: Combined type of a callback and rei struct

    :returns: List of dicts with keys "label" and "exp_time"
              (formatted as 'YYYY-MM-DD HH:MM:SS'), or API error
    """
    def format_exp_time(stored: str) -> str:
        """Convert a stored expiration timestamp to 'YYYY-MM-DD HH:MM:SS'."""
        try:
            parsed = datetime.strptime(stored, '%Y-%m-%d %H:%M:%S.%f')
        except ValueError:
            # A datetime stored via isoformat(" ") has no fractional part when
            # its microsecond field is 0; accept that form as well instead of
            # failing the whole listing.
            parsed = datetime.strptime(stored, '%Y-%m-%d %H:%M:%S')
        return parsed.strftime('%Y-%m-%d %H:%M:%S')

    if not token_database_initialized():
        return api.Error('DatabaseError', 'Internal error: token database unavailable')

    user_id = user.name(ctx)
    conn = sqlite3.connect(config.token_database)
    result = []
    try:
        # Connection object used as context manager only commits or rollbacks transactions.
        with conn:
            # Supply the SQLCipher key before any other statement touches the database.
            conn.execute("PRAGMA key='%s'" % (config.token_database_password))
            rows = conn.execute('''SELECT label, exp_time FROM tokens WHERE user=:user_id AND exp_time > :now''',
                                {"user_id": user_id, "now": datetime.now()})
            for row in rows:
                result.append({"label": row[0], "exp_time": format_exp_time(row[1])})
    except Exception:
        print_exc()
        # Partial results are discarded; the caller only gets the error.
        result = api.Error('DatabaseError', 'Error occurred while reading database')
    finally:
        # The context manager does not close the connection, so always close it
        # here, also when an exception that is not handled above propagates.
        conn.close()

    return result
@api.make()
def api_token_delete(ctx: rule.Context, label: str) -> api.Result:
    """Deletes a token of the user.

    The token is selected by the calling user's name and the given label.

    :param ctx:   Combined type of a callback and rei struct
    :param label: Label of the token

    :returns: Status of token deletion
    """
    if not token_database_initialized():
        return api.Error('DatabaseError', 'Internal error: token database unavailable')

    user_id = user.name(ctx)
    conn = sqlite3.connect(config.token_database)
    result = None
    try:
        # Connection object used as context manager only commits or rollbacks transactions.
        with conn:
            # Supply the SQLCipher key before any other statement touches the database.
            conn.execute("PRAGMA key='%s'" % (config.token_database_password))
            conn.execute('''DELETE FROM tokens WHERE user = ? AND label = ?''', (user_id, label))
        result = api.Result.ok()
    except Exception:
        print_exc()
        result = api.Error('DatabaseError', 'Error during deletion from database')
    finally:
        # The context manager does not close the connection, so always close it
        # here, also when an exception that is not handled above propagates.
        conn.close()

    return result
@api.make()
def api_token_delete_expired(ctx: rule.Context) -> api.Result:
    """Deletes expired tokens of current user.

    A token counts as expired when its expiration time is earlier than the
    current local time.

    :param ctx: Combined type of a callback and rei struct

    :returns: Status of token deletion
    """
    if not token_database_initialized():
        return api.Error('DatabaseError', 'Internal error: token database unavailable')

    user_id = user.name(ctx)
    conn = sqlite3.connect(config.token_database)
    result = None
    try:
        # Connection object used as context manager only commits or rollbacks transactions.
        with conn:
            # Supply the SQLCipher key before any other statement touches the database.
            conn.execute("PRAGMA key='%s'" % (config.token_database_password))
            conn.execute('''DELETE FROM tokens WHERE user = ? AND exp_time < ? ''', (user_id, datetime.now()))
        result = api.Result.ok()
    except Exception:
        print_exc()
        result = api.Error('DatabaseError', 'Error during deletion from database')
    finally:
        # The context manager does not close the connection, so always close it
        # here, also when an exception that is not handled above propagates.
        conn.close()

    return result
def get_all_tokens(ctx: rule.Context) -> List:
    """Retrieve all valid tokens.

    Only available to users of type rodsadmin. Tokens whose expiration time
    lies in the future are returned for all users.

    :param ctx: Combined type of a callback and rei struct

    :returns: List of dicts with keys "user", "label" and "exp_time" (the
              expiration time exactly as stored in the database). The list is
              empty when the caller is not a rodsadmin, when the token database
              is unavailable, or when reading the database fails.
    """
    # check permissions - rodsadmin only
    if user.user_type(ctx) != 'rodsadmin':
        return []

    if not token_database_initialized():
        return []

    conn = sqlite3.connect(config.token_database)
    result = []
    try:
        # Connection object used as context manager only commits or rollbacks transactions.
        with conn:
            # Supply the SQLCipher key before any other statement touches the database.
            conn.execute("PRAGMA key='%s'" % (config.token_database_password))
            rows = conn.execute('''SELECT user, label, exp_time FROM tokens WHERE exp_time > :now''',
                                {"now": datetime.now()})
            for row in rows:
                result.append({"user": row[0], "label": row[1], "exp_time": row[2]})
    except Exception:
        print_exc()
        # This is not an API function and is declared to return a list, so a
        # read failure yields an empty list (like the other failure paths)
        # rather than an api.Error object that callers cannot iterate over.
        result = []
    finally:
        # The context manager does not close the connection, so always close it
        # here, also when an exception that is not handled above propagates.
        conn.close()

    return result
def token_database_initialized() -> bool:
    """Tell whether the token database file is present.

    :returns: True if config.token_database is the path of an existing regular file, False otherwise
    """
    database_path = config.token_database
    return os.path.isfile(database_path)