-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Direct answers #45
Merged
Merged
Direct answers #45
Changes from 2 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
2cf94c7
build new direct answer parser
JKlueber 615d0f4
added direct answer
JKlueber f3f11e8
- removed rank
JKlueber 45ee69d
changed xpaths List[str] to xpath str
JKlueber dabe86e
changed direct_answer to plural
JKlueber 2da7caa
added direct answers
JKlueber a7af589
dubugged import
JKlueber 9e62837
added priority
JKlueber 14ddc9a
removed "import_warc_direct_answers_parsers" from CLI
JKlueber File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,272 @@ | ||
from functools import cache | ||
from itertools import chain | ||
from typing import Iterable, Iterator | ||
from urllib.parse import urljoin | ||
from uuid import uuid5 | ||
|
||
from click import echo | ||
from elasticsearch_dsl import Search | ||
from elasticsearch_dsl.function import RandomScore | ||
from elasticsearch_dsl.query import FunctionScore, Term, RankFeature, Exists | ||
# noinspection PyProtectedMember | ||
from lxml.etree import _Element, tostring # nosec: B410 | ||
from tqdm.auto import tqdm | ||
from warc_s3 import WarcS3Store | ||
|
||
from archive_query_log.config import Config | ||
from archive_query_log.namespaces import NAMESPACE_WARC_DIRECT_ANSWER_PARSER, \ | ||
NAMESPACE_RESULT | ||
from archive_query_log.orm import Serp, InnerParser, InnerProviderId, \ | ||
WarcDirectAnswerParserType, WarcDirectAnswerParser, WarcLocation, DirectAnswer, \ | ||
Result, InnerSerp, DirectAnswerId, InnerDownloader | ||
from archive_query_log.parsers.warc import open_warc | ||
from archive_query_log.parsers.xml import parse_xml_tree, safe_xpath | ||
from archive_query_log.utils.es import safe_iter_scan, update_action | ||
from archive_query_log.utils.time import utc_now | ||
|
||
|
||
def add_warc_direct_answer_parser( | ||
config: Config, | ||
provider_id: str | None, | ||
url_pattern_regex: str | None, | ||
priority: float | None, | ||
parser_type: WarcDirectAnswerParserType, | ||
xpath: str | None, | ||
big_box_xpath: str | None, | ||
small_box_xpath: str | None, | ||
right_box_xpath: str | None, | ||
janheinrichmerker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) -> None: | ||
if priority is not None and priority <= 0: | ||
raise ValueError("Priority must be strictly positive.") | ||
if parser_type == "xpath": | ||
if xpath is None: | ||
raise ValueError("No XPath given.") | ||
else: | ||
raise ValueError(f"Invalid parser type: {parser_type}") | ||
parser_id_components = ( | ||
provider_id if provider_id is not None else "", | ||
url_pattern_regex if url_pattern_regex is not None else "", | ||
str(priority) if priority is not None else "", | ||
) | ||
parser_id = str(uuid5( | ||
NAMESPACE_WARC_DIRECT_ANSWER_PARSER, | ||
":".join(parser_id_components), | ||
)) | ||
parser = WarcDirectAnswerParser( | ||
janheinrichmerker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
id=parser_id, | ||
last_modified=utc_now(), | ||
provider=InnerProviderId(id=provider_id) if provider_id else None, | ||
url_pattern_regex=url_pattern_regex, | ||
priority=priority, | ||
parser_type=parser_type, | ||
xpath=xpath, | ||
big_box_xpath=big_box_xpath, | ||
small_box_xpath=small_box_xpath, | ||
right_box_xpath=right_box_xpath, | ||
) | ||
parser.save(using=config.es.client) | ||
|
||
|
||
def _parse_warc_direct_answer( | ||
parser: WarcDirectAnswerParser, | ||
serp_id: str, | ||
capture_url: str, | ||
warc_store: WarcS3Store, | ||
warc_location: WarcLocation, | ||
) -> list[DirectAnswer] | None: | ||
# Check if URL matches pattern. | ||
if (parser.url_pattern is not None and | ||
not parser.url_pattern.match(capture_url)): | ||
return None | ||
|
||
# Parse direct answer. | ||
if parser.parser_type == "xpath": | ||
if parser.xpath is None: | ||
raise ValueError("No XPath given.") | ||
with open_warc(warc_store, warc_location) as record: | ||
tree = parse_xml_tree(record) | ||
if tree is None: | ||
return None | ||
|
||
elements = safe_xpath(tree, parser.xpath, _Element) | ||
if len(elements) == 0: | ||
return None | ||
|
||
direct_answers = [] | ||
element: _Element | ||
for i, element in enumerate(elements): | ||
big_box: str | None = None | ||
if parser.big_box_xpath is not None: | ||
big_boxs = safe_xpath(element, parser.big_box_xpath, str) | ||
if len(big_boxs) > 0: | ||
big_box = big_boxs[0].strip() | ||
small_box: str | None = None | ||
if parser.small_box_xpath is not None: | ||
small_boxs = safe_xpath(element, parser.small_box_xpath, str) | ||
if len(small_boxs) > 0: | ||
small_box = small_boxs[0].strip() | ||
right_box: str | None = None | ||
if parser.right_box_xpath is not None: | ||
right_boxs = safe_xpath(element, parser.right_box_xpath, str) | ||
if len(right_boxs) > 0: | ||
right_box = right_boxs[0].strip() | ||
|
||
content: str = tostring( | ||
element, | ||
encoding=str, | ||
method="xml", | ||
pretty_print=False, | ||
with_tail=True, | ||
) | ||
direct_answer_id_components = ( | ||
serp_id, | ||
parser.id, | ||
str(hash(content)), | ||
str(i), | ||
) | ||
direct_answer_id = str(uuid5( | ||
NAMESPACE_RESULT, | ||
":".join(direct_answer_id_components), | ||
)) | ||
direct_answers.append(DirectAnswer( | ||
id=direct_answer_id, | ||
rank=i, | ||
content=content, | ||
big_box=big_box, | ||
small_box=small_box, | ||
right_box=right_box, | ||
)) | ||
return direct_answers | ||
else: | ||
raise ValueError(f"Unknown parser type: {parser.parser_type}") | ||
|
||
|
||
@cache | ||
def _warc_direct_answer_parsers( | ||
config: Config, | ||
provider_id: str, | ||
) -> list[WarcDirectAnswerParser]: | ||
parsers: Iterable[WarcDirectAnswerParser] = ( | ||
WarcDirectAnswerParser.search(using=config.es.client) | ||
.filter( | ||
~Exists(field="provider.id") | | ||
Term(provider__id=provider_id) | ||
) | ||
.query(RankFeature(field="priority", saturation={})) | ||
.scan() | ||
) | ||
parsers = safe_iter_scan(parsers) | ||
return list(parsers) | ||
|
||
|
||
def _parse_serp_warc_direct_answer_action( | ||
config: Config, | ||
serp: Serp, | ||
) -> Iterator[dict]: | ||
# Re-check if it can be parsed. | ||
if (serp.warc_location is None or | ||
serp.warc_location.file is None or | ||
serp.warc_location.offset is None or | ||
serp.warc_location.length is None): | ||
return | ||
|
||
# Re-check if parsing is necessary. | ||
if (serp.warc_direct_answer_parser is not None and | ||
serp.warc_direct_answer_parser.should_parse is not None and | ||
not serp.warc_direct_answer_parser.should_parse): | ||
return | ||
|
||
for parser in _warc_direct_answer_parsers(config, serp.provider.id): | ||
# Try to parse the snippets. | ||
warc_direct_answers = _parse_warc_direct_answer( | ||
parser=parser, | ||
serp_id=serp.id, | ||
capture_url=serp.capture.url, | ||
warc_store=config.s3.warc_store, | ||
warc_location=serp.warc_location, | ||
) | ||
if warc_direct_answers is None: | ||
# Parsing was not successful, e.g., URL pattern did not match. | ||
continue | ||
for direct_answer in warc_direct_answers: | ||
yield Result( | ||
id=direct_answer.id, | ||
last_modified=utc_now(), | ||
archive=serp.archive, | ||
provider=serp.provider, | ||
capture=serp.capture, | ||
serp=InnerSerp( | ||
id=serp.id, | ||
).to_dict(), | ||
direct_answer=direct_answer, | ||
direct_answer_parser=InnerParser( | ||
id=parser.id, | ||
should_parse=False, | ||
last_parsed=utc_now(), | ||
).to_dict(), | ||
warc_before_serp_downloader=InnerDownloader( | ||
should_download=True, | ||
).to_dict(), | ||
warc_after_serp_downloader=InnerDownloader( | ||
should_download=True, | ||
).to_dict(), | ||
).to_dict(include_meta=True) | ||
yield update_action( | ||
serp, | ||
warc_direct_answers=[ | ||
DirectAnswerId( | ||
id=direct_answer.id, | ||
rank=direct_answer.rank, | ||
) | ||
for direct_answer in warc_direct_answers | ||
], | ||
warc_direct_answers_parser=InnerParser( | ||
id=parser.id, | ||
should_parse=False, | ||
last_parsed=utc_now(), | ||
), | ||
) | ||
return | ||
yield update_action( | ||
serp, | ||
warc_direct_answer_parser=InnerParser( | ||
should_parse=False, | ||
last_parsed=utc_now(), | ||
), | ||
) | ||
return | ||
|
||
|
||
def parse_serps_warc_direct_answer(config: Config) -> None: | ||
Serp.index().refresh(using=config.es.client) | ||
changed_serps_search: Search = ( | ||
Serp.search(using=config.es.client) | ||
.filter( | ||
Exists(field="warc_location") & | ||
~Term(warc_direct_answer_parser__should_parse=False) | ||
) | ||
.query( | ||
RankFeature(field="archive.priority", saturation={}) | | ||
RankFeature(field="provider.priority", saturation={}) | | ||
FunctionScore(functions=[RandomScore()]) | ||
) | ||
) | ||
num_changed_serps = changed_serps_search.count() | ||
if num_changed_serps > 0: | ||
changed_serps: Iterable[Serp] = ( | ||
changed_serps_search | ||
.params(preserve_order=True) | ||
.scan() | ||
) | ||
changed_serps = safe_iter_scan(changed_serps) | ||
# noinspection PyTypeChecker | ||
changed_serps = tqdm( | ||
changed_serps, total=num_changed_serps, | ||
desc="Parsing WARC direct answer", unit="SERP") | ||
actions = chain.from_iterable( | ||
_parse_serp_warc_direct_answer_action(config, serp) | ||
for serp in changed_serps | ||
) | ||
config.es.bulk(actions) | ||
else: | ||
echo("No new/changed SERPs.") |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does a direct answer really have an interpretable "rank" (i.e., are the direct answers ordered in some way)?