diff --git a/mem0/client/main.py b/mem0/client/main.py index 3996b05a77..cf291da6a0 100644 --- a/mem0/client/main.py +++ b/mem0/client/main.py @@ -5,13 +5,12 @@ import httpx -from mem0.memory.setup import get_user_id, setup_config -from mem0.memory.telemetry import capture_client_event +from mem0.memory.setup import get_user_id +from mem0.memory.telemetry import AnonymousTelemetry, capture_client_event logger = logging.getLogger(__name__) -# Setup user config -setup_config() +telemetry = AnonymousTelemetry(vector_store_provider="qdrant") class APIError(Exception): @@ -83,7 +82,7 @@ def __init__( timeout=60, ) self._validate_api_key() - capture_client_event("client.init", self) + capture_client_event(telemetry, "client.init", self) def _validate_api_key(self): """Validate the API key by making a test request.""" @@ -113,7 +112,7 @@ def add(self, messages: Union[str, List[Dict[str, str]]], **kwargs) -> Dict[str, response.raise_for_status() if "metadata" in kwargs: del kwargs["metadata"] - capture_client_event("client.add", self, {"keys": list(kwargs.keys())}) + capture_client_event(telemetry, "client.add", self, {"keys": list(kwargs.keys())}) return response.json() @api_error_handler @@ -131,7 +130,7 @@ def get(self, memory_id: str) -> Dict[str, Any]: """ response = self.client.get(f"/v1/memories/{memory_id}/") response.raise_for_status() - capture_client_event("client.get", self, {"memory_id": memory_id}) + capture_client_event(telemetry, "client.get", self, {"memory_id": memory_id}) return response.json() @api_error_handler @@ -158,6 +157,7 @@ def get_all(self, version: str = "v1", **kwargs) -> List[Dict[str, Any]]: if "metadata" in kwargs: del kwargs["metadata"] capture_client_event( + telemetry, "client.get_all", self, {"api_version": version, "keys": list(kwargs.keys())}, @@ -186,7 +186,9 @@ def search(self, query: str, version: str = "v1", **kwargs) -> List[Dict[str, An response.raise_for_status() if "metadata" in kwargs: del kwargs["metadata"] - capture_client_event("client.search", self, {"api_version": version, "keys": list(kwargs.keys())}) + capture_client_event( + telemetry, "client.search", self, {"api_version": version, "keys": list(kwargs.keys())} + ) return response.json() @api_error_handler @@ -199,7 +201,7 @@ def update(self, memory_id: str, data: str) -> Dict[str, Any]: Returns: Dict[str, Any]: The response from the server. """ - capture_client_event("client.update", self, {"memory_id": memory_id}) + capture_client_event(telemetry, "client.update", self, {"memory_id": memory_id}) response = self.client.put(f"/v1/memories/{memory_id}/", json={"text": data}) response.raise_for_status() return response.json() @@ -219,7 +221,7 @@ def delete(self, memory_id: str) -> Dict[str, Any]: """ response = self.client.delete(f"/v1/memories/{memory_id}/") response.raise_for_status() - capture_client_event("client.delete", self, {"memory_id": memory_id}) + capture_client_event(telemetry, "client.delete", self, {"memory_id": memory_id}) return response.json() @api_error_handler @@ -239,7 +241,7 @@ def delete_all(self, **kwargs) -> Dict[str, str]: params = self._prepare_params(kwargs) response = self.client.delete("/v1/memories/", params=params) response.raise_for_status() - capture_client_event("client.delete_all", self, {"keys": list(kwargs.keys())}) + capture_client_event(telemetry, "client.delete_all", self, {"keys": list(kwargs.keys())}) return response.json() @api_error_handler @@ -257,7 +259,7 @@ def history(self, memory_id: str) -> List[Dict[str, Any]]: """ response = self.client.get(f"/v1/memories/{memory_id}/history/") response.raise_for_status() - capture_client_event("client.history", self, {"memory_id": memory_id}) + capture_client_event(telemetry, "client.history", self, {"memory_id": memory_id}) return response.json() @api_error_handler @@ -266,7 +268,7 @@ def users(self) -> Dict[str, Any]: params = {"org_name": self.organization, "project_name": self.project} response = self.client.get("/v1/entities/", params=params) response.raise_for_status() - capture_client_event("client.users", self) + capture_client_event(telemetry, "client.users", self) return response.json() @api_error_handler @@ -278,7 +280,7 @@ def delete_users(self) -> Dict[str, str]: response = self.client.delete(f"/v1/entities/{entity['type']}/{entity['id']}/", params=params) response.raise_for_status() - capture_client_event("client.delete_users", self) + capture_client_event(telemetry, "client.delete_users", self) return {"message": "All users, agents, and sessions deleted."} @api_error_handler @@ -297,7 +299,7 @@ def reset(self) -> Dict[str, str]: # This will also delete the memories self.delete_users() - capture_client_event("client.reset", self) + capture_client_event(telemetry, "client.reset", self) return {"message": "Client reset successful. All users and memories deleted."} def chat(self): @@ -372,14 +374,14 @@ async def add(self, messages: Union[str, List[Dict[str, str]]], **kwargs) -> Dic response.raise_for_status() if "metadata" in kwargs: del kwargs["metadata"] - capture_client_event("async_client.add", self.sync_client, {"keys": list(kwargs.keys())}) + capture_client_event(telemetry, "async_client.add", self.sync_client, {"keys": list(kwargs.keys())}) return response.json() @api_error_handler async def get(self, memory_id: str) -> Dict[str, Any]: response = await self.async_client.get(f"/v1/memories/{memory_id}/") response.raise_for_status() - capture_client_event("async_client.get", self.sync_client, {"memory_id": memory_id}) + capture_client_event(telemetry, "async_client.get", self.sync_client, {"memory_id": memory_id}) return response.json() @api_error_handler @@ -393,7 +395,10 @@ async def get_all(self, version: str = "v1", **kwargs) -> List[Dict[str, Any]]: if "metadata" in kwargs: del kwargs["metadata"] capture_client_event( - "async_client.get_all", self.sync_client, {"api_version": version, "keys": list(kwargs.keys())} + telemetry, + "async_client.get_all", + self.sync_client, + {"api_version": version, "keys": list(kwargs.keys())}, ) return response.json() @@ -406,7 +411,10 @@ async def search(self, query: str, version: str = "v1", **kwargs) -> List[Dict[s if "metadata" in kwargs: del kwargs["metadata"] capture_client_event( - "async_client.search", self.sync_client, {"api_version": version, "keys": list(kwargs.keys())} + telemetry, + "async_client.search", + self.sync_client, + {"api_version": version, "keys": list(kwargs.keys())}, ) return response.json() @@ -414,14 +422,14 @@ async def search(self, query: str, version: str = "v1", **kwargs) -> List[Dict[s async def update(self, memory_id: str, data: str) -> Dict[str, Any]: response = await self.async_client.put(f"/v1/memories/{memory_id}/", json={"text": data}) response.raise_for_status() - capture_client_event("async_client.update", self.sync_client, {"memory_id": memory_id}) + capture_client_event(telemetry, "async_client.update", self.sync_client, {"memory_id": memory_id}) return response.json() @api_error_handler async def delete(self, memory_id: str) -> Dict[str, Any]: response = await self.async_client.delete(f"/v1/memories/{memory_id}/") response.raise_for_status() - capture_client_event("async_client.delete", self.sync_client, {"memory_id": memory_id}) + capture_client_event(telemetry, "async_client.delete", self.sync_client, {"memory_id": memory_id}) return response.json() @api_error_handler @@ -429,14 +437,16 @@ async def delete_all(self, **kwargs) -> Dict[str, str]: params = self.sync_client._prepare_params(kwargs) response = await self.async_client.delete("/v1/memories/", params=params) response.raise_for_status() - capture_client_event("async_client.delete_all", self.sync_client, {"keys": list(kwargs.keys())}) + capture_client_event( + telemetry, "async_client.delete_all", self.sync_client, {"keys": list(kwargs.keys())} + ) return response.json() @api_error_handler async def history(self, memory_id: str) -> List[Dict[str, Any]]: response = await self.async_client.get(f"/v1/memories/{memory_id}/history/") response.raise_for_status() - capture_client_event("async_client.history", self.sync_client, {"memory_id": memory_id}) + capture_client_event(telemetry, "async_client.history", self.sync_client, {"memory_id": memory_id}) return response.json() @api_error_handler @@ -444,7 +454,7 @@ async def users(self) -> Dict[str, Any]: params = {"org_name": self.sync_client.organization, "project_name": self.sync_client.project} response = await self.async_client.get("/v1/entities/", params=params) response.raise_for_status() - capture_client_event("async_client.users", self.sync_client) + capture_client_event(telemetry, "async_client.users", self.sync_client) return response.json() @api_error_handler @@ -454,13 +464,13 @@ async def delete_users(self) -> Dict[str, str]: for entity in entities["results"]: response = await self.async_client.delete(f"/v1/entities/{entity['type']}/{entity['id']}/", params=params) response.raise_for_status() - capture_client_event("async_client.delete_users", self.sync_client) + capture_client_event(telemetry, "async_client.delete_users", self.sync_client) return {"message": "All users, agents, and sessions deleted."} @api_error_handler async def reset(self) -> Dict[str, str]: await self.delete_users() - capture_client_event("async_client.reset", self.sync_client) + capture_client_event(telemetry, "async_client.reset", self.sync_client) return {"message": "Client reset successful. All users and memories deleted."} async def chat(self): diff --git a/mem0/memory/main.py b/mem0/memory/main.py index f0f9cbaeed..2262127709 100644 --- a/mem0/memory/main.py +++ b/mem0/memory/main.py @@ -13,15 +13,11 @@ from mem0.configs.base import MemoryConfig, MemoryItem from mem0.configs.prompts import get_update_memory_messages from mem0.memory.base import MemoryBase -from mem0.memory.setup import setup_config from mem0.memory.storage import SQLiteManager -from mem0.memory.telemetry import capture_event +from mem0.memory.telemetry import AnonymousTelemetry, capture_event from mem0.memory.utils import get_fact_retrieval_messages, parse_messages from mem0.utils.factory import EmbedderFactory, LlmFactory, VectorStoreFactory -# Setup user config -setup_config() - logger = logging.getLogger(__name__) @@ -47,7 +43,8 @@ def __init__(self, config: MemoryConfig = MemoryConfig()): self.graph = MemoryGraph(self.config) self.enable_graph = True - capture_event("mem0.init", self) + self.telemetry = AnonymousTelemetry(self.config.vector_store.provider, self.config.vector_store) + capture_event(self.telemetry, "mem0.init", self) @classmethod def from_config(cls, config_dict: Dict[str, Any]): @@ -233,7 +230,7 @@ def _add_to_vector_store(self, messages, metadata, filters): except Exception as e: logging.error(f"Error in new_memories_with_actions: {e}") - capture_event("mem0.add", self, {"version": self.api_version, "keys": list(filters.keys())}) + capture_event(self.telemetry, "mem0.add", self, {"version": self.api_version, "keys": list(filters.keys())}) return returned_memories @@ -263,7 +260,7 @@ def get(self, memory_id): Returns: dict: Retrieved memory. """ - capture_event("mem0.get", self, {"memory_id": memory_id}) + capture_event(self.telemetry, "mem0.get", self, {"memory_id": memory_id}) memory = self.vector_store.get(vector_id=memory_id) if not memory: return None @@ -312,7 +309,7 @@ def get_all(self, user_id=None, agent_id=None, run_id=None, limit=100): if run_id: filters["run_id"] = run_id - capture_event("mem0.get_all", self, {"limit": limit, "keys": list(filters.keys())}) + capture_event(self.telemetry, "mem0.get_all", self, {"limit": limit, "keys": list(filters.keys())}) with concurrent.futures.ThreadPoolExecutor() as executor: future_memories = executor.submit(self._get_all_from_vector_store, filters, limit) @@ -403,6 +400,7 @@ def search(self, query, user_id=None, agent_id=None, run_id=None, limit=100, fil raise ValueError("One of the filters: user_id, agent_id or run_id is required!") capture_event( + self.telemetry, "mem0.search", self, {"limit": limit, "version": self.api_version, "keys": list(filters.keys())}, @@ -485,7 +483,7 @@ def update(self, memory_id, data): Returns: dict: Updated memory. """ - capture_event("mem0.update", self, {"memory_id": memory_id}) + capture_event(self.telemetry, "mem0.update", self, {"memory_id": memory_id}) existing_embeddings = {data: self.embedding_model.embed(data)} @@ -499,7 +497,7 @@ def delete(self, memory_id): Args: memory_id (str): ID of the memory to delete. """ - capture_event("mem0.delete", self, {"memory_id": memory_id}) + capture_event(self.telemetry, "mem0.delete", self, {"memory_id": memory_id}) self._delete_memory(memory_id) return {"message": "Memory deleted successfully!"} @@ -525,7 +523,7 @@ def delete_all(self, user_id=None, agent_id=None, run_id=None): "At least one filter is required to delete all memories. If you want to delete all memories, use the `reset()` method." ) - capture_event("mem0.delete_all", self, {"keys": list(filters.keys())}) + capture_event(self.telemetry, "mem0.delete_all", self, {"keys": list(filters.keys())}) memories = self.vector_store.list(filters=filters)[0] for memory in memories: self._delete_memory(memory.id) @@ -547,7 +545,7 @@ def history(self, memory_id): Returns: list: List of changes for the memory. """ - capture_event("mem0.history", self, {"memory_id": memory_id}) + capture_event(self.telemetry, "mem0.history", self, {"memory_id": memory_id}) return self.db.get_history(memory_id) def _create_memory(self, data, existing_embeddings, metadata=None): @@ -568,7 +566,7 @@ def _create_memory(self, data, existing_embeddings, metadata=None): payloads=[metadata], ) self.db.add_history(memory_id, None, data, "ADD", created_at=metadata["created_at"]) - capture_event("mem0._create_memory", self, {"memory_id": memory_id}) + capture_event(self.telemetry, "mem0._create_memory", self, {"memory_id": memory_id}) return memory_id def _update_memory(self, memory_id, data, existing_embeddings, metadata=None): @@ -611,7 +609,7 @@ def _update_memory(self, memory_id, data, existing_embeddings, metadata=None): created_at=new_metadata["created_at"], updated_at=new_metadata["updated_at"], ) - capture_event("mem0._update_memory", self, {"memory_id": memory_id}) + capture_event(self.telemetry, "mem0._update_memory", self, {"memory_id": memory_id}) return memory_id def _delete_memory(self, memory_id): @@ -620,7 +618,7 @@ def _delete_memory(self, memory_id): prev_value = existing_memory.payload["data"] self.vector_store.delete(vector_id=memory_id) self.db.add_history(memory_id, prev_value, None, "DELETE", is_deleted=1) - capture_event("mem0._delete_memory", self, {"memory_id": memory_id}) + capture_event(self.telemetry, "mem0._delete_memory", self, {"memory_id": memory_id}) return memory_id def reset(self): @@ -633,7 +631,7 @@ def reset(self): self.config.vector_store.provider, self.config.vector_store.config ) self.db.reset() - capture_event("mem0.reset", self) + capture_event(self.telemetry, "mem0.reset", self) def chat(self, query): raise NotImplementedError("Chat function not implemented yet.") diff --git a/mem0/memory/setup.py b/mem0/memory/setup.py index a22b2e13e4..e4839a0f4c 100644 --- a/mem0/memory/setup.py +++ b/mem0/memory/setup.py @@ -1,31 +1,72 @@ -import json import os import uuid +from datetime import datetime +from typing import Optional + +from mem0.vector_stores.configs import VectorStoreConfig # Set up the directory path home_dir = os.path.expanduser("~") mem0_dir = os.environ.get("MEM0_DIR") or os.path.join(home_dir, ".mem0") os.makedirs(mem0_dir, exist_ok=True) +USER_CONFIG_COLLECTION = "mem0_migrations" -def setup_config(): - config_path = os.path.join(mem0_dir, "config.json") - if not os.path.exists(config_path): - user_id = str(uuid.uuid4()) - config = {"user_id": user_id} - with open(config_path, "w") as config_file: - json.dump(config, config_file, indent=4) +def vector_store_setup(vector_store_provider: str, config: VectorStoreConfig): + if vector_store_provider == "qdrant": + from mem0.vector_stores.qdrant import Qdrant + return Qdrant(**config.config.dict()) + elif vector_store_provider == "milvus": + from mem0.vector_stores.milvus import MilvusDB + return MilvusDB(**config.config.dict()) + elif vector_store_provider == "chroma": + from mem0.vector_stores.chroma import ChromaDB + return ChromaDB(**config.config.dict()) + elif vector_store_provider == "pgvector": + from mem0.vector_stores.pgvector import PGVector + return PGVector(**config.config.dict()) + elif vector_store_provider == "azure_ai_search": + from mem0.vector_stores.azure_ai_search import AzureAISearch + return AzureAISearch(**config.config.dict()) + else: + raise ValueError(f"Invalid vector store provider: {vector_store_provider}") -def get_user_id(): - config_path = os.path.join(mem0_dir, "config.json") - if not os.path.exists(config_path): - return "anonymous_user" +def setup_config(vector_store_provider: str = "qdrant", vector_store_config: Optional[VectorStoreConfig] = None): + """Set up user configuration using vector store""" + if vector_store_config is None: + vector_store_config = VectorStoreConfig(config={"collection_name": USER_CONFIG_COLLECTION}) + else: + vector_store_config.config.collection_name = USER_CONFIG_COLLECTION + + vector_store = vector_store_setup(vector_store_provider, vector_store_config) + + user_id = str(uuid.uuid4()) + metadata = {"type": "user_config", "user_id": user_id, "created_at": datetime.now().isoformat()} + # Store empty vector with user metadata + vectors = [[0.0] * 1536] + vector_store.insert(vectors=vectors, payloads=[metadata], ids=[user_id]) + return user_id + + +def get_user_id(vector_store_provider: str = "qdrant", vector_store_config: Optional[VectorStoreConfig] = None): + """Retrieve user ID from vector store""" + if vector_store_config is None: + vector_store_config = VectorStoreConfig(config={"collection_name": USER_CONFIG_COLLECTION}) + else: + vector_store_config.config.collection_name = USER_CONFIG_COLLECTION + + vector_store = vector_store_setup(vector_store_provider, vector_store_config) try: - with open(config_path, "r") as config_file: - config = json.load(config_file) - user_id = config.get("user_id") - return user_id - except Exception: + # Search for user config entry using empty vector + query_vector = [0.0] * 1536 + results = vector_store.search(query=query_vector, limit=1, filters={"type": "user_config"}) + if results and len(results) > 0: + return results[0].payload.get("user_id") + + # If no user found, create new one + return setup_config(vector_store_provider, vector_store_config) + except Exception as e: + print(f"Error retrieving user_id: {e}") return "anonymous_user" diff --git a/mem0/memory/telemetry.py b/mem0/memory/telemetry.py index f595a458e7..37f03ddaa6 100644 --- a/mem0/memory/telemetry.py +++ b/mem0/memory/telemetry.py @@ -9,6 +9,8 @@ from mem0.memory.setup import get_user_id, setup_config MEM0_TELEMETRY = os.environ.get("MEM0_TELEMETRY", "True") +PROJECT_API_KEY = "phc_hgJkUVJFYtmaJqrvf6CYN67TIQ8yhXAkWzUn9AMU4yX" +POSTHOG_HOST = "https://us.i.posthog.com" if isinstance(MEM0_TELEMETRY, str): MEM0_TELEMETRY = MEM0_TELEMETRY.lower() in ("true", "1", "yes") @@ -21,11 +23,11 @@ class AnonymousTelemetry: - def __init__(self, project_api_key, host): - self.posthog = Posthog(project_api_key=project_api_key, host=host) + def __init__(self, vector_store_provider, vector_store_config = None): + self.posthog = Posthog(project_api_key=PROJECT_API_KEY, host=POSTHOG_HOST) # Call setup config to ensure that the user_id is generated - setup_config() - self.user_id = get_user_id() + setup_config(vector_store_provider, vector_store_config) + self.user_id = get_user_id(vector_store_provider, vector_store_config) if not MEM0_TELEMETRY: self.posthog.disabled = True @@ -49,14 +51,7 @@ def close(self): self.posthog.shutdown() -# Initialize AnonymousTelemetry -telemetry = AnonymousTelemetry( - project_api_key="phc_hgJkUVJFYtmaJqrvf6CYN67TIQ8yhXAkWzUn9AMU4yX", - host="https://us.i.posthog.com", -) - - -def capture_event(event_name, memory_instance, additional_data=None): +def capture_event(telemetry, event_name, memory_instance, additional_data=None): event_data = { "collection": memory_instance.collection_name, "vector_size": memory_instance.embedding_model.config.embedding_dims, @@ -75,7 +70,7 @@ def capture_event(event_name, memory_instance, additional_data=None): telemetry.capture_event(event_name, event_data) -def capture_client_event(event_name, instance, additional_data=None): +def capture_client_event(telemetry, event_name, instance, additional_data=None): event_data = { "function": f"{instance.__class__.__module__}.{instance.__class__.__name__}", }