Skip to content

Commit

Permalink
Merge pull request #176 from scrapinghub/kafka-codec-fix
Browse files Browse the repository at this point in the history
fixing value check
  • Loading branch information
sibiryakov authored Jul 22, 2016
2 parents 7dd6165 + 7dfa4a4 commit 2d55a38
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 17 deletions.
31 changes: 31 additions & 0 deletions docs/source/topics/frontera-settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,37 @@ Default: ``'frontera.contrib.backends.memory.FIFO'``
The :class:`Backend <frontera.core.components.Backend>` to be used by the frontier. For more info see
:ref:`Activating a backend <frontier-activating-backend>`.


.. setting:: BC_MIN_REQUESTS

BC_MIN_REQUESTS
---------------

Default: ``64``

The broad crawling queue get operation will keep retrying until the specified number of requests is collected.
The maximum number of retries is hard-coded to 3.

.. setting:: BC_MIN_HOSTS

BC_MIN_HOSTS
------------

Default: ``24``

Keep retrying when getting requests from the queue, until requests for at least the specified minimum number of
hosts have been collected. The maximum number of retries is hard-coded and equals 3.

.. setting:: BC_MAX_REQUESTS_PER_HOST

BC_MAX_REQUESTS_PER_HOST
------------------------

Default: ``128``

Don't include (if possible) batches of requests for a specific host if that host already has more than the
specified maximum number of requests. This is a suggestion for the broad crawling queue get algorithm.

.. setting:: CANONICAL_SOLVER

CANONICAL_SOLVER
Expand Down
6 changes: 6 additions & 0 deletions docs/source/topics/frontier-backends.rst
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,12 @@ tunning a block cache to fit states within one block for average size website. T
to achieve documents closeness within the same host. This function can be selected with :setting:`URL_FINGERPRINT_FUNCTION`
setting.

.. TODO: document details of block cache tuning,
BC* settings and queue get operation concept,
hbase tables schema and data flow
Queue exploration
shuffling with MR jobs
.. _FIFO: http://en.wikipedia.org/wiki/FIFO
.. _LIFO: http://en.wikipedia.org/wiki/LIFO_(computing)
.. _DFS: http://en.wikipedia.org/wiki/Depth-first_search
Expand Down
10 changes: 8 additions & 2 deletions frontera/contrib/backends/hbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ def __init__(self, manager):
port = settings.get('HBASE_THRIFT_PORT')
hosts = settings.get('HBASE_THRIFT_HOST')
namespace = settings.get('HBASE_NAMESPACE')
self._min_requests = settings.get('BC_MIN_REQUESTS')
self._min_hosts = settings.get('BC_MIN_HOSTS')
self._max_requests_per_host = settings.get('BC_MAX_REQUESTS_PER_HOST')

