From 0d96661f27c6a40739c03c88fa7bd3c1d1c95f9d Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Tue, 25 Jun 2013 11:45:53 +0200 Subject: [PATCH 1/9] [REF] reworked the workers: * do no longer use the registries, they are not reliable, check if the connector is installed on the database names * call the signaling methods of the registry within a session context, ensure that we use the correct registry and it is cleared if necessary * do not store the worker on the 'queue.worker' model, it is cleared when the registry is reinitialized This is much reliable because the worker stay alive even if a registry is rebuilt (the worker was replaced by a new one before but anyway it was not linked to the registry) --- connector/queue/model.py | 44 +++++++++++---------- connector/queue/worker.py | 83 ++++++++++++++++++++++++--------------- connector/session.py | 11 +++++- 3 files changed, 85 insertions(+), 53 deletions(-) diff --git a/connector/queue/model.py b/connector/queue/model.py index 1e825f05b..acc983251 100644 --- a/connector/queue/model.py +++ b/connector/queue/model.py @@ -30,6 +30,7 @@ from .job import STATES, DONE, PENDING, OpenERPJobStorage from .worker import WORKER_TIMEOUT from ..session import ConnectorSession +from .worker import watcher _logger = logging.getLogger(__name__) @@ -180,7 +181,6 @@ class QueueWorker(orm.Model): _rec_name = 'uuid' worker_timeout = WORKER_TIMEOUT - _worker = None _columns = { 'uuid': fields.char('UUID', readonly=True, select=True, required=True), @@ -189,7 +189,7 @@ class QueueWorker(orm.Model): 'date_alive': fields.datetime('Last Alive Check', readonly=True), 'job_ids': fields.one2many('queue.job', 'worker_id', string='Jobs', readonly=True), - } + } def _notify_alive(self, cr, uid, worker, context=None): worker_ids = self.search(cr, uid, @@ -204,12 +204,12 @@ def _notify_alive(self, cr, uid, worker, context=None): 'date_start': now_fmt, 'date_alive': now_fmt}, context=context) - self._worker = worker else: self.write(cr, uid, 
worker_ids, {'date_alive': now_fmt}, context=context) def _purge_dead_workers(self, cr, uid, context=None): + mem_worker = watcher.worker_for_db(cr.dbname) deadline = datetime.now() - timedelta(seconds=self.worker_timeout) deadline_fmt = deadline.strftime(DEFAULT_SERVER_DATETIME_FORMAT) dead_ids = self.search(cr, uid, @@ -218,23 +218,23 @@ def _purge_dead_workers(self, cr, uid, context=None): dead_workers = self.read(cr, uid, dead_ids, ['uuid'], context=context) for worker in dead_workers: _logger.debug('Worker %s is dead', worker['uuid']) - # exists in self._workers only for the same process and pool - if worker['uuid'] == self._worker: - _logger.error('Worker %s should be alive, ' - 'but appears to be dead.', + # exists in the WorkerWatcher but is dead according to db + if worker['uuid'] == mem_worker.uuid: + _logger.error('Worker %s seems alive, ' + 'but appears to be dead in database.', worker['uuid']) - self._worker = None try: self.unlink(cr, uid, dead_ids, context=context) except Exception: _logger.debug("Failed attempt to unlink a dead worker, likely due " "to another transaction in progress. 
" "Trace of the failed unlink " - "%s attempt: ", self._worker.uuid, exc_info=True) + "%s attempt: ", mem_worker.uuid, exc_info=True) def _worker_id(self, cr, uid, context=None): - assert self._worker - worker_ids = self.search(cr, uid, [('uuid', '=', self._worker.uuid)], + worker = watcher.worker_for_db(cr.dbname) + assert worker + worker_ids = self.search(cr, uid, [('uuid', '=', worker.uuid)], context=context) assert len(worker_ids) == 1, ("%s worker found in database instead " "of 1" % len(worker_ids)) @@ -268,7 +268,8 @@ def assign_jobs(self, cr, uid, max_jobs=None, context=None): :param max_jobs: maximal limit of jobs to assign on a worker :type max_jobs: int """ - if self._worker: + worker = watcher.worker_for_db(cr.dbname) + if worker: self._assign_jobs(cr, uid, max_jobs=max_jobs, context=context) else: _logger.debug('No worker started for process %s', os.getpid()) @@ -278,7 +279,8 @@ def enqueue_jobs(self, cr, uid, context=None): """ Enqueue all the jobs assigned to the worker of the current process """ - if self._worker: + worker = watcher.worker_for_db(cr.dbname) + if worker: self._enqueue_jobs(cr, uid, context=context) else: _logger.debug('No worker started for process %s', os.getpid()) @@ -295,6 +297,7 @@ def _assign_jobs(self, cr, uid, max_jobs=None, context=None): # use a SAVEPOINT to be able to rollback this part of the # transaction without failing the whole transaction if the LOCK # cannot be acquired + worker = watcher.worker_for_db(cr.dbname) cr.execute("SAVEPOINT queue_assign_jobs") try: cr.execute(sql, log_exceptions=False) @@ -306,23 +309,23 @@ def _assign_jobs(self, cr, uid, max_jobs=None, context=None): _logger.debug("Failed attempt to assign jobs, likely due to " "another transaction in progress. 
" "Trace of the failed assignment of jobs on worker " - "%s attempt: ", self._worker.uuid, exc_info=True) + "%s attempt: ", worker.uuid, exc_info=True) return job_rows = cr.fetchall() if not job_rows: - _logger.debug('No job to assign to worker %s', self._worker.uuid) + _logger.debug('No job to assign to worker %s', worker.uuid) return job_ids = [id for id, in job_rows] worker_id = self._worker_id(cr, uid, context=context) _logger.debug('Assign %d jobs to worker %s', len(job_ids), - self._worker.uuid) + worker.uuid) # ready to be enqueued in the worker try: self.pool.get('queue.job').write(cr, uid, job_ids, - {'state': 'pending', - 'worker_id': worker_id}, - context=context) + {'state': 'pending', + 'worker_id': worker_id}, + context=context) except Exception: pass # will be assigned to another worker @@ -331,9 +334,10 @@ def _enqueue_jobs(self, cr, uid, context=None): already queued""" db_worker_id = self._worker_id(cr, uid, context=context) db_worker = self.browse(cr, uid, db_worker_id, context=context) + worker = watcher.worker_for_db(cr.dbname) for job in db_worker.job_ids: if job.state == 'pending': - self._worker.enqueue_job_uuid(job.uuid) + worker.enqueue_job_uuid(job.uuid) class requeue_job(orm.TransientModel): diff --git a/connector/queue/worker.py b/connector/queue/worker.py index c5424332c..784ed9bb1 100644 --- a/connector/queue/worker.py +++ b/connector/queue/worker.py @@ -31,8 +31,8 @@ from psycopg2 import OperationalError import openerp -import openerp.modules.registry as registry_module from openerp.osv.osv import PG_CONCURRENCY_ERRORS_TO_RETRY +from openerp.tools import config from .queue import JobsQueue from ..session import ConnectorSessionHandler from .job import (OpenERPJobStorage, @@ -62,6 +62,7 @@ def __init__(self, db_name, watcher): super(Worker, self).__init__() self.queue = self.queue_class() self.db_name = db_name + threading.current_thread().dbname = db_name self.uuid = unicode(uuid.uuid4()) self.watcher = watcher @@ -177,7 +178,8 @@ 
def run(self): Wait for jobs and execute them sequentially. """ while 1: - # check if the worker has to exit (registry destroyed) + # check if the worker has to exit (db destroyed, connector + # uninstalled) if self.watcher.worker_lost(self): break job = self.queue.dequeue() @@ -217,7 +219,6 @@ class WorkerWatcher(threading.Thread): def __init__(self): super(WorkerWatcher, self).__init__() - self._workers_lock = threading.Lock() self._workers = {} def _new(self, db_name): @@ -230,11 +231,24 @@ def _new(self, db_name): worker.daemon = True worker.start() - def delete(self, db_name): - """ Delete worker for the database """ + def _delete(self, db_name): + """ Delete a worker associated with a database """ if db_name in self._workers: - with self._workers_lock: - del self._workers[db_name] + worker_uuid = self._workers[db_name].uuid + # the worker will exit (it checks ``worker_lost()``) + del self._workers[db_name] + session_hdl = ConnectorSessionHandler(db_name, + openerp.SUPERUSER_ID) + with session_hdl.session() as session: + worker_ids = session.search('queue.worker', + [('uuid', '=', worker_uuid)]) + try: + session.unlink('queue.worker', worker_ids) + except Exception: + pass # if it fails, it will be removed after 5 minutes + + def worker_for_db(self, db_name): + return self._workers.get(db_name) def worker_lost(self, worker): """ Indicate if a worker is no longer referenced by the watcher. @@ -244,21 +258,32 @@ def worker_lost(self, worker): return worker not in self._workers.itervalues() @staticmethod - def available_registries(): - """ Yield the registries which are available. + def available_db_names(): + """ Returns the databases for the server having + the connector module installed. Available means that they can be used by a `Worker`. 
- :return: database name, registry - :rtype: tuple + :return: database names + :rtype: list """ - registries = registry_module.RegistryManager.registries - for db_name, registry in registries.iteritems(): - if not 'connector.installed' in registry.models: - continue - if not registry.ready: - continue - yield db_name, registry + if config['db_name']: + db_names = config['db_name'].split(',') + else: + service = openerp.netsvc.ExportService._services['db'] + db_names = service.exp_list(True) + available_db_names = [] + for db_name in db_names: + session_hdl = ConnectorSessionHandler(db_name, + openerp.SUPERUSER_ID) + with session_hdl.session() as session: + cr = session.cr + cr.execute("SELECT 1 FROM ir_module_module " + "WHERE name = %s " + "AND state = %s", ('connector', 'installed')) + if cr.fetchone(): + available_db_names.append(db_name) + return available_db_names def _update_workers(self): """ Refresh the list of workers according to the available @@ -268,17 +293,21 @@ def _update_workers(self): `Worker` or a database could have been dropped, so we have to discard the Worker. 
""" - for db_name, _registry in self.available_registries(): + db_names = self.available_db_names() + # deleted db or connector uninstalled: remove the workers + for db_name in set(self._workers) - set(db_names): + self._delete(db_name) + + for db_name in db_names: if db_name not in self._workers: self._new(db_name) def run(self): """ `WorkerWatcher`'s main loop """ while 1: - with self._workers_lock: - self._update_workers() - for db_name, worker in self._workers.items(): - self.check_alive(db_name, worker) + self._update_workers() + for db_name, worker in self._workers.items(): + self.check_alive(db_name, worker) time.sleep(WAIT_CHECK_WORKER_ALIVE) def check_alive(self, db_name, worker): @@ -315,14 +344,6 @@ def _purge_dead_workers(self, session): watcher = WorkerWatcher() -registry_delete_original = registry_module.RegistryManager.delete -def delete(cls, db_name): - """Delete the registry linked to a given database. """ - watcher.delete(db_name) - return registry_delete_original(db_name) -registry_module.RegistryManager.delete = classmethod(delete) - - def start_service(): """ Start the watcher """ watcher.daemon = True diff --git a/connector/session.py b/connector/session.py index a1abed0d3..f73b3ecc3 100644 --- a/connector/session.py +++ b/connector/session.py @@ -19,10 +19,11 @@ # ############################################################################## -import openerp - from contextlib import contextmanager +import openerp +from openerp.modules.registry import RegistryManager + class ConnectorSessionHandler(object): """ Allow to create a new `ConnectorSession` for a database. 
@@ -58,6 +59,7 @@ def session(self): * rollbacked on errors * commited at the end of the ``with`` context when no error occured * always closed at the end of the ``with`` context + * it handles the registry signaling """ db = openerp.sql_db.db_connect(self.db_name) session = ConnectorSession(db.cursor(), @@ -65,7 +67,9 @@ def session(self): context=self.context) try: + RegistryManager.check_registry_signaling(self.db_name) yield session + RegistryManager.signal_caches_change(self.db_name) except: session.rollback() raise @@ -173,6 +177,9 @@ def create(self, model, values): def write(self, model, ids, values): return self.pool[model].write(self.cr, self.uid, ids, values, context=self.context) + def unlink(self, model, ids): + return self.pool[model].unlink(self.cr, self.uid, ids, context=self.context) + def __repr__(self): return '' % (self.cr.dbname, self.uid) From 6ebcbe9b782b559eb329fc8e835ddc4d3cb35b81 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Tue, 25 Jun 2013 11:48:35 +0200 Subject: [PATCH 2/9] [FIX] unlink was useless here because _delete() is called when: the db is dropped, the connector module is uninstalled -> anyway we do not bother anymore of the queue_worker DB table --- connector/queue/worker.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/connector/queue/worker.py b/connector/queue/worker.py index 784ed9bb1..086bc5090 100644 --- a/connector/queue/worker.py +++ b/connector/queue/worker.py @@ -237,15 +237,6 @@ def _delete(self, db_name): worker_uuid = self._workers[db_name].uuid # the worker will exit (it checks ``worker_lost()``) del self._workers[db_name] - session_hdl = ConnectorSessionHandler(db_name, - openerp.SUPERUSER_ID) - with session_hdl.session() as session: - worker_ids = session.search('queue.worker', - [('uuid', '=', worker_uuid)]) - try: - session.unlink('queue.worker', worker_ids) - except Exception: - pass # if it fails, it will be removed after 5 minutes def worker_for_db(self, db_name): return 
self._workers.get(db_name)


From 215427d570ea143037050a942fe1d01c3a7556c0 Mon Sep 17 00:00:00 2001
From: Guewen Baconnier
Date: Tue, 25 Jun 2013 16:43:49 +0200
Subject: [PATCH 3/9] [CHG] multiprocessing:

The Jobs workers are affected by the database being closed when they are
run in the Cron Worker process. Thus, when OpenERP runs in multiprocess
mode, the connector does not start the job worker threads.

Instead, the new script ``openerp-connector-worker`` should be used. It
spawns processes which start the job worker threads themselves.

This is not ideal, since we have to ensure that OpenERP AND the script
are running. However: it still works normally when OpenERP is not using
multiprocessing, and this change allows more control on the worker
processes (to implement PG's NOTIFY for instance).
---
 connector/doc/guides/multiprocessing.rst |  32 +++++++
 connector/doc/index.rst                  |   1 +
 connector/openerp-connector-worker       | 102 +++++++++++++++++++++++
 connector/queue/worker.py                |  16 +++-
 4 files changed, 148 insertions(+), 3 deletions(-)
 create mode 100644 connector/doc/guides/multiprocessing.rst
 create mode 100755 connector/openerp-connector-worker

diff --git a/connector/doc/guides/multiprocessing.rst b/connector/doc/guides/multiprocessing.rst
new file mode 100644
index 000000000..5169e68d4
--- /dev/null
+++ b/connector/doc/guides/multiprocessing.rst
@@ -0,0 +1,32 @@
+.. _multiprocessing:
+
+
+######################################
+Use the connector with multiprocessing
+######################################
+
+When OpenERP is launched with 1 process, the jobs worker will run
+threaded in the same process.
+
+When OpenERP is launched with multiple processes using the option
+``--workers``, the jobs workers are not launched by the server
+processes; you have to launch them separately with the script
+``openerp-connector-worker`` located in the connector module.
+
+It takes the same arguments and configuration file as the OpenERP
+server.
+
+.. 
important:: The Python path must contain the path to the OpenERP + server when ``openerp-connector-worker`` is launched. + +Example:: + + $ PYTHONPATH=/path/to/server connector/openerp-connector-worker --config /path/to/configfile \ + --workers=2 --log-file=/path/to/logfile + +The 'Enqueue Jobs' scheduled action is useless when multiprocessing is +used. + +.. note:: The ``openerp-connector-worker`` should not be launched + alongside OpenERP when the latter does not run in multiprocess + mode, because the interprocess signaling would not be done. diff --git a/connector/doc/index.rst b/connector/doc/index.rst index 0e76364ec..9da66b0b5 100644 --- a/connector/doc/index.rst +++ b/connector/doc/index.rst @@ -31,6 +31,7 @@ Developer's guide guides/overview.rst guides/bootstrap_connector.rst + guides/multiprocessing.rst API Reference ============= diff --git a/connector/openerp-connector-worker b/connector/openerp-connector-worker new file mode 100755 index 000000000..71063a432 --- /dev/null +++ b/connector/openerp-connector-worker @@ -0,0 +1,102 @@ +#!/usr/bin/env python +import sys +import logging +import os +import signal +import time +import threading +from contextlib import closing + +import openerp +from openerp.cli import server as servercli +import openerp.service.workers as workers +from openerp.modules.registry import RegistryManager +from openerp.tools import config + +_logger = logging.getLogger('connector.worker.multi') + + +class Multicornnector(workers.Multicorn): + + def __init__(self, app): + super(Multicornnector, self).__init__(app) + self.address = ('0.0.0.0', 0) + self.population = config['workers'] + self.workers_connector = {} + + def process_spawn(self): + while len(self.workers_connector) < self.population: + self.worker_spawn(WorkerConnector, self.workers_connector) + + def worker_pop(self, pid): + if pid in self.workers: + _logger.debug("Worker (%s) unregistered", pid) + try: + self.workers_connector.pop(pid, None) + u = self.workers.pop(pid) + 
u.close() + except OSError: + return + + +class WorkerConnector(workers.Worker): + """ HTTP Request workers """ + + def __init__(self, multi): + super(WorkerConnector, self).__init__(multi) + self.db_index = 0 + + def process_work(self): + if config['db_name']: + db_names = config['db_name'].split(',') + else: + services = openerp.netsvc.ExportService._services + if services.get('db'): + db_names = services['db'].exp_list(True) + else: + db_names = [] + if len(db_names): + self.db_index = (self.db_index + 1) % len(db_names) + db_name = db_names[self.db_index] + self.setproctitle(db_name) + db = openerp.sql_db.db_connect(db_name) + threading.current_thread().dbname = db_name + with closing(db.cursor()) as cr: + cr.execute("SELECT 1 FROM ir_module_module " + "WHERE name = %s " + "AND state = %s", ('connector', 'installed')) + if cr.fetchone(): + RegistryManager.check_registry_signaling(db_name) + registry = openerp.pooler.get_pool(db_name) + if registry: + queue_worker = registry['queue.worker'] + queue_worker.assign_then_enqueue(cr, + openerp.SUPERUSER_ID, + max_jobs=50) + RegistryManager.signal_caches_change(db_name) + else: + self.db_index = 0 + + def sleep(self): + # Really sleep once all the databases have been processed. 
+ if self.db_index == 0: + interval = 15 + self.pid % self.multi.population # chorus effect + time.sleep(interval) + + def start(self): + workers.Worker.start(self) + openerp.service.start_internal() + + +if __name__ == "__main__": + args = sys.argv[1:] + servercli.check_root_user() + config.parse_config(args) + + servercli.check_postgres_user() + openerp.netsvc.init_logger() + servercli.report_configuration() + + openerp.multi_process = True + openerp.worker_connector = True + Multicornnector(openerp.service.wsgi_server.application).run() diff --git a/connector/queue/worker.py b/connector/queue/worker.py index 086bc5090..f34916419 100644 --- a/connector/queue/worker.py +++ b/connector/queue/worker.py @@ -261,8 +261,11 @@ def available_db_names(): if config['db_name']: db_names = config['db_name'].split(',') else: - service = openerp.netsvc.ExportService._services['db'] - db_names = service.exp_list(True) + services = openerp.netsvc.ExportService._services + if services.get('db'): + db_names = services['db'].exp_list(True) + else: + db_names = [] available_db_names = [] for db_name in db_names: session_hdl = ConnectorSessionHandler(db_name, @@ -340,4 +343,11 @@ def start_service(): watcher.daemon = True watcher.start() -start_service() +# We have to launch the Jobs Workers only if: +# 1. OpenERP is used in standalone mode (monoprocess) +# 2. Or it is used in multiprocess (with option ``--workers``) +# but the current process is a Connector Worker +# (launched with the ``openerp-connector-worker`` script). 
+if (not getattr(openerp, 'multi_process', False) or + getattr(openerp, 'worker_connector', False)): + start_service() From ea15a51889e3b801a7c9f969a828bf95a9830409 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 26 Jun 2013 07:24:51 +0200 Subject: [PATCH 4/9] [FIX] typo: name of the option is logfile not log-file --- connector/doc/guides/multiprocessing.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/doc/guides/multiprocessing.rst b/connector/doc/guides/multiprocessing.rst index 5169e68d4..741c0af81 100644 --- a/connector/doc/guides/multiprocessing.rst +++ b/connector/doc/guides/multiprocessing.rst @@ -22,7 +22,7 @@ server. Example:: $ PYTHONPATH=/path/to/server connector/openerp-connector-worker --config /path/to/configfile \ - --workers=2 --log-file=/path/to/logfile + --workers=2 --logfile=/path/to/logfile The 'Enqueue Jobs' scheduled action is useless when multiprocessing is used. From 37611eecccabd17d8ed7cab0dcf86a44e8b1f7fd Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 26 Jun 2013 07:33:04 +0200 Subject: [PATCH 5/9] [CHG] name of the logger --- connector/openerp-connector-worker | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/openerp-connector-worker b/connector/openerp-connector-worker index 71063a432..d57e9546c 100755 --- a/connector/openerp-connector-worker +++ b/connector/openerp-connector-worker @@ -13,7 +13,7 @@ import openerp.service.workers as workers from openerp.modules.registry import RegistryManager from openerp.tools import config -_logger = logging.getLogger('connector.worker.multi') +_logger = logging.getLogger('openerp-connector-worker') class Multicornnector(workers.Multicorn): From 8d01bb816354bb4dcaf85a89a763390908ec7613 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 26 Jun 2013 07:39:33 +0200 Subject: [PATCH 6/9] [FIX] purge of workers: no sense to check if it still exist in the current process anyway even if it still exists, it can live in 
another process. Anyway if it still exists, it will be created again in the DB --- connector/queue/model.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/connector/queue/model.py b/connector/queue/model.py index acc983251..18da448e4 100644 --- a/connector/queue/model.py +++ b/connector/queue/model.py @@ -209,7 +209,6 @@ def _notify_alive(self, cr, uid, worker, context=None): {'date_alive': now_fmt}, context=context) def _purge_dead_workers(self, cr, uid, context=None): - mem_worker = watcher.worker_for_db(cr.dbname) deadline = datetime.now() - timedelta(seconds=self.worker_timeout) deadline_fmt = deadline.strftime(DEFAULT_SERVER_DATETIME_FORMAT) dead_ids = self.search(cr, uid, @@ -218,18 +217,11 @@ def _purge_dead_workers(self, cr, uid, context=None): dead_workers = self.read(cr, uid, dead_ids, ['uuid'], context=context) for worker in dead_workers: _logger.debug('Worker %s is dead', worker['uuid']) - # exists in the WorkerWatcher but is dead according to db - if worker['uuid'] == mem_worker.uuid: - _logger.error('Worker %s seems alive, ' - 'but appears to be dead in database.', - worker['uuid']) try: self.unlink(cr, uid, dead_ids, context=context) except Exception: _logger.debug("Failed attempt to unlink a dead worker, likely due " - "to another transaction in progress. 
" - "Trace of the failed unlink " - "%s attempt: ", mem_worker.uuid, exc_info=True) + "to another transaction in progress.") def _worker_id(self, cr, uid, context=None): worker = watcher.worker_for_db(cr.dbname) From b14bced53c07c001b1f54f4bf6a46d46efabba3d Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 26 Jun 2013 07:47:44 +0200 Subject: [PATCH 7/9] [CHG] optimize queuing of jobs: read all the pending jobs at once instead of using browse on the worker --- connector/queue/model.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/connector/queue/model.py b/connector/queue/model.py index 18da448e4..7696e5d83 100644 --- a/connector/queue/model.py +++ b/connector/queue/model.py @@ -322,14 +322,18 @@ def _assign_jobs(self, cr, uid, max_jobs=None, context=None): pass # will be assigned to another worker def _enqueue_jobs(self, cr, uid, context=None): - """ Called by an ir.cron, add to the queue all the jobs not - already queued""" + """ Add to the queue of the worker all the jobs not + yet queued but already assigned.""" + job_obj = self.pool.get('queue.job') db_worker_id = self._worker_id(cr, uid, context=context) - db_worker = self.browse(cr, uid, db_worker_id, context=context) + job_ids = job_obj.search(cr, uid, + [('worker_id', '=', db_worker_id), + ('state', '=', 'pending')], + context=context) worker = watcher.worker_for_db(cr.dbname) - for job in db_worker.job_ids: - if job.state == 'pending': - worker.enqueue_job_uuid(job.uuid) + jobs = job_obj.read(cr, uid, job_ids, ['uuid'], context=context) + for job in jobs: + worker.enqueue_job_uuid(job['uuid']) class requeue_job(orm.TransientModel): From f67f20495aa13d8cb81ad3beccadd7acf63d6f1d Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Fri, 28 Jun 2013 15:56:07 +0200 Subject: [PATCH 8/9] [FIX] skip _notify_alive and _purge_dead_worker when the models are not yet in the registry/pooler --- connector/queue/worker.py | 10 ++++++++++ 1 file changed, 10 insertions(+) 
diff --git a/connector/queue/worker.py b/connector/queue/worker.py index f34916419..81a356640 100644 --- a/connector/queue/worker.py +++ b/connector/queue/worker.py @@ -323,6 +323,11 @@ def _notify_alive(self, session, worker): _logger.debug('Worker %s is alive on process %s', worker.uuid, os.getpid()) dbworker_obj = session.pool.get('queue.worker') + # at startup (especially when running tests), + # registry is not yet initialized, so we just skip + # the notify in such case + if not dbworker_obj: + return dbworker_obj._notify_alive(session.cr, session.uid, worker, @@ -330,6 +335,11 @@ def _notify_alive(self, session, worker): def _purge_dead_workers(self, session): dbworker_obj = session.pool.get('queue.worker') + # at startup (especially when running tests), + # registry is not yet initialized, so we just skip + # the notify in such case + if not dbworker_obj: + return dbworker_obj._purge_dead_workers(session.cr, session.uid, context=session.context) From f5d761bdd910a5df8dbe02efb002145628d78563 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Fri, 5 Jul 2013 15:45:02 +0200 Subject: [PATCH 9/9] [FIX] run 1 worker when the configuration option defines 0 worker --- connector/openerp-connector-worker | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/openerp-connector-worker b/connector/openerp-connector-worker index d57e9546c..297986c26 100755 --- a/connector/openerp-connector-worker +++ b/connector/openerp-connector-worker @@ -21,7 +21,7 @@ class Multicornnector(workers.Multicorn): def __init__(self, app): super(Multicornnector, self).__init__(app) self.address = ('0.0.0.0', 0) - self.population = config['workers'] + self.population = config['workers'] or 1 self.workers_connector = {} def process_spawn(self):