Merge pull request #1040 from parea-ai/improve-lc-errors
Improve lc errors
jalexanderII authored Aug 7, 2024
2 parents 466c5bc + a75fcb6 commit 11b9952
Showing 4 changed files with 235 additions and 16 deletions.
5 changes: 3 additions & 2 deletions parea/parea_logger.py
@@ -12,6 +12,7 @@
 from parea.helpers import serialize_metadata_values
 from parea.schemas.log import TraceIntegrations
 from parea.schemas.models import CreateGetProjectResponseSchema, TraceLog, UpdateLog
+from parea.utils.trace_integrations.langchain_utils import _dumps_json
 from parea.utils.universal_encoder import json_dumps

logger = logging.getLogger()
@@ -90,7 +91,7 @@ def record_vendor_log(self, data: Dict[str, Any], vendor: TraceIntegrations) -> None:
         self._client.request(
             "POST",
             VENDOR_LOG_ENDPOINT.format(vendor=vendor.value),
-            data=json.loads(json_dumps(data)),  # uuid is not serializable
+            data=json.loads(_dumps_json(data)),  # uuid is not serializable
         )

     async def arecord_vendor_log(self, data: Dict[str, Any], vendor: TraceIntegrations) -> None:
@@ -101,7 +102,7 @@ async def arecord_vendor_log(self, data: Dict[str, Any], vendor: TraceIntegrations) -> None:
         await self._client.request_async(
             "POST",
             VENDOR_LOG_ENDPOINT.format(vendor=vendor.value),
-            data=json.loads(json_dumps(data)),  # uuid is not serializable
+            data=json.loads(_dumps_json(data)),  # uuid is not serializable
         )
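
For context, a hedged sketch (editor's example, not part of the commit) of what the serializer swap buys: _dumps_json, added in langchain_utils.py below, emits JSON bytes even for payloads containing UUIDs, datetimes, sets, and arbitrary objects, so the json.loads round trip keeps working on LangChain run dicts:

import datetime
import json
import uuid

from parea.utils.trace_integrations.langchain_utils import _dumps_json

data = {
    "id": uuid.uuid4(),                     # handled via OPT_SERIALIZE_UUID
    "start_time": datetime.datetime.now(),  # orjson serializes datetimes natively
    "tags": {"langchain", "demo"},          # sets are converted to lists
}
body = json.loads(_dumps_json(data))  # _dumps_json returns bytes; json.loads accepts them
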
29 changes: 16 additions & 13 deletions parea/utils/trace_integrations/langchain.py
@@ -46,19 +46,22 @@ def __init__(
     def _persist_run(self, run: Union[Run, LLMRun, ChainRun, ToolRun]) -> None:
         if is_logging_disabled():
             return
-        self.parent_trace_id = run.id
-        # using .dict() since langchain Run class currently set to Pydantic v1
-        data = run.dict()
-        data["_parea_root_trace_id"] = self._parea_root_trace_id or None
-        data["_session_id"] = self._session_id
-        data["_tags"] = self._tags
-        data["_metadata"] = self._metadata
-        data["_end_user_identifier"] = self._end_user_identifier
-        data["_deployment_id"] = self._deployment_id
-        # check if run has an attribute execution order
-        if (hasattr(run, "execution_order") and run.execution_order == 1) or run.parent_run_id is None:
-            data["_parea_parent_trace_id"] = self._parea_parent_trace_id or None
-        parea_logger.record_vendor_log(data, TraceIntegrations.LANGCHAIN)
+        try:
+            self.parent_trace_id = run.id
+            # using .dict() since langchain Run class currently set to Pydantic v1
+            data = run.dict()
+            data["_parea_root_trace_id"] = self._parea_root_trace_id or None
+            data["_session_id"] = self._session_id
+            data["_tags"] = self._tags
+            data["_metadata"] = self._metadata
+            data["_end_user_identifier"] = self._end_user_identifier
+            data["_deployment_id"] = self._deployment_id
+            # check if run has an attribute execution order
+            if (hasattr(run, "execution_order") and run.execution_order == 1) or run.parent_run_id is None:
+                data["_parea_parent_trace_id"] = self._parea_parent_trace_id or None
+            parea_logger.record_vendor_log(data, TraceIntegrations.LANGCHAIN)
+        except Exception as e:
+            logger.exception(f"Error occurred while logging langchain run: {e}", stack_info=True)

     def get_parent_trace_id(self) -> UUID:
         return self.parent_trace_id
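
A hedged usage sketch (editor's example; the PareaAILangchainTracer name is taken from the SDK's public API, and the chain itself is hypothetical): with _persist_run wrapped in try/except, a failure inside Parea's logging path is reported through logger.exception instead of propagating into the user's LangChain run:

from parea import Parea
from parea.utils.trace_integrations.langchain import PareaAILangchainTracer

Parea(api_key="YOUR_PAREA_API_KEY")  # assumed initialization
handler = PareaAILangchainTracer()
# result = chain.invoke({"input": "hi"}, config={"callbacks": [handler]})
# If record_vendor_log raises (network error, odd payload), the chain still
# returns its result; the failure only shows up in the logs.
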
215 changes: 215 additions & 0 deletions parea/utils/trace_integrations/langchain_utils.py
@@ -0,0 +1,215 @@
from __future__ import annotations

from typing import Any, Callable, Dict, List, Optional, TypedDict, TypeVar, Union

import copy
import datetime
import functools
import json
import logging
import re
import uuid
from enum import Enum

import orjson

logger = logging.getLogger()


ID_TYPE = Union[uuid.UUID, str]
_MAX_DEPTH = 2


class RunTypeEnum(str, Enum):
"""(Deprecated) Enum for run types. Use string directly."""

tool = "tool"
chain = "chain"
llm = "llm"
retriever = "retriever"
embedding = "embedding"
prompt = "prompt"
parser = "parser"


class RunLikeDict(TypedDict, total=False):
"""Run-like dictionary, for type-hinting."""

name: str
run_type: RunTypeEnum
    start_time: datetime.datetime
    inputs: Optional[dict]
    outputs: Optional[dict]
    end_time: Optional[datetime.datetime]
extra: Optional[dict]
error: Optional[str]
serialized: Optional[dict]
parent_run_id: Optional[uuid.UUID]
manifest_id: Optional[uuid.UUID]
events: Optional[List[dict]]
tags: Optional[List[str]]
inputs_s3_urls: Optional[dict]
outputs_s3_urls: Optional[dict]
id: Optional[uuid.UUID]
session_id: Optional[uuid.UUID]
session_name: Optional[str]
reference_example_id: Optional[uuid.UUID]
input_attachments: Optional[dict]
output_attachments: Optional[dict]
trace_id: uuid.UUID
dotted_order: str


def _as_uuid(value: ID_TYPE, var: Optional[str] = None) -> uuid.UUID:
try:
return uuid.UUID(value) if not isinstance(value, uuid.UUID) else value
except ValueError as e:
var = var or "value"
raise Exception(f"{var} must be a valid UUID or UUID string. Got {value}") from e


def _simple_default(obj: Any) -> Any:
# Don't traverse into nested objects
try:
if isinstance(obj, datetime.datetime):
return obj.isoformat()
if isinstance(obj, uuid.UUID):
return str(obj)
return json.loads(json.dumps(obj))
except BaseException as e:
logger.debug(f"Failed to serialize {type(obj)} to JSON: {e}")
return repr(obj)
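
# Editor's illustrative sketch, not part of the committed file: the fallback
# keeps leaf values JSON-safe without recursing into them.
def _example_simple_default() -> None:  # hypothetical helper, for illustration
    import threading

    assert isinstance(_simple_default(uuid.uuid4()), str)      # UUID -> string
    assert "T" in _simple_default(datetime.datetime.now())     # ISO-8601 string
    assert _simple_default(threading.Lock()).startswith("<")   # repr() fallback
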


def _serialize_json(obj: Any, depth: int = 0, serialize_py: bool = True) -> Any:
try:
if depth >= _MAX_DEPTH:
try:
return orjson.loads(_dumps_json_single(obj))
except BaseException:
return repr(obj)
if isinstance(obj, bytes):
return obj.decode("utf-8")
if isinstance(obj, (set, tuple)):
return orjson.loads(_dumps_json_single(list(obj)))

serialization_methods = [
("model_dump_json", True), # Pydantic V2
("json", True), # Pydantic V1
("to_json", False), # dataclass_json
("model_dump", True), # Pydantic V2 with non-serializable fields
("dict", False), # Pydantic V1 with non-serializable fields
]
for attr, exclude_none in serialization_methods:
if hasattr(obj, attr) and callable(getattr(obj, attr)):
try:
method = getattr(obj, attr)
json_str = method(exclude_none=exclude_none) if exclude_none else method()
if isinstance(json_str, str):
return json.loads(json_str)
return orjson.loads(_dumps_json(json_str, depth=depth + 1, serialize_py=serialize_py))
except Exception as e:
logger.debug(f"Failed to serialize {type(obj)} to JSON: {e}")
pass
if serialize_py:
all_attrs = {}
if hasattr(obj, "__slots__"):
all_attrs.update({slot: getattr(obj, slot, None) for slot in obj.__slots__})
if hasattr(obj, "__dict__"):
all_attrs.update(vars(obj))
if all_attrs:
filtered = {k: v if v is not obj else repr(v) for k, v in all_attrs.items()}
return orjson.loads(_dumps_json(filtered, depth=depth + 1, serialize_py=serialize_py))
return repr(obj)
except BaseException as e:
logger.debug(f"Failed to serialize {type(obj)} to JSON: {e}")
return repr(obj)
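
# Editor's illustrative sketch, not part of the committed file: objects exposing
# a Pydantic-style serializer are converted through it before any attribute
# walking happens (assumes Pydantic v2 is installed).
def _example_serialize_pydantic() -> None:  # hypothetical helper, for illustration
    from pydantic import BaseModel

    class TokenUsage(BaseModel):
        prompt_tokens: int
        completion_tokens: int

    out = _serialize_json(TokenUsage(prompt_tokens=12, completion_tokens=3))
    assert out == {"prompt_tokens": 12, "completion_tokens": 3}  # via model_dump_json
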


def _elide_surrogates(s: bytes) -> bytes:
pattern = re.compile(rb"\\ud[89a-f][0-9a-f]{2}", re.IGNORECASE)
result = pattern.sub(b"", s)
return result


def _dumps_json_single(obj: Any, default: Optional[Callable[[Any], Any]] = None) -> bytes:
try:
return orjson.dumps(
obj,
default=default,
option=orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_SERIALIZE_DATACLASS | orjson.OPT_SERIALIZE_UUID | orjson.OPT_NON_STR_KEYS,
)
except TypeError as e:
# Usually caused by UTF surrogate characters
logger.debug(f"Orjson serialization failed: {repr(e)}. Falling back to json.")
result = json.dumps(
obj,
default=_simple_default,
ensure_ascii=True,
).encode("utf-8")
try:
result = orjson.dumps(orjson.loads(result.decode("utf-8", errors="surrogateescape")))
except orjson.JSONDecodeError:
result = _elide_surrogates(result)
return result
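
# Editor's illustrative sketch, not part of the committed file: the fast orjson
# path versus the stdlib fallback for lone UTF-16 surrogates.
def _example_dumps_json_single() -> None:  # hypothetical helper, for illustration
    assert _dumps_json_single({"text": "ok"}) == b'{"text":"ok"}'
    # orjson raises JSONEncodeError (a TypeError subclass) on the lone surrogate;
    # json.dumps escapes it, orjson still cannot re-parse the escape, and
    # _elide_surrogates strips it from the output.
    out = _dumps_json_single({"text": "\ud800"})
    assert b"ud800" not in out
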


def _dumps_json(obj: Any, depth: int = 0, serialize_py: bool = True) -> bytes:
"""Serialize an object to a JSON formatted string.
Parameters
----------
obj : Any
The object to serialize.
default : Callable[[Any], Any] or None, default=None
The default function to use for serialization.
Returns:
-------
str
The JSON formatted string.
"""
return _dumps_json_single(obj, functools.partial(_serialize_json, depth=depth, serialize_py=serialize_py))
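
# Editor's illustrative sketch, not part of the committed file: attribute
# expansion of plain Python objects stops at _MAX_DEPTH, after which values
# degrade to their repr().
def _example_dumps_json_depth_cap() -> None:  # hypothetical helper, for illustration
    class Node:
        def __init__(self, child: "Node | None" = None) -> None:
            self.child = child

    out = orjson.loads(_dumps_json(Node(Node(Node()))))
    assert isinstance(out["child"], dict)          # depths 0 and 1: expanded via __dict__
    assert isinstance(out["child"]["child"], str)  # depth 2: repr() string
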


T = TypeVar("T")


def _middle_copy(val: T, memo: Dict[int, Any], max_depth: int = 4, _depth: int = 0) -> T:
    # Look up __deepcopy__ on the instance (not the class) so the bound method
    # receives memo as its argument, mirroring copy.deepcopy's protocol.
    copier = getattr(val, "__deepcopy__", None)
    if copier is not None:
        try:
            return copier(memo)
except BaseException:
pass
if _depth >= max_depth:
return val
if isinstance(val, dict):
return {_middle_copy(k, memo, max_depth, _depth + 1): _middle_copy(v, memo, max_depth, _depth + 1) for k, v in val.items()} # type: ignore[return-value]
if isinstance(val, list):
return [_middle_copy(item, memo, max_depth, _depth + 1) for item in val] # type: ignore[return-value]
if isinstance(val, tuple):
return tuple(_middle_copy(item, memo, max_depth, _depth + 1) for item in val) # type: ignore[return-value]
if isinstance(val, set):
return {_middle_copy(item, memo, max_depth, _depth + 1) for item in val} # type: ignore[return-value]

return val


def deepish_copy(val: T) -> T:
"""Deep copy a value with a compromise for uncopyable objects.
Args:
val: The value to be deep copied.
Returns:
The deep copied value.
"""
memo: Dict[int, Any] = {}
try:
return copy.deepcopy(val, memo)
except BaseException as e:
        # Generators, locks, etc. cannot be copied and raise a TypeError
        # (mentioning pickling, since the dunder methods are re-used for copying).
        # As a compromise, copy what we can.
logger.debug("Failed to deepcopy input: %s", repr(e))
return _middle_copy(val, memo)
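
# Editor's illustrative sketch, not part of the committed file: copyable members
# are truly copied, while uncopyable ones are shared by reference.
def _example_deepish_copy() -> None:  # hypothetical helper, for illustration
    import threading

    state = {"messages": ["hi"], "lock": threading.Lock()}
    copied = deepish_copy(state)  # deepcopy raises on the lock, so _middle_copy runs
    copied["messages"].append("bye")
    assert state["messages"] == ["hi"]      # the list was copied
    assert copied["lock"] is state["lock"]  # the lock is shared, not copied
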
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"
 [tool.poetry]
 name = "parea-ai"
 packages = [{ include = "parea" }]
-version = "0.2.194"
+version = "0.2.195"
 description = "Parea python sdk"
 readme = "README.md"
 authors = ["joel-parea-ai <[email protected]>"]