self.queue_partitions = settings.get('SPIDER_FEED_PARTITIONS')
host = choice(hosts) if type(hosts) in [list, tuple] else hosts
kwargs = {
Expand Down Expand Up @@ -456,8 +460,10 @@ def get_next_requests(self, max_next_requests, **kwargs):
for partition_id in range(0, self.queue_partitions):
if partition_id not in partitions:
continue
results = self.queue.get_next_requests(max_next_requests, partition_id, min_requests=64,
min_hosts=24, max_requests_per_host=128)
results = self.queue.get_next_requests(max_next_requests, partition_id,
min_requests=self._min_requests,
min_hosts=self._min_hosts,
max_requests_per_host=self._max_requests_per_host)
next_pages.extend(results)
self.logger.debug("Got %d requests for partition id %d", len(results), partition_id)
return next_pages
2 changes: 1 addition & 1 deletion frontera/contrib/messagebus/kafkabus.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def __init__(self, settings):
if codec == 'gzip':
from kafka.protocol import CODEC_GZIP
self.codec = CODEC_GZIP
if not self.codec:
if self.codec is None:
raise NameError("Non-existent Kafka compression codec.")

self.conn = KafkaClient(server)
Expand Down
19 changes: 10 additions & 9 deletions frontera/core/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@

class ComponentsPipelineMixin(object):
def __init__(self, backend, middlewares=None, canonicalsolver=None, db_worker=False, strategy_worker=False):
self._logger_components = logging.getLogger("manager.components")

# Load middlewares
self._middlewares = self._load_middlewares(middlewares)

# Load canonical solver
self._logger.debug("Loading canonical url solver '%s'", canonicalsolver)
self._logger_components.debug("Loading canonical url solver '%s'", canonicalsolver)
self._canonicalsolver = self._load_object(canonicalsolver)
assert isinstance(self.canonicalsolver, CanonicalSolver), \
"canonical solver '%s' must subclass CanonicalSolver" % self.canonicalsolver.__class__.__name__

# Load backend
self._logger.debug("Loading backend '%s'", backend)
self._logger_components.debug("Loading backend '%s'", backend)
self._backend = self._load_backend(backend, db_worker, strategy_worker)

@property
Expand Down Expand Up @@ -67,14 +69,14 @@ def _load_middlewares(self, middleware_names):
# TO-DO: Use dict for middleware ordering
mws = []
for mw_name in middleware_names or []:
self._logger.debug("Loading middleware '%s'", mw_name)
self._logger_components.debug("Loading middleware '%s'", mw_name)
try:
mw = self._load_object(mw_name, silent=False)
assert isinstance(mw, Middleware), "middleware '%s' must subclass Middleware" % mw.__class__.__name__
if mw:
mws.append(mw)
except NotConfigured:
self._logger.warning("middleware '%s' disabled!", mw_name)
self._logger_components.warning("middleware '%s' disabled!", mw_name)

return mws

Expand All @@ -89,15 +91,14 @@ def _process_components(self, method_name, obj=None, return_classes=None, **kwar
if check_response:
return_obj = result
if check_response and obj and not return_obj:
self._logger.warning("Object '%s' filtered in '%s' by '%s'",
obj.__class__.__name__, method_name, component.__class__.__name__
)
self._logger_components.warning("Object '%s' filtered in '%s' by '%s'",
obj.__class__.__name__, method_name, component.__class__.__name__)
return
return return_obj

def _process_component(self, component, method_name, component_category, obj, return_classes, **kwargs):
self._logger.debug("processing '%s' '%s.%s' %s",
method_name, component_category, component.__class__.__name__, obj)
self._logger_components.debug("processing '%s' '%s.%s' %s",
method_name, component_category, component.__class__.__name__, obj)
return_obj = getattr(component, method_name)(*([obj] if obj else []), **kwargs)
assert return_obj is None or isinstance(return_obj, return_classes), \
"%s '%s.%s' must return None or %s, Got '%s'" % \
Expand Down
3 changes: 3 additions & 0 deletions frontera/settings/default_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

AUTO_START = True
BACKEND = 'frontera.contrib.backends.memory.FIFO'
BC_MIN_REQUESTS = 64
BC_MIN_HOSTS = 24
BC_MAX_REQUESTS_PER_HOST = 128
CANONICAL_SOLVER = 'frontera.contrib.canonicalsolvers.Basic'
DELAY_ON_EMPTY = 5.0
DOMAIN_FINGERPRINT_FUNCTION = 'frontera.utils.fingerprint.sha1'
Expand Down
4 changes: 2 additions & 2 deletions frontera/worker/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,12 @@ def consume_incoming(self, *args, **kwargs):
if type == 'page_crawled':
_, response, links = msg
logger.debug("Page crawled %s", response.url)
if response.meta['jid'] != self.job_id:
if 'jid' not in response.meta or response.meta['jid'] != self.job_id:
continue
self._backend.page_crawled(response, links)
if type == 'request_error':
_, request, error = msg
if request.meta['jid'] != self.job_id:
if 'jid' not in request.meta or request.meta['jid'] != self.job_id:
continue
logger.debug("Request error %s", request.url)
self._backend.request_error(request, error)
Expand Down
8 changes: 5 additions & 3 deletions frontera/worker/strategy.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
from time import asctime
import logging
from traceback import format_stack
from traceback import format_stack, format_tb
from signal import signal, SIGUSR1
from logging.config import fileConfig
from argparse import ArgumentParser
Expand Down Expand Up @@ -170,14 +170,14 @@ def work(self):

if type == 'page_crawled':
_, response, links = msg
if response.meta['jid'] != self.job_id:
if 'jid' not in response.meta or response.meta['jid'] != self.job_id:
continue
self.on_page_crawled(response, links)
continue

if type == 'request_error':
_, request, error = msg
if request.meta['jid'] != self.job_id:
if 'jid' not in request.meta or request.meta['jid'] != self.job_id:
continue
self.on_request_error(request, error)
continue
Expand All @@ -203,6 +203,8 @@ def work(self):
def run(self):
def errback(failure):
logger.exception(failure.value)
if failure.frames:
logger.critical(str("").join(format_tb(failure.getTracebackObject())))
self.task.start(interval=0).addErrback(errback)

def debug(sig, frame):
Expand Down

0 comments on commit 2d55a38

Please sign in to comment.