Skip to content

Commit

Permalink
Merge pull request #278 from mvexel/master
Browse files Browse the repository at this point in the history
Rework of stats store and endpoints
  • Loading branch information
mvexel committed Jul 14, 2014
2 parents 5a7e230 + 82a23c7 commit 3a520c0
Show file tree
Hide file tree
Showing 12 changed files with 468 additions and 148 deletions.
8 changes: 5 additions & 3 deletions fabfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,12 @@ def setup_cron(instance):
sudo('crontab /tmp/crondump', user='www-data')


def install_python_dependencies(instance):
def install_python_dependencies(instance, upgrade=False):
dirname = "/srv/www/%s" % instance
cmd = 'source %s/virtualenv/bin/activate && pip\
install -r %s/htdocs/maproulette/requirements.txt' % (dirname, dirname)
cmd = 'source {basepath}/virtualenv/bin/activate && pip\
install {upgrade}-r {basepath}/htdocs/maproulette/requirements.txt'.format(
basepath=dirname,
upgrade='--upgrade' if upgrade else '')
sudo(cmd, user="www-data")


Expand Down
27 changes: 18 additions & 9 deletions fabric_templates/config
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
# The application secret key

# http://www.thedutchtable.com/2013/03/een-ei-is-geen-eivrolijk-pasen.html
SECRET_KEY = 'een ei is geen ei twee ei is een half ei'
SECRET_KEY = 'REPLACE WITH YOUR APPLICATION SECRET KEY'

{% if setting == "dev" %}
{% set db = "maproulette_dev" %}
{% elif setting == "test" %}
{% set db = "maproulette_test" %}
{% elif setting == "prod" %}
{% set db = "maproulette_test" %}
{% endif %}
{% if setting == "dev" -%}
{% set db = "maproulette_dev" -%}
{% set cors_url = "http://maproulette-metrics-stage.s3-website-us-east-1.amazonaws.com/" -%}
{% elif setting == "test" -%}
{% set db = "maproulette_test" -%}
{% set cors_url = "http://maproulette-metrics-stage.s3-website-us-east-1.amazonaws.com/" -%}
{% elif setting == "prod" -%}
{% set db = "maproulette" -%}
{% set cors_url = "http://maproulette-metrics-production.s3-website-us-east-1.amazonaws.com/" -%}
{% endif -%}

# The database connection
SQLALCHEMY_DATABASE_URI = "postgresql://osm@localhost/{{db}}"
Expand Down Expand Up @@ -74,4 +77,10 @@ MAILGUN_API_KEY = 'REPLACE WITH YOUR MAILGUN API KEY'

# IP Whitelist for external API calls
# (/api/admin/*, /api/stats*, /api/users, /api/challenges)
IP_WHITELIST = [ ]
IP_WHITELIST = [ ]

# Max number of tasks in a bulk update call
MAX_TASKS_BULK_UPDATE = 5000

# URL of the metrics site instance, from which CORS requests are allowed
METRICS_URL = '{{cors_url}}'
11 changes: 6 additions & 5 deletions manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,8 @@ def create_testdata(challenges=10, tasks=100, users=10):
for j in range(int(tasks)):
# generate a unique identifier
identifier = str(uuid.uuid4())
# instantiate the task and register it with challenge 'test'
# Initialize a task with its challenge slug and persistent ID
task = Task(challenge.slug, identifier)
# create two random points not too far apart
task_geometries = []
p1 = Point(
random.randrange(minx, maxx) + random.random(),
random.randrange(miny, maxy) + random.random())
Expand All @@ -123,10 +121,13 @@ def create_testdata(challenges=10, tasks=100, users=10):
# generate some random 'osm ids'
osmids = [random.randrange(1000000, 1000000000) for _ in range(2)]
# add the first point and the linestring to the task's geometries
task.geometries.append(TaskGeometry(osmids[0], p1))
task_geometries.append(TaskGeometry(osmids[0], p1))
# set a linestring for every other task
if not j % 2:
task.geometries.append(TaskGeometry(osmids[1], l1))
task_geometries.append(TaskGeometry(osmids[1], l1))
# instantiate the task and register it with challenge 'test'
# Initialize a task with its challenge slug and persistent ID
task = Task(challenge.slug, identifier, task_geometries)
# because we are not using the API, we need to call set_location
# explicitly to set the task's location
task.set_location()
Expand Down
2 changes: 1 addition & 1 deletion maproulette/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from flask import Flask
from simplekv.fs import FilesystemStore
from flaskext.kvsession import KVSessionExtension
from flask_kvsession import KVSessionExtension

