Skip to content

Commit

Permalink
schemas: trigger update on ES and reindexing when schema mapping changed
Browse files Browse the repository at this point in the history
* signals on mappings update
* recreate indices on mappings update (del -> create)
* reindex all records belonging to updated schema
* change ```cap fixtures schemas``` to update schemas in the db (before was
just skipping if schema already existed in the db), triggering ES
changes in reindexing

Signed-off-by: Anna Trzcinska <[email protected]>
  • Loading branch information
annatrz committed Nov 18, 2019
1 parent d2b2a25 commit 74bf3f5
Show file tree
Hide file tree
Showing 11 changed files with 320 additions and 98 deletions.
28 changes: 17 additions & 11 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,42 @@
# as an Intergovernmental Organization or submit itself to any jurisdiction.

addons:
postgresql: 9.4
postgresql: 9.4
apt:
sourceline: ppa:simonarons/ppa
update: true
package: sqlite3=3.29.0-2


notifications:
email: false

sudo: false

dist: trusty
dist: precise

language: python

python:
- "2.7"
- "3.5"

services:
- postgresql
- redis
- rabbitmq

env:
- REQUIREMENTS=lowest E2E="no" SQLALCHEMY_DATABASE_URI="postgresql+psycopg2://postgres@localhost:5432/cap" ES_VERSION=5.6.4

before_install:
- "sudo sqlite3 --version"
- "mkdir /tmp/elasticsearch"
- "wget -O - https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ES_VERSION}.tar.gz | tar xz --directory=/tmp/elasticsearch --strip-components=1"
- "/tmp/elasticsearch/bin/elasticsearch &"
- "./scripts/ci/prebuild.sh"
- mkdir /tmp/elasticsearch
- wget -O - https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ES_VERSION}.tar.gz | tar xz --directory=/tmp/elasticsearch --strip-components=1
- "/tmp/elasticsearch/bin/elasticsearch-plugin install -b ingest-attachment"
- /tmp/elasticsearch/bin/elasticsearch &
- sleep 30
- travis_retry pip install coveralls
- travis_retry pip install kwalitee --pre
- echo ${TRAVIS_COMMIT_RANGE}
- "travis_retry pip install coveralls"
- "travis_retry pip install kwalitee --pre"
- "echo ${TRAVIS_COMMIT_RANGE}"
- "travis_retry pip install --upgrade pip setuptools py"
- "travis_retry pip install twine wheel coveralls requirements-builder"
- "cat requirements.txt > .travis-lowest-requirements.txt"
Expand All @@ -64,6 +69,7 @@ install:
- "travis_retry pip install -e .[docs,tests,ldap]"

before_script:
- sleep 10
- travis_retry pip install kwalitee GitPython --pre

