Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retrieve the complete chat history with pagination #54

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 26 additions & 19 deletions libs/elasticsearch/langchain_elasticsearch/_async/chat_history.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import logging
from time import time
from typing import TYPE_CHECKING, List, Optional, Sequence
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, message_to_dict, messages_from_dict
Expand Down Expand Up @@ -101,26 +101,33 @@ async def create_if_missing(self) -> None:

async def aget_messages(self) -> List[BaseMessage]: # type: ignore[override]
"""Retrieve the messages from Elasticsearch"""
try:
from elasticsearch import ApiError
from elasticsearch import ApiError

await self.create_if_missing()
result = await self.client.search(
index=self.index,
query={"term": {"session_id": self.session_id}},
sort="created_at:asc",
)
except ApiError as err:
logger.error(f"Could not retrieve messages from Elasticsearch: {err}")
raise err
await self.create_if_missing()

if result and len(result["hits"]["hits"]) > 0:
items = [
json.loads(document["_source"]["history"])
for document in result["hits"]["hits"]
]
else:
items = []
search_after: Dict[str, Any] = {}
items = []
while True:
try:
result = await self.client.search(
index=self.index,
query={"term": {"session_id": self.session_id}},
sort="created_at:asc",
size=100,
**search_after,
)
except ApiError as err:
logger.error(f"Could not retrieve messages from Elasticsearch: {err}")
raise err

if result and len(result["hits"]["hits"]) > 0:
items += [
json.loads(document["_source"]["history"])
for document in result["hits"]["hits"]
]
search_after = {"search_after": result["hits"]["hits"][-1]["sort"]}
else:
break

return messages_from_dict(items)

Expand Down
45 changes: 26 additions & 19 deletions libs/elasticsearch/langchain_elasticsearch/_sync/chat_history.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import logging
from time import time
from typing import TYPE_CHECKING, List, Optional, Sequence
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, message_to_dict, messages_from_dict
Expand Down Expand Up @@ -101,26 +101,33 @@ def create_if_missing(self) -> None:

def get_messages(self) -> List[BaseMessage]: # type: ignore[override]
"""Retrieve the messages from Elasticsearch"""
try:
from elasticsearch import ApiError
from elasticsearch import ApiError

self.create_if_missing()
result = self.client.search(
index=self.index,
query={"term": {"session_id": self.session_id}},
sort="created_at:asc",
)
except ApiError as err:
logger.error(f"Could not retrieve messages from Elasticsearch: {err}")
raise err
self.create_if_missing()

if result and len(result["hits"]["hits"]) > 0:
items = [
json.loads(document["_source"]["history"])
for document in result["hits"]["hits"]
]
else:
items = []
search_after: Dict[str, Any] = {}
items = []
while True:
try:
result = self.client.search(
index=self.index,
query={"term": {"session_id": self.session_id}},
sort="created_at:asc",
size=100,
**search_after,
)
except ApiError as err:
logger.error(f"Could not retrieve messages from Elasticsearch: {err}")
raise err

if result and len(result["hits"]["hits"]) > 0:
items += [
json.loads(document["_source"]["history"])
for document in result["hits"]["hits"]
]
search_after = {"search_after": result["hits"]["hits"][-1]["sort"]}
else:
break

return messages_from_dict(items)

Expand Down