Skip to content

Commit

Permalink
Merge pull request #258 from singnet/imp/205-custom-template-index-pa…
Browse files Browse the repository at this point in the history
…ttern

[#205] New custom schema for redis index
  • Loading branch information
eddiebrissow authored Dec 11, 2024
2 parents c374929 + df52907 commit 579d187
Show file tree
Hide file tree
Showing 5 changed files with 878 additions and 92 deletions.
199 changes: 156 additions & 43 deletions hyperon_das_atomdb/adapters/redis_mongo_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

import base64
import collections
import itertools
import pickle
import re
import sys
from copy import deepcopy
from enum import Enum
Expand Down Expand Up @@ -179,6 +181,7 @@ def __init__(self, **kwargs: Optional[dict[str, Any]]) -> None:
"""Initialize an instance of a custom class with Redis and MongoDB connections."""
super().__init__()
self.database_name = "das"
self.max_pos_size_custom_index_template = 4

self._setup_databases(**kwargs)

Expand All @@ -188,13 +191,16 @@ def __init__(self, **kwargs: Optional[dict[str, Any]]) -> None:
(MongoCollectionNames.ATOMS, self.mongo_atoms_collection),
(MongoCollectionNames.ATOM_TYPES, self.mongo_types_collection),
]
self.pattern_index_templates: dict[str, list[DocumentT]] | None = None
self.pattern_index_templates: list[dict[str, Any]] | None = None
self.pattern_templates: list | None = None
self.mongo_das_config_collection: Collection | None = None
if MongoCollectionNames.DAS_CONFIG in self.mongo_db.list_collection_names():
self.mongo_das_config_collection = self.mongo_db.get_collection(
MongoCollectionNames.DAS_CONFIG
)
if MongoCollectionNames.DAS_CONFIG not in self.mongo_db.list_collection_names():
self.mongo_db.create_collection(MongoCollectionNames.DAS_CONFIG)

self.mongo_das_config_collection = self.mongo_db.get_collection(
MongoCollectionNames.DAS_CONFIG
)
self._setup_indexes(kwargs)
self.wildcard_hash = ExpressionHasher.compute_hash(WILDCARD)
self.typedef_mark_hash = ExpressionHasher.compute_hash(":")
self.typedef_base_type_hash = ExpressionHasher.compute_hash("Type")
Expand All @@ -217,7 +223,6 @@ def __init__(self, **kwargs: Optional[dict[str, Any]]) -> None:
}
self.mongo_bulk_insertion_limit = 100000
self.max_mongo_db_document_size = 16000000
self._setup_indexes()
logger().info("Database setup finished")

