
cluster kafka db worker doesnt recognize partitions #360

Closed
danmsf opened this issue Jan 13, 2019 · 16 comments

Comments

@danmsf

danmsf commented Jan 13, 2019

Hi,
I'm trying to use the cluster configuration. I've created topics in Kafka and have it up and running.
I'm running into trouble starting the database worker.
Tried:
python -m frontera.worker.db --config config.dbw --no-incoming --partitions 0,1
and got an error that 0,1 is not recognized.
Tried:
python -m frontera.worker.db --config config.dbw --no-incoming --partitions 0
I was getting the same issue as in #359, but somehow that stopped happening.

Now I'm getting an error that the Kafka partitions are not recognized or iterable, see below.
I'm using Python 3.6 and frontera from the repo (FYI, qzm and cachetools still needed to be installed manually).
Any ideas?

  File "/usr/lib64/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib64/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/db.py", line 246, in <module>
    args.no_scoring, partitions=args.partitions)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/stats.py", line 22, in __init__
    super(StatsExportMixin, self).__init__(settings, *args, **kwargs)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/db.py", line 115, in __init__
    self.slot = Slot(self, settings, **slot_kwargs)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/db.py", line 46, in __init__
    self.components = self._load_components(worker, settings, **kwargs)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/db.py", line 55, in _load_components
    component = cls(worker, settings, stop_event=self.stop_event, **kwargs)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/components/scoring_consumer.py", line 24, in __init__
    self.scoring_log_consumer = scoring_log.consumer()
  File "/usr/lib/python3.6/dist-packages/frontera/contrib/messagebus/kafkabus.py", line 219, in consumer
    return Consumer(self._location, self._enable_ssl, self._cert_path, self._topic, self._group, partition_id=None)
  File "/usr/lib/python3.6/dist-packages/frontera/contrib/messagebus/kafkabus.py", line 60, in __init__
    self._partitions = [TopicPartition(self._topic, pid) for pid in self._consumer.partitions_for_topic(self._topic)]
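One possible reading of the last frame above (an assumption on my part, not confirmed by this trace): kafka-python's partitions_for_topic() can return None when no metadata is available for the topic (e.g. a topic-name mismatch), and iterating None inside the list comprehension raises a TypeError. A minimal sketch of that failure mode, using a made-up stand-in for the consumer's metadata lookup:

```python
def partitions_for_topic(topic, metadata):
    # Stand-in for kafka-python's behaviour: returns the partition set
    # for a known topic, or None when the topic's metadata is unknown.
    return metadata.get(topic)

# Hypothetical cluster metadata: only 'frontier-score' exists.
metadata = {'frontier-score': {0, 1}}

try:
    # Asking for a misspelled topic yields None, and iterating None fails
    # just like the comprehension in kafkabus.py would.
    parts = [pid for pid in partitions_for_topic('frontier-scoring', metadata)]
except TypeError as e:
    print(e)  # 'NoneType' object is not iterable
```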

@danmsf
Author

danmsf commented Jan 13, 2019

I commented out get_stats() just to see if it would work. I can get the database workers to work, but then I get a different error starting the strategy workers.
I'm closing until #359 is resolved - maybe it's connected.

@danmsf danmsf closed this as completed Jan 13, 2019
@sibiryakov
Member

Hey @danmsf, you've put the stack trace without the exception preceding it, so I can't identify what the issue is. The partitions are meant to be space-separated, not comma-separated. What is that different error from the strategy workers?

@danmsf
Author

danmsf commented Jan 14, 2019

I was getting an error that said:

INFO:manager:--------------------------------------------------------------------------------
INFO:manager:Starting Frontier Manager...
INFO:hbase.backend:Connecting to localhost:9090 thrift server.
Traceback (most recent call last):
  File "/usr/lib64/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib64/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/strategy.py", line 388, in <module>
    worker = StrategyWorker(settings, is_add_seeds_mode)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/stats.py", line 22, in __init__
    super(StatsExportMixin, self).__init__(settings, *args, **kwargs)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/strategy.py", line 166, in __init__
    manager = WorkerFrontierManager.from_settings(settings, strategy_worker=True, scoring_stream=self.update_score)
  File "/usr/lib/python3.6/dist-packages/frontera/core/manager.py", line 691, in from_settings
    return WorkerFrontierManager(**kwargs)
  File "/usr/lib/python3.6/dist-packages/frontera/core/manager.py", line 657, in __init__
    db_worker=db_worker, strategy_worker=strategy_worker)
  File "/usr/lib/python3.6/dist-packages/frontera/core/manager.py", line 157, in __init__
    super(StrategyComponentsPipelineMixin, self).__init__(backend, **kwargs)
  File "/usr/lib/python3.6/dist-packages/frontera/core/manager.py", line 87, in __init__
    BackendMixin.__init__(self, backend, db_worker, strategy_worker)
  File "/usr/lib/python3.6/dist-packages/frontera/core/manager.py", line 21, in __init__
    self._backend.frontier_start()
  File "/usr/lib/python3.6/dist-packages/frontera/contrib/backends/hbase/__init__.py", line 564, in frontier_start
    if component:
TypeError: 'NoneType' object is not callable
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.

I changed /usr/lib/python3.6/dist-packages/frontera/contrib/backends/hbase/__init__.py to:
"if component is None:"
and that allowed me to start the strategy worker, but now I'm running into an issue feeding the seeds, telling me:

INFO:manager:Frontier Manager Started!
INFO:manager:--------------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/lib64/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib64/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/lib/python3.6/dist-packages/frontera/utils/add_seeds.py", line 45, in <module>
    run_add_seeds(settings, args.seeds_file)
  File "/usr/lib/python3.6/dist-packages/frontera/utils/add_seeds.py", line 20, in run_add_seeds
    manager.add_seeds(fh)
  File "/usr/lib/python3.6/dist-packages/frontera/core/manager.py", line 495, in add_seeds
    self.strategy.read_seeds(seeds_file)
  File "/usr/lib/python3.6/dist-packages/frontera/strategy/basic.py", line 10, in read_seeds
    self.schedule(r)
  File "/usr/lib/python3.6/dist-packages/frontera/strategy/__init__.py", line 122, in schedule
    self._scheduled_stream.send(request, score, dont_queue)
  File "/usr/lib/python3.6/dist-packages/frontera/core/manager.py", line 798, in send
    self._queue.schedule([(request.meta[b'fingerprint'], score, request, not dont_queue)])
AttributeError: 'NoneType' object has no attribute 'schedule'

so that probably was not a good fix, and maybe my settings are off?
Any ideas?

@danmsf
Author

danmsf commented Jan 14, 2019

Also, I would update the docs to say:
'python -m frontera.worker.db --config [db worker config module] --no-incoming --partitions 0 1'

and in the common module script:
SCORING_TOPIC = 'frontier-score'
instead of
SCORING_TOPIC = 'frontier-scoring'

(I made a 'frontier-scoring' topic and lost some time figuring that out... it's mentioned correctly in the Kafka topic creation.)

@danmsf danmsf reopened this Jan 14, 2019
@sibiryakov
Member

Hi @danmsf, there is probably an issue with the configuration. Have you specified BACKEND properly?

@sibiryakov
Member

post your configs here, pls

@danmsf
Author

danmsf commented Jan 17, 2019

I copy-pasted from the docs - so everything is the same as here: https://frontera.readthedocs.io/en/latest/topics/cluster-setup.html

in spider module:
BACKEND = 'frontera.contrib.backends.remote.messagebus.MessageBusBackend'
in worker module:
BACKEND = 'frontera.contrib.backends.hbase.HBaseBackend'
just for good measure, I added:
HBASE_DROP_ALL_TABLES = True
HBASE_THRIFT_PORT = 9090
HBASE_THRIFT_HOST = 'localhost'
HBASE_METADATA_TABLE = 'metadata'
HBASE_QUEUE_TABLE = 'queue'
MESSAGE_BUS = 'frontera.contrib.messagebus.kafkabus.MessageBus'

in the common module:
changed the topic to 'frontier-score'
in the strategy worker module I chose the strategy 'frontera.strategy.basic.BasicCrawlingStrategy'

running on:
Linux (AWS EMR)
Python 3.6
Kafka 2.1.0
HBase 1.4.8
ZooKeeper 3.4.13

And I get the 'NoneType' object is not callable error (the first error above) when starting the strategy worker.

@danmsf
Author

danmsf commented Jan 17, 2019

If it helps to point me in the right direction, I put a print() in the __init__ script:

    def frontier_start(self):
        for component in [self.metadata, self.queue, self.states, self.domain_metadata]:
            print(component)
            if component:
                component.frontier_start()

and the output is:

INFO:manager:--------------------------------------------------------------------------------
INFO:manager:Starting Frontier Manager...
INFO:hbase.backend:Connecting to localhost:9090 thrift server.
None
None
<frontera.contrib.backends.hbase.HBaseState object at 0x7f46801f6550>
DomainCache([], maxsize=1000, currsize=0)
Traceback (most recent call last):
.... error.....

@sibiryakov
Member

No idea what's happening. This code shouldn't call anything on component if it evaluates to False in the if statement. There is a small chance that the interpreter is somehow modified in EMR. Could you try this in Python interactive mode:

x = [0, 1, 2, None]
for c in x:
    if c:
        print(c, type(c))

If None is handled correctly, then please post your shell commands.

@danmsf
Author

danmsf commented Jan 19, 2019

That works fine in Python. My shell commands are:
Starting the DB worker (this works):
python -m frontera.worker.db --config multicluster.config.dbw --no-incoming --partitions 0 1

This starts the strategy worker (this is where it gets stuck):
python -m frontera.worker.strategy --config multicluster.config.sw --partition-id 0

I think it's getting stuck trying to start the 'DomainCache([], maxsize=1000, currsize=0)' object.
I am using cachetools version 2.1.0.
Thanks for the help

@danmsf
Author

danmsf commented Jan 20, 2019

I also tried instantiating the DomainCache object in the shell:

    d = DomainCache(connection=con, table_name='crawler:domain_metadata')
    if d:
        print(d)

and I get a NoneType error, even though if I do type(d) I get the DomainCache class...
I also made sure the connection to HBase is working in the code (I was able to print the list of tables...).
What is the first attribute of DomainCache([], ...) supposed to look like?

@sibiryakov
Member

The problem is probably connected with how Python evaluates objects in an if statement: https://docs.python.org/3.6/reference/datamodel.html#object.__bool__
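To illustrate the mechanism: when a class defines no __bool__, Python falls back to __len__ for truthiness, which is exactly what happens for MutableMapping subclasses like cachetools' Cache. A minimal sketch with a hypothetical stand-in class (not frontera's actual DomainCache):

```python
from collections.abc import MutableMapping

class TinyCache(MutableMapping):
    # Hypothetical stand-in: a mapping with no __bool__, so `if cache:`
    # is decided by __len__ -- an empty but valid cache is falsy.
    def __init__(self):
        self._data = {}
    def __getitem__(self, key):
        return self._data[key]
    def __setitem__(self, key, value):
        self._data[key] = value
    def __delitem__(self, key):
        del self._data[key]
    def __iter__(self):
        return iter(self._data)
    def __len__(self):
        return len(self._data)

c = TinyCache()
print(bool(c))   # False -- empty, so `if c:` would skip it despite c is not None
c["x"] = 1
print(bool(c))   # True
```

This is why `if component:` and `if component is not None:` can behave differently for cache-like components.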

@danmsf
Author

danmsf commented Jan 22, 2019

I found this helpful:
https://stackoverflow.com/q/20364414
I didn't have a chance to check it yet, but the gist of it is that the __eq__ method that DomainCache inherited from Cache (which inherits from collections.MutableMapping...) implements equality based on a method of the class (I don't know which) without first checking whether the other object has that method... so what's going on is Python is trying to check something like:
DomainCache.methodA == None.methodA
and that's why it's returning 'NoneType' not callable (None doesn't have methodA).

If this is the case, a local fix could be overloading the DomainCache.__eq__ method to include a hasattr() check...
But if I'm the only one getting this for some reason, maybe I'm the exception :)
Thanks
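If that theory holds, the guarded override could look something like this sketch (class names and the compared attribute are hypothetical, not cachetools' actual API):

```python
class SafeEqMixin:
    # Hypothetical defensive __eq__: when the other object lacks the
    # attribute we compare on, return NotImplemented instead of touching
    # a method or attribute that may not exist on it.
    def __eq__(self, other):
        if not hasattr(other, "currsize"):   # attribute name is an assumption
            return NotImplemented
        return self.currsize == other.currsize

class FakeCache(SafeEqMixin):
    currsize = 0

# NotImplemented makes Python fall back to the reflected comparison and
# finally to identity, so comparing with None is False, not a TypeError.
print(FakeCache() == None)          # False
print(FakeCache() == FakeCache())   # True
```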

@FrancoAlmonacid

Hello,

Did someone find a solution for this problem? I had the exact same error, which I fixed by changing the following line:

if component is not None:

I'm not sure yet what is causing it, so any comment would help :)
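The difference matters because `if component:` uses truthiness while `is not None` only excludes None. A rough sketch of the patched loop, with a made-up Recorder component standing in for a falsy-but-valid cache:

```python
def frontier_start(components):
    # Patched loop: explicit None check, so components that are falsy but
    # valid (e.g. an empty cache whose __len__ returns 0) still get started.
    for component in components:
        if component is not None:
            component.frontier_start()

class Recorder:
    # Hypothetical stand-in component for demonstration.
    def __init__(self):
        self.started = False
    def __len__(self):
        return 0  # makes the instance falsy, like an empty cache
    def frontier_start(self):
        self.started = True

r = Recorder()
frontier_start([None, r])
print(r.started)  # True -- `if component:` would have skipped it
```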

@ghost

ghost commented Jun 8, 2019

@danmsf Hi, I have faced a similar Kafka problem when starting the db worker. However, I didn't have any issue with the get_stats() that you mentioned at #359.

[screenshot of the error]

Can you share how you solved this problem?
I have opened an issue at #371.

Thanks in advance!
