Merge pull request #52 from acsone/8.0-jobrunner-sbi
[8.0] job runner
guewen committed May 15, 2015
2 parents 3055ea0 + 2193b7c commit 499be9c
Showing 26 changed files with 1,610 additions and 9 deletions.
1 change: 1 addition & 0 deletions connector/AUTHORS
@@ -9,3 +9,4 @@
* Leonardo Pistone at Camptocamp
* David Béal at Akretion (tiny change)
* Christophe Combelles at Anybox (french translation)
* Stéphane Bidoul at Acsone (job runner)
2 changes: 2 additions & 0 deletions connector/__init__.py
@@ -5,3 +5,5 @@
from . import connector
from . import producer
from . import checkpoint
from . import controllers
from . import jobrunner
3 changes: 3 additions & 0 deletions connector/__openerp__.py
@@ -28,6 +28,9 @@
'category': 'Generic Modules',
'depends': ['mail'
],
'external_dependencies': {'python': ['requests'
],
},
'data': ['security/connector_security.xml',
'security/ir.model.access.csv',
'queue/model_view.xml',
4 changes: 4 additions & 0 deletions connector/connector.py
@@ -83,6 +83,10 @@ def install_in_connector():
install_in_connector()


def is_module_installed(pool, module_name):
return bool(pool.get('%s.installed' % module_name))


def get_openerp_module(cls_or_func):
""" For a top level function or class, returns the
name of the OpenERP module where it lives.
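A minimal usage sketch of the new is_module_installed() helper (the registry variable pool and the 'connector' argument are illustrative); it works because install_in_connector() registers an abstract model named '<module>.installed' for each module that calls it:

# 'pool' is an Odoo registry, e.g. RegistryManager.get(db_name).
# The helper returns True when the given module has registered its
# '<module>.installed' marker model, i.e. when that module is installed
# in this database.
if is_module_installed(pool, 'connector'):
    # connector-specific behaviour can safely be enabled here
    pass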
12 changes: 12 additions & 0 deletions connector/connector_menu.xml
@@ -13,12 +13,24 @@
name="Queue"
parent="menu_connector_root"/>

<menuitem id="menu_queue_job_channel"
action="action_queue_job_channel"
sequence="12"
parent="menu_queue"/>

<menuitem id="menu_queue_job_function"
action="action_queue_job_function"
sequence="14"
parent="menu_queue"/>

<menuitem id="menu_queue_worker"
action="action_queue_worker"
sequence="16"
parent="menu_queue"/>

<menuitem id="menu_queue_job"
action="action_queue_job"
sequence="18"
parent="menu_queue"/>

<menuitem id="menu_checkpoint"
1 change: 1 addition & 0 deletions connector/controllers/__init__.py
@@ -0,0 +1 @@
from . import main
117 changes: 117 additions & 0 deletions connector/controllers/main.py
@@ -0,0 +1,117 @@
import logging
import traceback
from cStringIO import StringIO

from psycopg2 import OperationalError

import openerp
from openerp import http
from openerp.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY

from ..session import ConnectorSessionHandler
from ..queue.job import (OpenERPJobStorage,
ENQUEUED)
from ..exception import (NoSuchJobError,
NotReadableJobError,
RetryableJobError,
FailedJobError,
NothingToDoJob)

_logger = logging.getLogger(__name__)

PG_RETRY = 5 # seconds


# TODO: perhaps the notion of ConnectorSession is less important
# now that we are running jobs inside a normal Odoo worker


class RunJobController(http.Controller):

job_storage_class = OpenERPJobStorage

def _load_job(self, session, job_uuid):
""" Reload a job from the backend """
try:
job = self.job_storage_class(session).load(job_uuid)
except NoSuchJobError:
# just skip it
job = None
except NotReadableJobError:
_logger.exception('Could not read job: %s', job_uuid)
raise
return job

@http.route('/connector/runjob', type='http', auth='none')
def runjob(self, db, job_uuid, **kw):

session_hdl = ConnectorSessionHandler(db,
openerp.SUPERUSER_ID)

def retry_postpone(job, message, seconds=None):
with session_hdl.session() as session:
job.postpone(result=message, seconds=seconds)
job.set_pending(self)
self.job_storage_class(session).store(job)

with session_hdl.session() as session:
job = self._load_job(session, job_uuid)
if job is None:
return ""

try:
# if the job has been manually set to DONE or PENDING,
# or if something tries to run a job that is not enqueued
# before its execution, stop
if job.state != ENQUEUED:
_logger.warning('job %s is in state %s '
'instead of enqueued in /runjob',
job_uuid, job.state)
return

with session_hdl.session() as session:
# TODO: set_started should be done atomically with
# update queue_job set state='started'
# where state='enqueued' and id=
job.set_started()
self.job_storage_class(session).store(job)

_logger.debug('%s started', job)
with session_hdl.session() as session:
job.perform(session)
job.set_done()
self.job_storage_class(session).store(job)
_logger.debug('%s done', job)

except NothingToDoJob as err:
if unicode(err):
msg = unicode(err)
else:
msg = None
job.cancel(msg)
with session_hdl.session() as session:
self.job_storage_class(session).store(job)

except RetryableJobError as err:
# delay the job later, requeue
retry_postpone(job, unicode(err))
_logger.debug('%s postponed', job)

except OperationalError as err:
# Automatically retry the typical transaction serialization errors
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
raise
retry_postpone(job, unicode(err), seconds=PG_RETRY)
_logger.debug('%s OperationalError, postponed', job)

except (FailedJobError, Exception):
buff = StringIO()
traceback.print_exc(file=buff)
_logger.error(buff.getvalue())

job.set_failed(exc_info=buff.getvalue())
with session_hdl.session() as session:
self.job_storage_class(session).store(job)
raise

return ""
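For reference, a hedged sketch of how the job runner notifies Odoo to execute one enqueued job through this controller (this is what the new requests external dependency is for); the host, port, database name and UUID below are placeholders:

import requests

# auth='none' on the route means no session is required; the target database
# and the job UUID are passed as query string parameters.
url = 'http://localhost:8069/connector/runjob'
requests.get(url, params={'db': 'mydb', 'job_uuid': '<uuid of an enqueued job>'})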
14 changes: 14 additions & 0 deletions connector/doc/api/api_channels.rst
@@ -0,0 +1,14 @@
########
Channels
########

This is the API documentation for the job channels and the
scheduling mechanisms of the job runner.

These classes are not intended for use by module developers.

.. automodule:: connector.jobrunner.channels
:members:
:undoc-members:
:show-inheritance:

13 changes: 13 additions & 0 deletions connector/doc/api/api_runner.rst
@@ -0,0 +1,13 @@
##########
Job Runner
##########

This is the API documentation for the job runner.

These classes are not intended for use by module developers.

.. automodule:: connector.jobrunner.runner
:members:
:undoc-members:
:show-inheritance:

33 changes: 33 additions & 0 deletions connector/doc/guides/jobrunner.rst
@@ -0,0 +1,33 @@
.. _jobrunner:


#######################################
Configuring channels and the job runner
#######################################

.. automodule:: connector.jobrunner.runner

What is a channel?
------------------

.. autoclass:: connector.jobrunner.channels.Channel

How to configure Channels?
--------------------------

The ``ODOO_CONNECTOR_CHANNELS`` environment variable must be
set before starting Odoo in order to enable the job runner
and configure the capacity of the channels.

The general syntax is ``channel(.subchannel)*(:capacity(:key(=value)?)*)?,...``.

Intermediate subchannels that are not configured explicitly are created
automatically with unlimited capacity (except the root channel, which gets a
default capacity of 1 when not configured).

Example ``ODOO_CONNECTOR_CHANNELS``:

* ``root:4``: allow up to 4 concurrent jobs in the root channel.
* ``root:4,root.sub:2``: allow up to 4 concurrent jobs in the root channel and
up to 2 concurrent jobs in the channel named ``root.sub``.
* ``sub:2``: the same as ``root.sub:2``; channel names without the ``root.``
prefix are interpreted as children of the root channel.
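To make the syntax concrete, here is an illustrative parser (not the actual implementation, which lives in connector.jobrunner.channels and also handles the optional key=value settings) for the simple name:capacity form:

def parse_simple_channel_config(config_string):
    """Parse e.g. 'root:4,root.sub:2' into {'root': 4, 'root.sub': 2}."""
    capacities = {}
    for entry in config_string.split(','):
        parts = entry.strip().split(':')
        # a channel without an explicit capacity is unlimited
        capacities[parts[0]] = int(parts[1]) if len(parts) > 1 else None
    return capacities

assert parse_simple_channel_config('root:4,root.sub:2') == {'root': 4,
                                                            'root.sub': 2}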
13 changes: 10 additions & 3 deletions connector/doc/guides/multiprocessing.rst
@@ -1,9 +1,16 @@
.. _multiprocessing:


######################################
Use the connector with multiprocessing
######################################
##############################################
Use the connector with multiprocessing workers
##############################################

.. note:: In a future version, workers will be deprecated
in favor of the newer job runner, which is more efficient and
supports job channels. Try the job runner first and fall back
to workers only if the runner does not work for you; in that
case, please open a GitHub issue describing the problems you
encountered.

When Odoo is launched with 1 process, the jobs worker will run
threaded in the same process.
6 changes: 5 additions & 1 deletion connector/doc/index.rst
@@ -16,7 +16,7 @@ ability to be extended with additional modules for new features or
customizations.

The development of Odoo Connector has been started by `Camptocamp`_ and is now
maintained by `Camptocamp`_, `Akretion`_ and several :ref:`contributors`.
maintained by `Camptocamp`_, `Akretion`_, `Acsone`_ and several :ref:`contributors`.

*Subscribe to the* `project's mailing list`_

@@ -39,6 +39,7 @@ Core Features
.. _Odoo: http://www.odoo.com
.. _Camptocamp: http://www.camptocamp.com
.. _Akretion: http://www.akretion.com
.. _Acsone: http://www.acsone.eu
.. _`source code is available on GitHub`: https://github.com/OCA/connector
.. _`AGPL version 3`: http://www.gnu.org/licenses/agpl-3.0.html
.. _`project's mailing list`: https://launchpad.net/~openerp-connector-community
@@ -112,6 +113,7 @@ Developer's guide
guides/code_overview.rst
guides/concepts.rst
guides/bootstrap_connector.rst
guides/jobrunner.rst
guides/multiprocessing.rst

API Reference
@@ -130,6 +132,8 @@ API Reference
api/api_backend_adapter.rst
api/api_queue.rst
api/api_exception.rst
api/api_channels.rst
api/api_runner.rst

******************
Indices and tables
4 changes: 4 additions & 0 deletions connector/exception.py
@@ -75,3 +75,7 @@ class IDMissingInBackend(JobError):

class ManyIDSInBackend(JobError):
"""Unique key exists many times in backend"""


class ChannelNotFound(ConnectorException):
""" A channel could not be found """
97 changes: 97 additions & 0 deletions connector/jobrunner/__init__.py
@@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
##############################################################################
#
# This file is part of connector, an Odoo module.
#
# Author: Stéphane Bidoul <[email protected]>
# Copyright (c) 2015 ACSONE SA/NV (<http://acsone.eu>)
#
# connector is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
#
# connector is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the
# GNU Affero General Public License
# along with connector.
# If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################

import logging
import os
from threading import Thread
import time

from openerp.service import server
from openerp.tools import config

from .runner import ConnectorRunner

_logger = logging.getLogger(__name__)

START_DELAY = 5


# Here we monkey patch the Odoo server to start the job runner thread
# in the main server process (and not in forked workers). This is
# very easy to deploy as we don't need another startup script.
# The drawback is that it is not possible to extend the Odoo
# server command line arguments, so we resort to environment variables
# to configure the runner (channels mostly).


enable = os.environ.get('ODOO_CONNECTOR_CHANNELS')


def run():
# sleep a bit to let the workers start at ease
time.sleep(START_DELAY)
port = os.environ.get('ODOO_CONNECTOR_PORT') or config['xmlrpc_port']
channels = os.environ.get('ODOO_CONNECTOR_CHANNELS')
runner = ConnectorRunner(port or 8069, channels or 'root:1')
runner.run_forever()


orig_prefork_start = server.PreforkServer.start
orig_threaded_start = server.ThreadedServer.start
orig_gevent_start = server.GeventServer.start


def prefork_start(server, *args, **kwargs):
res = orig_prefork_start(server, *args, **kwargs)
if enable and not config['stop_after_init']:
_logger.info("starting jobrunner thread (in prefork server)")
thread = Thread(target=run)
thread.daemon = True
thread.start()
return res


def threaded_start(server, *args, **kwargs):
res = orig_threaded_start(server, *args, **kwargs)
if enable and not config['stop_after_init']:
_logger.info("starting jobrunner thread (in threaded server)")
thread = Thread(target=run)
thread.daemon = True
thread.start()
return res


def gevent_start(server, *args, **kwargs):
res = orig_gevent_start(server, *args, **kwargs)
if enable and not config['stop_after_init']:
_logger.info("starting jobrunner thread (in gevent server)")
# TODO: gevent spawn?
raise RuntimeError("not implemented")
return res


server.PreforkServer.start = prefork_start
server.ThreadedServer.start = threaded_start
server.GeventServer.start = gevent_start
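As a rough illustration of what run() ends up doing, the runner could in principle be constructed and started by hand for debugging (the port, channel string and import path are assumptions; inside Odoo the addon is importable as openerp.addons.connector):

# Requires a configured Odoo environment (openerp.tools.config loaded),
# since the runner connects to the database and calls back into Odoo over HTTP.
from openerp.addons.connector.jobrunner.runner import ConnectorRunner

runner = ConnectorRunner(8069, 'root:2')  # same positional arguments as in run()
runner.run_forever()  # blocks; normally runs in the daemon thread started above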