def _setup_databases(self, **kwargs) -> None:
Expand Down Expand Up @@ -377,35 +382,129 @@ def _connection_redis(
else:
return Redis(**redis_connection) # type: ignore

def _setup_indexes(self) -> None:
"""
Set up the default and custom pattern index templates for the database.
def _validate_index_templates(self, templates):
validator = {
"field": r"(named_type|targets\[\d+\])",
"value": r".+",
"positions": r"\[(\d+(,\s*\d+)*)?\]",
"arity": r"[0-9]+",
}
for template in templates:
if not isinstance(template, dict):
raise ValueError("Templates values must be a dict")
for k in template.keys():
if re.search(validator[k], str(template[k])) is None:
raise ValueError(f"Value '{template[k]}' is not supported in '{k}'.")

def _save_pattern_index(self, pattern_index):
self._validate_index_templates(pattern_index)
if self.mongo_das_config_collection is not None:
self.mongo_das_config_collection.replace_one(
{"_id": "pattern_index_templates"},
{"_id": "pattern_index_templates", "templates": pattern_index},
upsert=True,
)
self.pattern_templates = pattern_index

def _load_pattern_index(self, options: dict[str, Any] | None) -> None:
"""
This method initializes the default pattern index templates based on various
combinations of named type and selected positions. If the DAS_CONFIG collection
exists in the MongoDB database, it retrieves the custom pattern index templates
from the collection. Otherwise, it sets the pattern index templates to None.
Additionally, it creates a field index for node names.
combinations of named type and selected positions. It retrieves the custom
pattern index templates from the collection the DAS_CONFIG collection in the
MongoDB database if exists. Otherwise, it sets the default pattern index templates
and save it in the MongoDB database.
Args:
options (dict | None): Dict containing the key 'pattern_index_templates', a list of
templates.
"""
self.default_pattern_index_templates = []
for named_type in [True, False]:
for pos0 in [True, False]:
for pos1 in [True, False]:
for pos2 in [True, False]:
if named_type and pos0 and pos1 and pos2:
# not a pattern but an actual atom
continue
template = {
FieldNames.TYPE_NAME: named_type,
"selected_positions": [
i for i, pos in enumerate([pos0, pos1, pos2]) if pos
],
}
self.default_pattern_index_templates.append(template)
default_templates = [
{"field": "named_type", "value": "*", "positions": [0, 1, 2], "arity": 3}
]
user_templates = None
found = None
if options is not None:
user_templates = options.get("pattern_index_templates", None)

if self.mongo_das_config_collection is not None:
found = self.mongo_das_config_collection.find_one({"_id": "pattern_index_templates"})
self.pattern_index_templates = found.get("templates", None) if found else None

if found is not None:
self.pattern_templates = found.get("templates", None) if found else None
if self.pattern_templates is not None:
if user_templates is not None and user_templates != self.pattern_templates:
raise ValueError(
"'pattern_index_templates' value doesn't match with found on database"
)
else:
self._save_pattern_index(user_templates if user_templates else default_templates)

def _validate_template_index_and_get_position(self, template):
if not isinstance(template["arity"], int) or template["arity"] < 0:
raise ValueError("'arity' must be an integer greater than or equal to zero.")

if len(template["positions"]) > self.max_pos_size_custom_index_template:
raise ValueError(
f"'positions' array should be less than {self.max_pos_size_custom_index_template}."
)

if any(pos >= template["arity"] for pos in template["positions"]):
raise ValueError("'positions' parameter must be in range of the arity.")

if template["field"] != "named_type" and (
found := re.search(r"targets\[(\d+)]", template["field"])
):
target_pos = int(found.group(1))
if target_pos >= template["arity"]:
raise ValueError("'target[]' index must be in range of arity.")
else:
target_pos = None

return target_pos

def _setup_indexes(self, options: dict[str, Any] | None) -> None:
"""
This method reads the template list and generate the index templates.
Additionally, it creates a field index for node names.
Args:
options (dict | None): Dict containing the key 'pattern_index_templates', a list of
templates.
"""
self._load_pattern_index(options)
if not self.pattern_templates:
raise ValueError("Index not loaded")

self.pattern_index_templates = []
for template in self.pattern_templates:
is_named_type = template["field"] == "named_type"
p_size = len(template["positions"])
arity = template["arity"]
i_size = p_size + 1
is_wild_card = template["value"] == "*"
target_pos = self._validate_template_index_and_get_position(template)

values = itertools.product([True, False], repeat=i_size)

for v in values:
if is_wild_card and all(v) and arity == p_size:
continue
if p_size == 0 and not is_wild_card:
continue
if not is_wild_card and not v[0] and is_named_type:
continue
t = {
FieldNames.TYPE_NAME: v[0]
if is_wild_card or not is_named_type
else ExpressionHasher.named_type_hash(template["value"]),
"target_position": target_pos,
"target_value": None if is_named_type else template["value"],
"selected_positions": [
template["positions"][i] for i, pos in enumerate(v[1:]) if pos
],
}
self.pattern_index_templates.append(t)
# NOTE creating index for name search
self.create_field_index("node", fields=["name"])

Expand Down Expand Up @@ -844,7 +943,7 @@ def _get_and_delete_links_by_handles(self, handles: HandleListT) -> list[Documen
@staticmethod
def _apply_index_template(
template: dict[str, Any], named_type: str, targets: HandleListT, arity: int
) -> str:
) -> str | None:
"""
Apply the index template to generate a Redis key.
Expand All @@ -862,10 +961,26 @@ def _apply_index_template(
Returns:
str: The generated Redis key after applying the index template.
"""
key = [WILDCARD] if template[FieldNames.TYPE_NAME] else [named_type]
key = None
if isinstance(template[FieldNames.TYPE_NAME], bool):
key = [WILDCARD] if template[FieldNames.TYPE_NAME] else [named_type]
else:
if named_type == template[FieldNames.TYPE_NAME]:
key = [template[FieldNames.TYPE_NAME]]

if key is None:
return None

target_selected_pos = template["selected_positions"]
for cursor in range(arity):
key.append(WILDCARD if cursor in target_selected_pos else targets[cursor])
if template["target_position"] is not None and len(targets) > template["target_position"]:
if targets[template["target_position"]] == template["target_value"]:
for cursor in range(arity):
key.append(WILDCARD if cursor in target_selected_pos else targets[cursor])
else:
return None
else:
for cursor in range(arity):
key.append(WILDCARD if cursor in target_selected_pos else targets[cursor])
return _build_redis_key(KeyPrefix.PATTERNS, ExpressionHasher.composite_hash(key))

def _retrieve_incoming_set(self, handle: str, **kwargs) -> HandleSetT:
Expand Down Expand Up @@ -1123,14 +1238,9 @@ def _update_link_index(self, document: DocumentT, **kwargs) -> None:
targets: HandleListT = self._get_document_keys(document)
targets_str: str = "".join(targets)
arity: int = len(targets)
named_type: str = document[FieldNames.TYPE_NAME]
named_type_hash: str = document[FieldNames.TYPE_NAME_HASH]

index_templates: list[dict[str, Any]]
if self.pattern_index_templates:
index_templates = self.pattern_index_templates.get(named_type, [])
else:
index_templates = self.default_pattern_index_templates
index_templates = self.pattern_index_templates or []

if kwargs.get("delete_atom", False):
links_handle = self._retrieve_and_delete_incoming_set(handle)
Expand All @@ -1153,7 +1263,8 @@ def _update_link_index(self, document: DocumentT, **kwargs) -> None:

for template in index_templates:
key = self._apply_index_template(template, named_type_hash, targets, arity)
self.redis.srem(key, handle)
if key:
self.redis.srem(key, handle)
else:
incoming_buffer: dict[str, HandleListT] = {}
key = _build_redis_key(KeyPrefix.OUTGOING_SET, handle)
Expand All @@ -1175,7 +1286,8 @@ def _update_link_index(self, document: DocumentT, **kwargs) -> None:

for template in index_templates:
key = self._apply_index_template(template, named_type_hash, targets, arity)
self.redis.sadd(key, handle)
if key:
self.redis.sadd(key, handle)

for handle in incoming_buffer:
key = _build_redis_key(KeyPrefix.INCOMING_SET, handle)
Expand Down Expand Up @@ -1287,8 +1399,9 @@ def _retrieve_documents_by_index(
raise ValueError(f"Index '{index_id}' does not exist in collection '{collection}'")

def reindex(self, pattern_index_templates: dict[str, list[DocumentT]] | None = None) -> None:
if pattern_index_templates is not None:
self.pattern_index_templates = deepcopy(pattern_index_templates)
if isinstance(pattern_index_templates, list):
self._save_pattern_index(deepcopy(pattern_index_templates))
self._setup_indexes({'pattern_index_templates': pattern_index_templates})
self.redis.flushall()
self._update_atom_indexes(self.mongo_atoms_collection.find({}))

Expand Down
Loading

0 comments on commit 579d187

Please sign in to comment.