Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MemoryStore: replace mongomock with pymongo-inmemory #846

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
148523f
MemoryStore: mongomock -> pymongo-inmemory
rkingsbury Aug 25, 2023
4ab8b4a
MemoryStorage: use local mongod when possible except CI
rkingsbury Aug 25, 2023
7e867f7
Merge branch 'main' into inmemory
rkingsbury Aug 25, 2023
1587e2f
pin pymongo-inmemory to pymongo version
rkingsbury Aug 25, 2023
a306cb3
try CI fix
rkingsbury Aug 25, 2023
ba2b1c5
CI try
rkingsbury Aug 25, 2023
c00bd84
CI mongo version
rkingsbury Aug 25, 2023
cb57736
CI fix
rkingsbury Aug 25, 2023
fae4023
MemoryStore: use MongoDB v6
rkingsbury Aug 25, 2023
09cd5cc
MemoryStore: expand __eq__ testing
rkingsbury Aug 25, 2023
cbfc58f
MemoryStore: drop collection on close()
rkingsbury Aug 25, 2023
55963ff
MemoryStore: test cleanups
rkingsbury Aug 25, 2023
c7655bd
MemoryStore: assure test files close() part 1
rkingsbury Aug 25, 2023
837b4a7
MemoryStore test updates part 2
rkingsbury Aug 25, 2023
2df7d6a
MemoryStore: unique per instance coll name
rkingsbury Aug 26, 2023
14e843b
MemoryStore __del__ fix
rkingsbury Aug 26, 2023
c3f43f9
MemoryStore fixes
rkingsbury Aug 26, 2023
1e8c262
MemoryStore fixes
rkingsbury Aug 26, 2023
232ab89
fixes
rkingsbury Aug 26, 2023
e403bfd
MontyStore: inherit from MongoStore
rkingsbury Aug 26, 2023
acb98e4
test fixes and MontyStore __eq__ fixups
rkingsbury Aug 26, 2023
03c2e31
revert changes to FileStore and Azure tests
rkingsbury Aug 27, 2023
671bc6f
revert changes to some JsonStore test
rkingsbury Aug 27, 2023
4e0e6b5
more changes
rkingsbury Aug 27, 2023
830d0a6
test fixes
rkingsbury Aug 27, 2023
d27558d
fixes
rkingsbury Aug 27, 2023
eac9406
fix
rkingsbury Aug 27, 2023
421e4e9
rm print statement
rkingsbury Aug 27, 2023
3cf4e3f
Fix test_patch_submission
Aug 29, 2023
772c5f0
Merge branch 'main' of https://github.com/materialsproject/maggma int…
rkingsbury Aug 30, 2023
6ed6898
Merge branch 'main' into inmemory
rkingsbury Sep 1, 2023
ec3324e
Merge branch 'main' into inmemory
rkingsbury Sep 6, 2023
b478ad5
bump pymongo-inmemory to 0.3.1
rkingsbury Sep 10, 2023
33a1fdf
CI python version workaround
rkingsbury Sep 10, 2023
be62d78
Merge branch 'main' of https://github.com/materialsproject/maggma int…
rkingsbury May 15, 2024
d448e49
add pymongo-inmemory
rkingsbury May 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ jobs:
env:
CONTINUOUS_INTEGRATION: True
MONGODB_SRV_URI: ${{ secrets.MONGODB_SRV_URI }}
PYMONGOIM__OPERATING_SYSTEM: ubuntu
PYMONGOIM__MONGO_VERSION: 6.0
run: |
pip install -e .
pytest --cov=maggma --cov-report=xml
Expand Down
5 changes: 5 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# TODO - this entire file can be removed once pymongo-inmemory supports pyproject.toml
# see https://github.com/kaizendorks/pymongo_inmemory/issues/81
[pymongo_inmemory]
mongod_port = 27019
mongo_version = 6.0
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"pydantic<2.0",
"pydantic>=0.32.2",
"pymongo>=4.2.0",
"pymongo-inmemory",
"monty>=1.0.2",
"mongomock>=3.10.0",
"pydash>=4.1.0",
Expand Down
3 changes: 0 additions & 3 deletions src/maggma/stores/file_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ def __init__(
self.json_name = json_name
file_filters = file_filters if file_filters else ["*"]
self.file_filters = re.compile("|".join(fnmatch.translate(p) for p in file_filters))
self.collection_name = "file_store"
self.key = "file_id"
self.include_orphans = include_orphans
self.read_only = read_only
Expand All @@ -104,14 +103,12 @@ def __init__(
self.metadata_store = JSONStore(
paths=[str(self.path / self.json_name)],
read_only=self.read_only,
collection_name=self.collection_name,
key=self.key,
)

self.kwargs = kwargs

super().__init__(
collection_name=self.collection_name,
key=self.key,
**self.kwargs,
)
Expand Down
190 changes: 111 additions & 79 deletions src/maggma/stores/mongolike.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import warnings
from datetime import datetime
from itertools import chain, groupby
from pathlib import Path
from socket import socket
Expand All @@ -18,7 +19,6 @@

from typing_extensions import Literal

import mongomock
import orjson
from monty.dev import requires
from monty.io import zopen
Expand All @@ -27,6 +27,7 @@
from pydash import get, has, set_
from pymongo import MongoClient, ReplaceOne, uri_parser
from pymongo.errors import ConfigurationError, DocumentTooLarge, OperationFailure
from pymongo_inmemory import MongoClient as MemoryClient
from sshtunnel import SSHTunnelForwarder

from maggma.core import Sort, Store, StoreError
Expand Down Expand Up @@ -139,10 +140,12 @@ def __init__(
port: TCP port to connect to
username: Username for the collection
password: Password to connect with
ssh_tunnel: SSHTunnel instance to use for connection.
safe_update: fail gracefully on DocumentTooLarge errors on update
auth_source: The database to authenticate on. Defaults to the database name.
default_sort: Default sort field and direction to use when querying. Can be used to
ensure determinacy in query results.
mongoclient_kwargs: Dict of extra kwargs to pass to MongoClient()
"""
self.database = database
self.collection_name = collection_name
Expand Down Expand Up @@ -503,7 +506,7 @@ def __eq__(self, other: object) -> bool:
Check equality for MongoStore
other: other mongostore to compare with
"""
if not isinstance(other, MongoStore):
if not isinstance(other, self.__class__):
return False

fields = ["database", "collection_name", "host", "port", "last_updated_field"]
Expand Down Expand Up @@ -578,94 +581,67 @@ class MemoryStore(MongoStore):
to a MongoStore
"""

def __init__(self, collection_name: str = "memory_db", **kwargs):
def __init__(
self,
database: str = "mem",
collection_name: Optional[str] = None,
host: str = "localhost",
port: int = 27019, # to avoid conflicts with localhost
safe_update: bool = False,
mongoclient_kwargs: Optional[Dict] = None,
default_sort: Optional[Dict[str, Union[Sort, int]]] = None,
**kwargs,
):
"""
Initializes the Memory Store
Args:
collection_name: name for the collection in memory
"""
self.collection_name = collection_name
self.default_sort = None
self._coll = None
self.kwargs = kwargs
super(MongoStore, self).__init__(**kwargs)
database: The database name
collection_name: The collection name. If None (default) a unique collection name is set based
on the current date and time. This ensures that multiple Store instances can coexist without
overwriting one another.
host: Hostname for the database
port: TCP port to connect to
safe_update: fail gracefully on DocumentTooLarge errors on update
default_sort: Default sort field and direction to use when querying.
Can be used to ensure determinacy in query results.
mongoclient_kwargs: Dict of extra kwargs to pass to MongoClient()
"""
if not collection_name:
collection_name = str(datetime.utcnow())
super().__init__(
database=database,
collection_name=collection_name,
host=host,
port=port,
safe_update=safe_update,
mongoclient_kwargs=mongoclient_kwargs,
default_sort=default_sort,
**kwargs,
)

def connect(self, force_reset: bool = False):
"""
Connect to the source data
"""

if self._coll is None or force_reset:
self._coll = mongomock.MongoClient().db[self.name] # type: ignore

def close(self):
"""Close up all collections"""
self._coll.database.client.close()
conn: MemoryClient = MemoryClient(
host=self.host,
port=self.port,
**self.mongoclient_kwargs,
)
db = conn[self.database]
self._coll = db[self.collection_name] # type: ignore

@property
def name(self):
"""Name for the store"""
return f"mem://{self.collection_name}"

def __hash__(self):
"""Hash for the store"""
return hash((self.name, self.last_updated_field))

def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys.

Args:
keys: fields to group documents
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned

Returns:
generator returning tuples of (key, list of elements)
"""
keys = keys if isinstance(keys, list) else [keys]

if properties is None:
properties = []
if isinstance(properties, dict):
properties = list(properties.keys())

data = [
doc for doc in self.query(properties=keys + properties, criteria=criteria) if all(has(doc, k) for k in keys)
]

def grouping_keys(doc):
return tuple(get(doc, k) for k in keys)
return f"mem://{self.database}/{self.collection_name}"

for vals, group in groupby(sorted(data, key=grouping_keys), key=grouping_keys):
doc = {} # type: ignore
for k, v in zip(keys, vals):
set_(doc, k, v)
yield doc, list(group)

def __eq__(self, other: object) -> bool:
"""
Check equality for MemoryStore
other: other MemoryStore to compare with
"""
if not isinstance(other, MemoryStore):
return False

fields = ["collection_name", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
# def __del__(self):
# """
# Ensure collection is dropped from memory on object destruction, even if .close() has not been called.
# """
# if self._coll is not None:
# self._collection.drop()
Comment on lines +577 to +582

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.


class JSONStore(MemoryStore):
Expand Down Expand Up @@ -835,7 +811,7 @@ def __eq__(self, other: object) -> bool:
Args:
other: other JSONStore to compare with
"""
if not isinstance(other, JSONStore):
if not isinstance(other, self.__class__):
return False

fields = ["paths", "last_updated_field"]
Expand All @@ -847,7 +823,7 @@ def __eq__(self, other: object) -> bool:
"MontyStore requires MontyDB to be installed. See the MontyDB repository for more "
"information: https://github.com/davidlatwe/montydb",
)
class MontyStore(MemoryStore):
class MontyStore(MongoStore):
"""
A MongoDB compatible store that uses on disk files for storage.

Expand Down Expand Up @@ -988,6 +964,62 @@ def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = No

self._collection.replace_one(search_doc, d, upsert=True)

def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys.

Args:
keys: fields to group documents
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned

Returns:
generator returning tuples of (key, list of elements)
"""
keys = keys if isinstance(keys, list) else [keys]

if properties is None:
properties = []
if isinstance(properties, dict):
properties = list(properties.keys())

data = [
doc for doc in self.query(properties=keys + properties, criteria=criteria) if all(has(doc, k) for k in keys)
]

def grouping_keys(doc):
return tuple(get(doc, k) for k in keys)

for vals, group in groupby(sorted(data, key=grouping_keys), key=grouping_keys):
doc = {} # type: ignore
for k, v in zip(keys, vals):
set_(doc, k, v)
yield doc, list(group)

def __eq__(self, other: object) -> bool:
"""
Check equality for MontyStore
other: other Store to compare with
"""
if not isinstance(other, self.__class__):
return False

fields = ["database_name", "collection_name", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)


def _find_free_port(address="0.0.0.0"):
s = socket()
Expand Down
6 changes: 5 additions & 1 deletion tests/builders/test_copy_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ def test_run(source, target, old_docs, new_docs):

builder = CopyBuilder(source, target)
builder.run()
builder.target.connect()

target.connect()
assert builder.target.query_one(criteria={"k": 0})["v"] == "new"
assert builder.target.query_one(criteria={"k": 10})["v"] == "old"

Expand All @@ -112,6 +113,8 @@ def test_query(source, target, old_docs, new_docs):
source.update(old_docs)
source.update(new_docs)
builder.run()

target.connect()
all_docs = list(target.query(criteria={}))
assert len(all_docs) == 14
assert min([d["k"] for d in all_docs]) == 6
Expand All @@ -127,6 +130,7 @@ def test_delete_orphans(source, target, old_docs, new_docs):
source._collection.delete_many(deletion_criteria)
builder.run()

target.connect()
assert target._collection.count_documents(deletion_criteria) == 0
assert target.query_one(criteria={"k": 5})["v"] == "new"
assert target.query_one(criteria={"k": 10})["v"] == "old"
Expand Down
4 changes: 4 additions & 0 deletions tests/builders/test_projection_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ def test_update_targets(source1, source2, target):
def test_run(source1, source2, target):
builder = Projection_Builder(source_stores=[source1, source2], target_store=target)
builder.run()

target.connect()
assert len(list(target.query())) == 15
assert target.query_one(criteria={"k": 0})["a"] == "a"
assert target.query_one(criteria={"k": 0})["d"] == "d"
Expand All @@ -117,4 +119,6 @@ def test_query(source1, source2, target):
query_by_key=[0, 1, 2, 3, 4],
)
builder.run()

target.connect()
assert len(list(target.query())) == 5
Loading