# initialize server KV session store
if not os.path.exists('./sessiondata'):
Expand Down
114 changes: 52 additions & 62 deletions maproulette/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from flask.ext.restful import reqparse, fields, marshal, \
marshal_with, Api, Resource
from flask.ext.restful.fields import Raw
from flask.ext.restful.utils import cors
from flask import session, request, abort, url_for
from maproulette.helpers import get_random_task,\
get_challenge_or_404, get_task_or_404,\
Expand Down Expand Up @@ -76,6 +77,7 @@ def format(self, text):
}

api = Api(app)
api.decorators = [cors.crossdomain(origin=app.config['METRICS_URL'])]

# override the default JSON representation to support the geo objects

Expand Down Expand Up @@ -219,6 +221,7 @@ class ApiStats(Resource):
def get(self, challenge_slug=None, user_id=None):
from dateutil import parser as dateparser
from datetime import datetime
from maproulette.models import AggregateMetrics

start = None
end = None
Expand All @@ -233,57 +236,35 @@ def get(self, challenge_slug=None, user_id=None):

breakdown = False

# base CTE and query
# the base CTE gets the set of latest actions for any task
latest_cte = db.session.query(
Action.id,
Action.task_id,
Action.timestamp,
Action.user_id,
Action.status,
Task.challenge_slug,
User.display_name).join(
Task).outerjoin(User).distinct(
Action.task_id).order_by(
Action.task_id,
Action.id.desc()).cte(name='latest')

# the base query gets a count on the base CTE grouped by status,
# optionally broken down by users or challenges
select_fields = [
AggregateMetrics.status,
func.sum(AggregateMetrics.count)]

group_fields = [
AggregateMetrics.status]

if request.path.endswith('/users'):
select_fields.insert(0, AggregateMetrics.user_name)
group_fields.insert(0, AggregateMetrics.user_name)
breakdown = True
stats_query = db.session.query(
latest_cte.c.display_name,
latest_cte.c.status,
func.count(latest_cte.c.id)).group_by(
latest_cte.c.status,
latest_cte.c.display_name).order_by(
latest_cte.c.status)
elif request.path.endswith('/challenges'):
select_fields.insert(0, AggregateMetrics.challenge_slug)
group_fields.insert(0, AggregateMetrics.challenge_slug)
breakdown = True
stats_query = db.session.query(
latest_cte.c.challenge_slug,
latest_cte.c.status,
func.count(latest_cte.c.id)).group_by(
latest_cte.c.status,
latest_cte.c.challenge_slug).order_by(
latest_cte.c.status)
else:
stats_query = db.session.query(
latest_cte.c.status,
func.count(latest_cte.c.id)).group_by(
latest_cte.c.status).order_by(
latest_cte.c.status)

stats_query = db.session.query(
*select_fields).group_by(
*group_fields)

# stats for a specific challenge
if challenge_slug is not None:
stats_query = stats_query.filter(
latest_cte.c.challenge_slug == challenge_slug)
stats_query = stats_query.filter_by(
challenge_slug=challenge_slug)

# stats for a specific user
if user_id is not None:
stats_query = stats_query.filter(
latest_cte.c.user_id == user_id)
stats_query = stats_query.filter_by(
user_id=user_id)

# time slicing filters
if args['start'] is not None:
Expand All @@ -293,7 +274,9 @@ def get(self, challenge_slug=None, user_id=None):
else:
end = dateparser.parse(args['end'])
stats_query = stats_query.filter(
latest_cte.c.timestamp.between(start, end))
AggregateMetrics.timestamp.between(start, end))

app.logger.debug(stats_query)

if breakdown:
# if this is a breakdown by a secondary variable, the
Expand All @@ -310,6 +293,8 @@ class ApiStatsHistory(Resource):

def get(self, challenge_slug=None, user_id=None):

from maproulette.models import HistoricalMetrics as HM

start = None
end = None

Expand All @@ -323,20 +308,19 @@ def get(self, challenge_slug=None, user_id=None):

args = parser.parse_args()

query = db.session.query(
func.date_trunc('day', Action.timestamp).label('day'),
Action.status,
func.count(Action.id)).join(
Task).outerjoin(User)
stats_query = db.session.query(
HM.timestamp,
HM.status,
func.sum(HM.count))

if challenge_slug is not None:
query = query.filter(Task.challenge_slug == challenge_slug)
stats_query = stats_query.filter(HM.challenge_slug == challenge_slug)
if user_id is not None:
query = query.filter(User.id == user_id)
stats_query = stats_query.filter(HM.user_id == user_id)