script:
Expand Down
7 changes: 5 additions & 2 deletions cap/modules/deposit/ext.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""Initialize extension."""

from __future__ import absolute_import, print_function
from cap.modules.schemas.models import Schema

from invenio_search import current_search

from cap.modules.schemas.models import Schema

from .receivers import handle_deposit_mapping_updated


class CAPDeposit(object):
"""CAPDeposit extension."""

def __init__(self, app=None):
"""Extension initialization."""
if app:
Expand Down
35 changes: 35 additions & 0 deletions cap/modules/deposit/receivers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
#
# This file is part of CERN Analysis Preservation Framework.
# Copyright (C) 2016 CERN.
#
# CERN Analysis Preservation Framework is free software; you can redistribute
# it and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# CERN Analysis Preservation Framework is distributed in the hope that it will
# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with CERN Analysis Preservation Framework; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
"""Registered signal handlers for records module."""
from invenio_jsonschemas.proxies import current_jsonschemas

from cap.modules.records.utils import reindex_by_schema_url
from cap.modules.schemas.signals import deposit_mapping_updated


@deposit_mapping_updated.connect
def handle_deposit_mapping_updated(schema):
"""Reindex all the deposits when mapping in ES got updated."""
schema_url = current_jsonschemas.path_to_url(schema.deposit_path)
reindex_by_schema_url(schema_url, 'depid')
3 changes: 2 additions & 1 deletion cap/modules/records/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.

"""Data model package."""

from .receivers import handle_record_mapping_updated
37 changes: 37 additions & 0 deletions cap/modules/records/receivers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
#
# This file is part of CERN Analysis Preservation Framework.
# Copyright (C) 2016 CERN.
#
# CERN Analysis Preservation Framework is free software; you can redistribute
# it and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# CERN Analysis Preservation Framework is distributed in the hope that it will
# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with CERN Analysis Preservation Framework; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
"""Registered signal handlers for deposit module."""
from invenio_jsonschemas.proxies import current_jsonschemas

from cap.modules.schemas.signals import record_mapping_updated

from .utils import reindex_by_schema_url


@record_mapping_updated.connect
def handle_record_mapping_updated(schema):
"""Reindex all the record when mapping in ES got updated."""
schema_url = current_jsonschemas.path_to_url(schema.record_path)

reindex_by_schema_url(schema_url, 'recid')
14 changes: 10 additions & 4 deletions cap/modules/records/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,15 @@
import string

from flask import url_for
from invenio_db import db
from invenio_indexer.api import RecordIndexer
from invenio_pidstore.errors import PIDDoesNotExistError
from invenio_pidstore.models import PersistentIdentifier
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
from invenio_records.models import RecordMetadata
from six.moves.urllib import parse
from sqlalchemy import cast
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.dialects.sqlite import JSON


def generate_recid(experiment):
Expand Down Expand Up @@ -86,9 +92,9 @@ def _get_json_type():
schema_url, _get_json_type())).values(RecordMetadata.id))

filtered_by_pid_type = (x[0] for x in PersistentIdentifier.query.filter(
PersistentIdentifier.status == PIDStatus.REGISTERED,
PersistentIdentifier.object_type == 'rec', PersistentIdentifier.
pid_type == pid_type, PersistentIdentifier.object_uuid.in_(
PersistentIdentifier.object_type == 'rec',
PersistentIdentifier.pid_type == pid_type, PersistentIdentifier.status
== PIDStatus.REGISTERED, PersistentIdentifier.object_uuid.in_(
ids)).values(PersistentIdentifier.object_uuid))

print('{} records will be reindexed...'.format(schema_url))
Expand Down
13 changes: 7 additions & 6 deletions cap/modules/schemas/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,19 @@ def add_schema_from_fixture(data=None):
with db.session.begin_nested():
with db.session.begin_nested():
try:
schema = Schema.get(name=data['name'],
version=data['version'])
click.secho('{} already exist in the db.'.format(
str(name)))
return
schema = Schema.get(name=name, version=data['version'])
schema.update(**data)
msg, fg = '{} updated.'.format(str(name)), 'green'

except JSONSchemaNotFound:
schema = Schema(**data)
db.session.add(schema)
msg, fg = '{} added.'.format(str(name)), 'green'

if allow_all:
schema.add_read_access_for_all_users()
else:
schema.revoke_access_for_all_users()

except IntegrityError:
click.secho('Error occured during adding {} to the db. \n'.format(
Expand All @@ -89,4 +90,4 @@ def add_schema_from_fixture(data=None):
return

db.session.commit()
click.secho('{} has been added.'.format(str(name)), fg='green')
click.secho(msg, fg=fg)
126 changes: 96 additions & 30 deletions cap/modules/schemas/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
from six.moves.urllib.parse import urljoin
from sqlalchemy import UniqueConstraint, event
from sqlalchemy.orm import validates
from sqlalchemy.orm.base import NO_VALUE
from sqlalchemy.orm.exc import NoResultFound
from werkzeug.utils import import_string

from cap.types import json_type

from .permissions import SchemaAdminAction, SchemaReadAction
from .serializers import resolved_schemas_serializer, schema_serializer
from .signals import deposit_mapping_updated, record_mapping_updated

ES_FORBIDDEN = r' ,"\<*>|?'

Expand Down Expand Up @@ -208,11 +210,30 @@ def add_read_access_for_all_users(self):
"""Give read access to all authenticated users."""
assert self.id

db.session.add(
ActionSystemRoles.allow(SchemaReadAction(self.id),
role=authenticated_user))
try:
ActionSystemRoles.query.filter(
ActionSystemRoles.action == 'schema-object-read',
ActionSystemRoles.argument == str(self.id),
ActionSystemRoles.role_name == 'authenticated_user').one()
except NoResultFound:
db.session.add(
ActionSystemRoles.allow(SchemaReadAction(self.id),
role=authenticated_user))
db.session.flush()

def revoke_access_for_all_users(self):
"""Revoke read access to all authenticated users."""
assert self.id

try:
db.session.delete(
ActionSystemRoles.query.filter(
ActionSystemRoles.action == 'schema-object-read',
ActionSystemRoles.argument == str(self.id),
ActionSystemRoles.role_name == 'authenticated_user').one())
except NoResultFound:
pass

def give_admin_access_for_user(self, user):
"""Give admin access for users."""
assert self.id
Expand Down Expand Up @@ -270,39 +291,55 @@ def name_to_es_name(name):
return name.replace('/', '-')


def create_index(index_name, mapping_body, aliases):
"""Create index in elasticsearch, add under given aliases."""
if not es.indices.exists(index_name):
current_search.mappings[index_name] = {} # invenio search needs it

es.indices.create(index=index_name,
body={'mappings': mapping_body},
ignore=False)

for alias in aliases:
es.indices.update_aliases(
{'actions': [{
'add': {
'index': index_name,
'alias': alias
}
}]})


@event.listens_for(Schema, 'after_insert')
def after_insert_schema(target, value, schema):
"""On schema insert, create corresponding indexes and aliases in ES."""
if schema.is_indexed:
create_index(schema.deposit_index, schema.deposit_mapping,
schema.deposit_aliases)
create_index(schema.record_index, schema.record_mapping,
schema.record_aliases)
_recreate_deposit_mapping_in_ES(schema, schema.deposit_mapping)
_recreate_record_mapping_in_ES(schema, schema.record_mapping)

# invenio search needs it
mappings_imp = current_app.config.get('SEARCH_GET_MAPPINGS_IMP')
current_cache.delete_memoized(import_string(mappings_imp))


@event.listens_for(Schema.deposit_mapping, 'set')
def after_deposit_mapping_updated(target, value, oldvalue, initiator):
"""If deposit mapping field was updated:
* trigger mapping update in ES
* send signal
Skip if:
* triggered on creation of schema (not update)
* schema not indexed in ES
"""
if oldvalue == NO_VALUE or not target.is_indexed:
return

_recreate_deposit_mapping_in_ES(target, value)

if target.use_deposit_as_record:
_recreate_record_mapping_in_ES(target, value)


@event.listens_for(Schema.record_mapping, 'set')
def after_record_mapping_updated(target, value, oldvalue, initiator):
"""If record mapping field was updated:
* trigger mapping update in ES
* send signal
Skip if:
* triggered on creation of schema (not update)
* schema not indexed in ES
* flag use_deposit_as_record, so record mapping changes can be ignored
"""
if oldvalue == NO_VALUE or not target.is_indexed or \
target.use_deposit_as_record:
return

_recreate_record_mapping_in_ES(target, value)


@event.listens_for(Schema, 'after_delete')
def before_delete_schema(mapper, connect, schema):
"""On schema delete, delete corresponding indexes and aliases in ES."""
Expand All @@ -316,7 +353,36 @@ def before_delete_schema(mapper, connect, schema):
current_cache.delete_memoized(import_string(mappings_imp))


@db.event.listens_for(Schema, 'before_update', propagate=True)
def timestamp_before_update(mapper, connection, target):
"""Update `updated` property with current time on `before_update` event."""
target.updated = datetime.utcnow()
def _create_index(index_name, mapping_body, aliases):
"""Create index in elasticsearch, add under given aliases."""
if not es.indices.exists(index_name):
current_search.mappings[index_name] = {} # invenio search needs it

es.indices.create(index=index_name,
body={'mappings': mapping_body},
ignore=False)

for alias in aliases:
es.indices.update_aliases(
{'actions': [{
'add': {
'index': index_name,
'alias': alias
}
}]})


def _recreate_deposit_mapping_in_ES(schema, mapping):
if es.indices.exists(schema.deposit_index):
es.indices.delete(index=schema.deposit_index)

_create_index(schema.deposit_index, mapping, schema.deposit_aliases)
deposit_mapping_updated.send(schema)


def _recreate_record_mapping_in_ES(schema, mapping):
if es.indices.exists(schema.record_index):
es.indices.delete(index=schema.record_index)

_create_index(schema.record_index, mapping, schema.record_aliases)
record_mapping_updated.send(schema)
Loading

0 comments on commit 74bf3f5

Please sign in to comment.