Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Direct answers #45

Merged
merged 9 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions archive_query_log/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,18 @@ class Snippet(SnippetId):
text: str | None = Text()


class DirectAnswerId(InnerDocument):
id: str = Keyword()
rank: int = Integer()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does a direct answer really have an interpretable "rank" (i.e., are the direct answers ordered in some way)?



class DirectAnswer(DirectAnswerId):
content: str = Text()
big_box: str | None = Keyword()
small_box: str | None = Text()
right_box: str | None = Text()
janheinrichmerker marked this conversation as resolved.
Show resolved Hide resolved


class Serp(BaseDocument):
archive: InnerArchive = Object(InnerArchive)
provider: InnerProvider = Object(InnerProvider)
Expand All @@ -208,6 +220,8 @@ class Serp(BaseDocument):
warc_query_parser: InnerParser | None = Object(InnerParser)
warc_snippets: list[SnippetId] | None = Nested(SnippetId)
warc_snippets_parser: InnerParser | None = Object(InnerParser)
warc_direct_answer: list[SnippetId] | None = Nested(SnippetId)
janheinrichmerker marked this conversation as resolved.
Show resolved Hide resolved
warc_direct_answer_parser: InnerParser | None = Object(InnerParser)

# rendered_warc_location: WarcLocation | None = Object(WarcLocation)
# rendered_warc_downloader: InnerDownloader | None = (
Expand Down Expand Up @@ -437,6 +451,35 @@ class Index:
}


WarcDirectAnswerParserType = Literal[
"xpath",
]


class WarcDirectAnswerParser(BaseDocument):
provider: InnerProviderId | None = Object(InnerProviderId)
url_pattern_regex: str | None = Keyword()
priority: float | None = RankFeature(positive_score_impact=True)
parser_type: WarcDirectAnswerParserType = Keyword()
xpath: str | None = Keyword()
url_xpath: str | None = Keyword()
title_xpath: str | None = Keyword()
text_xpath: str | None = Keyword()
janheinrichmerker marked this conversation as resolved.
Show resolved Hide resolved

@cached_property
def url_pattern(self) -> Pattern | None:
if self.url_pattern_regex is None:
raise ValueError("No URL pattern regex.")
return pattern(self.url_pattern_regex)

class Index:
name = "aql_warc_direct_answer_parsers"
settings = {
"number_of_shards": 1,
"number_of_replicas": 2,
}


WarcMainContentParserType = Literal[
"resiliparse",
]
Expand Down
272 changes: 272 additions & 0 deletions archive_query_log/parsers/warc_direct_answers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
from functools import cache
from itertools import chain
from typing import Iterable, Iterator
from urllib.parse import urljoin
from uuid import uuid5

from click import echo
from elasticsearch_dsl import Search
from elasticsearch_dsl.function import RandomScore
from elasticsearch_dsl.query import FunctionScore, Term, RankFeature, Exists
# noinspection PyProtectedMember
from lxml.etree import _Element, tostring # nosec: B410
from tqdm.auto import tqdm
from warc_s3 import WarcS3Store

from archive_query_log.config import Config
from archive_query_log.namespaces import NAMESPACE_WARC_DIRECT_ANSWER_PARSER, \
NAMESPACE_RESULT
from archive_query_log.orm import Serp, InnerParser, InnerProviderId, \
WarcDirectAnswerParserType, WarcDirectAnswerParser, WarcLocation, DirectAnswer, \
Result, InnerSerp, DirectAnswerId, InnerDownloader
from archive_query_log.parsers.warc import open_warc
from archive_query_log.parsers.xml import parse_xml_tree, safe_xpath
from archive_query_log.utils.es import safe_iter_scan, update_action
from archive_query_log.utils.time import utc_now