query = query.group_by(
'day', Action.status).order_by(
Action.status)
stats_query = stats_query.group_by(
HM.timestamp, HM.status).order_by(
HM.status)

# time slicing filters
if args['start'] is not None:
Expand All @@ -345,11 +329,13 @@ def get(self, challenge_slug=None, user_id=None):
end = datetime.utcnow()
else:
end = dateparser.parse(args['end'])
query = query.filter(
stats_query = stats_query.filter(
Action.timestamp.between(start, end))

app.logger.debug(stats_query)

return as_stats_dict(
query.all(),
stats_query.all(),
order=[1, 0, 2],
start=start,
end=end)
Expand Down Expand Up @@ -593,13 +579,13 @@ def get(self, slug):

class AdminApiUpdateTask(Resource):

"""Challenge Task Statuses endpoint"""
"""Challenge Task Create / Update endpoint"""

def put(self, slug, identifier):
"""Create or update one task."""

# Parse the posted data
parse_task_json(json.loads(request.data), slug, identifier)
db.session.add(parse_task_json(slug, json.loads(request.data)))
return {}, 201

def delete(self, slug, identifier):
Expand All @@ -614,17 +600,21 @@ def delete(self, slug, identifier):

class AdminApiUpdateTasks(Resource):

"""Bulk task creation / update endpoint"""
"""Bulk task create / update endpoint"""

def put(self, slug):

app.logger.debug('putting multiple tasks')
app.logger.debug(len(request.data))
# Get the posted data
taskdata = json.loads(request.data)
data = json.loads(request.data)

# debug output number of tasks being posted
app.logger.debug('posting {number} tasks...'.format(number=len(data)))

if len(data) > app.config['MAX_TASKS_BULK_UPDATE']:
abort(400, 'more than 5000 tasks in bulk update')

for task in taskdata:
parse_task_json(task, slug, task['identifier'], commit=False)
for task in data:
db.session.merge(parse_task_json(slug, task))

# commit all dirty tasks at once.
db.session.commit()
Expand Down
68 changes: 25 additions & 43 deletions maproulette/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,6 @@ def task_exists(challenge_slug, task_identifier):
return True


def get_or_create_task(challenge, task_identifier):
"""Return a task, either pull a new one or create a new one"""

task = (Task.identifier == task_identifier). \
filter(Task.challenge_slug == challenge.slug).first()
if not task:
task = Task(challenge.id, task_identifier)
return task


def require_signedin(f):
"""Require the caller to be authenticated against OSM"""

Expand Down Expand Up @@ -145,44 +135,36 @@ def get_random_task(challenge):
return q.first() or None


def parse_task_json(data, slug, identifier, commit=True):
def parse_task_json(slug, data):
"""Parse task json coming in through the admin api"""

# task json needs to have identifier
if not 'identifier' in data:
abort(400, 'no identifier')

# if the task is new, it needs to have geometry
if not 'geometries' in data:
if not task_exists(data['identifier']):
abort(400, 'no geometries for new tasks')

# extract the task geometries
task_geometries = []
geometries = data.pop('geometries')
# parse the geometries
for feature in geometries['features']:
osmid = feature['properties'].get('osmid')
shape = asShape(feature['geometry'])
g = TaskGeometry(osmid, shape)
task_geometries.append(g)

exists = task_exists(slug, identifier)
# create the task
t = Task(slug, data['identifier'], task_geometries)

# abort if the taskdata does not contain geometries and it's a new task
if not 'geometries' in data:
if not exists:
abort(400)
else:
# extract the geometries
geometries = data.pop('geometries')
# parse the geometries
for feature in geometries['features']:
osmid = feature['properties'].get('osmid')
shape = asShape(feature['geometry'])
t = TaskGeometry(osmid, shape)
task_geometries.append(t)

# there are two possible scenarios:
# 1. An existing task gets an update, in that case
# we only need the identifier
# 2. A new task is inserted, in this case we need at
# least an identifier and encoded geometries.

# now we check if the task exists
if exists:
# if it does, update it
task = get_task_or_404(slug, identifier)
if not task.update(data, task_geometries, commit=commit):
abort(400)
else:
# if it does not, create it
new_task = Task(slug, identifier)
new_task.update(data, task_geometries, commit=commit)
return True
# check for instruction
if 'instruction' in data:
t.instruction = data['instruction']

return t


def get_envelope(geoms):
Expand Down
Loading

0 comments on commit 3a520c0

Please sign in to comment.