def add_warc_direct_answer_parser(
config: Config,
provider_id: str | None,
url_pattern_regex: str | None,
priority: float | None,
parser_type: WarcDirectAnswerParserType,
xpath: str | None,
big_box_xpath: str | None,
small_box_xpath: str | None,
right_box_xpath: str | None,
janheinrichmerker marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
if priority is not None and priority <= 0:
raise ValueError("Priority must be strictly positive.")
if parser_type == "xpath":
if xpath is None:
raise ValueError("No XPath given.")
else:
raise ValueError(f"Invalid parser type: {parser_type}")
parser_id_components = (
provider_id if provider_id is not None else "",
url_pattern_regex if url_pattern_regex is not None else "",
str(priority) if priority is not None else "",
)
parser_id = str(uuid5(
NAMESPACE_WARC_DIRECT_ANSWER_PARSER,
":".join(parser_id_components),
))
parser = WarcDirectAnswerParser(
janheinrichmerker marked this conversation as resolved.
Show resolved Hide resolved
id=parser_id,
last_modified=utc_now(),
provider=InnerProviderId(id=provider_id) if provider_id else None,
url_pattern_regex=url_pattern_regex,
priority=priority,
parser_type=parser_type,
xpath=xpath,
big_box_xpath=big_box_xpath,
small_box_xpath=small_box_xpath,
right_box_xpath=right_box_xpath,
)
parser.save(using=config.es.client)


def _parse_warc_direct_answer(
parser: WarcDirectAnswerParser,
serp_id: str,
capture_url: str,
warc_store: WarcS3Store,
warc_location: WarcLocation,
) -> list[DirectAnswer] | None:
# Check if URL matches pattern.
if (parser.url_pattern is not None and
not parser.url_pattern.match(capture_url)):
return None

# Parse direct answer.
if parser.parser_type == "xpath":
if parser.xpath is None:
raise ValueError("No XPath given.")
with open_warc(warc_store, warc_location) as record:
tree = parse_xml_tree(record)
if tree is None:
return None

elements = safe_xpath(tree, parser.xpath, _Element)
if len(elements) == 0:
return None

direct_answers = []
element: _Element
for i, element in enumerate(elements):
big_box: str | None = None
if parser.big_box_xpath is not None:
big_boxs = safe_xpath(element, parser.big_box_xpath, str)
if len(big_boxs) > 0:
big_box = big_boxs[0].strip()
small_box: str | None = None
if parser.small_box_xpath is not None:
small_boxs = safe_xpath(element, parser.small_box_xpath, str)
if len(small_boxs) > 0:
small_box = small_boxs[0].strip()
right_box: str | None = None
if parser.right_box_xpath is not None:
right_boxs = safe_xpath(element, parser.right_box_xpath, str)
if len(right_boxs) > 0:
right_box = right_boxs[0].strip()

content: str = tostring(
element,
encoding=str,
method="xml",
pretty_print=False,
with_tail=True,
)
direct_answer_id_components = (
serp_id,
parser.id,
str(hash(content)),
str(i),
)
direct_answer_id = str(uuid5(
NAMESPACE_RESULT,
":".join(direct_answer_id_components),
))
direct_answers.append(DirectAnswer(
id=direct_answer_id,
rank=i,
content=content,
big_box=big_box,
small_box=small_box,
right_box=right_box,
))
return direct_answers
else:
raise ValueError(f"Unknown parser type: {parser.parser_type}")


@cache
def _warc_direct_answer_parsers(
config: Config,
provider_id: str,
) -> list[WarcDirectAnswerParser]:
parsers: Iterable[WarcDirectAnswerParser] = (
WarcDirectAnswerParser.search(using=config.es.client)
.filter(
~Exists(field="provider.id") |
Term(provider__id=provider_id)
)
.query(RankFeature(field="priority", saturation={}))
.scan()
)
parsers = safe_iter_scan(parsers)
return list(parsers)


def _parse_serp_warc_direct_answer_action(
config: Config,
serp: Serp,
) -> Iterator[dict]:
# Re-check if it can be parsed.
if (serp.warc_location is None or
serp.warc_location.file is None or
serp.warc_location.offset is None or
serp.warc_location.length is None):
return

# Re-check if parsing is necessary.
if (serp.warc_direct_answer_parser is not None and
serp.warc_direct_answer_parser.should_parse is not None and
not serp.warc_direct_answer_parser.should_parse):
return

for parser in _warc_direct_answer_parsers(config, serp.provider.id):
# Try to parse the snippets.
warc_direct_answers = _parse_warc_direct_answer(
parser=parser,
serp_id=serp.id,
capture_url=serp.capture.url,
warc_store=config.s3.warc_store,
warc_location=serp.warc_location,
)
if warc_direct_answers is None:
# Parsing was not successful, e.g., URL pattern did not match.
continue
for direct_answer in warc_direct_answers:
yield Result(
id=direct_answer.id,
last_modified=utc_now(),
archive=serp.archive,
provider=serp.provider,
capture=serp.capture,
serp=InnerSerp(
id=serp.id,
).to_dict(),
direct_answer=direct_answer,
direct_answer_parser=InnerParser(
id=parser.id,
should_parse=False,
last_parsed=utc_now(),
).to_dict(),
warc_before_serp_downloader=InnerDownloader(
should_download=True,
).to_dict(),
warc_after_serp_downloader=InnerDownloader(
should_download=True,
).to_dict(),
).to_dict(include_meta=True)
yield update_action(
serp,
warc_direct_answers=[
DirectAnswerId(
id=direct_answer.id,
rank=direct_answer.rank,
)
for direct_answer in warc_direct_answers
],
warc_direct_answers_parser=InnerParser(
id=parser.id,
should_parse=False,
last_parsed=utc_now(),
),
)
return
yield update_action(
serp,
warc_direct_answer_parser=InnerParser(
should_parse=False,
last_parsed=utc_now(),
),
)
return


def parse_serps_warc_direct_answer(config: Config) -> None:
Serp.index().refresh(using=config.es.client)
changed_serps_search: Search = (
Serp.search(using=config.es.client)
.filter(
Exists(field="warc_location") &
~Term(warc_direct_answer_parser__should_parse=False)
)
.query(
RankFeature(field="archive.priority", saturation={}) |
RankFeature(field="provider.priority", saturation={}) |
FunctionScore(functions=[RandomScore()])
)
)
num_changed_serps = changed_serps_search.count()
if num_changed_serps > 0:
changed_serps: Iterable[Serp] = (
changed_serps_search
.params(preserve_order=True)
.scan()
)
changed_serps = safe_iter_scan(changed_serps)
# noinspection PyTypeChecker
changed_serps = tqdm(
changed_serps, total=num_changed_serps,
desc="Parsing WARC direct answer", unit="SERP")
actions = chain.from_iterable(
_parse_serp_warc_direct_answer_action(config, serp)
for serp in changed_serps
)
config.es.bulk(actions)
else:
echo("No new/changed SERPs.")
Loading