diff --git a/langfuse/callback/langchain.py b/langfuse/callback/langchain.py
index e9425c7a..a14f24c6 100644
--- a/langfuse/callback/langchain.py
+++ b/langfuse/callback/langchain.py
@@ -991,14 +991,8 @@ def _log_debug_event(
         parent_run_id: Optional[UUID] = None,
         **kwargs,
     ):
-        kwargs_log = (
-            ", " + ", ".join([f"{key}: {value}" for key, value in kwargs.items()])
-            if len(kwargs) > 0
-            else ""
-        )
         self.log.debug(
             f"Event: {event_name}, run_id: {str(run_id)[:5]}, parent_run_id: {str(parent_run_id)[:5]}"
-            + kwargs_log
         )

diff --git a/langfuse/client.py b/langfuse/client.py
index 0c3c7258..f9e6a238 100644
--- a/langfuse/client.py
+++ b/langfuse/client.py
@@ -1334,11 +1334,11 @@ def trace(

             new_body = TraceBody(**new_dict)

-            self.log.debug(f"Creating trace {new_body}")
+            self.log.debug(f"Creating trace {_filter_io_from_event_body(new_dict)}")

             event = {
                 "id": str(uuid.uuid4()),
                 "type": "trace-create",
-                "body": new_body.dict(exclude_none=True),
+                "body": new_body,
             }

             self.task_manager.add_task(
@@ -1503,7 +1503,7 @@ def score(
             event = {
                 "id": str(uuid.uuid4()),
                 "type": "score-create",
-                "body": new_body.dict(exclude_none=True),
+                "body": new_body,
             }

             self.task_manager.add_task(event)
@@ -1604,17 +1604,16 @@ def span(
             if trace_id is None:
                 self._generate_trace(new_trace_id, name or new_trace_id)

-            self.log.debug(f"Creating span {span_body}...")
+            self.log.debug(f"Creating span {_filter_io_from_event_body(span_body)}...")

             span_body = CreateSpanBody(**span_body)

             event = {
                 "id": str(uuid.uuid4()),
                 "type": "span-create",
-                "body": span_body.dict(exclude_none=True),
+                "body": span_body,
             }

-            self.log.debug(f"Creating span {event}...")
             self.task_manager.add_task(event)

         except Exception as e:
@@ -1710,10 +1709,12 @@ def event(
             event = {
                 "id": str(uuid.uuid4()),
                 "type": "event-create",
-                "body": request.dict(exclude_none=True),
+                "body": request,
             }

-            self.log.debug(f"Creating event {event}...")
+            self.log.debug(
+                f"Creating event {_filter_io_from_event_body(event_body)} ..."
+            )
             self.task_manager.add_task(event)

         except Exception as e:
@@ -1835,23 +1836,24 @@ def generation(
             event = {
                 "id": str(uuid.uuid4()),
                 "type": "trace-create",
-                "body": request.dict(exclude_none=True),
+                "body": request,
             }

-            self.log.debug(f"Creating trace {event}...")
+            self.log.debug("Creating trace...")

             self.task_manager.add_task(event)

-            self.log.debug(f"Creating generation max {generation_body} {usage}...")
+            self.log.debug(
+                f"Creating generation max {_filter_io_from_event_body(generation_body)}..."
+            )

             request = CreateGenerationBody(**generation_body)

             event = {
                 "id": str(uuid.uuid4()),
                 "type": "generation-create",
-                "body": request.dict(exclude_none=True),
+                "body": request,
             }

-            self.log.debug(f"Creating top-level generation {event} ...")
             self.task_manager.add_task(event)

         except Exception as e:
@@ -1877,10 +1879,10 @@ def _generate_trace(self, trace_id: str, name: str):
         event = {
             "id": str(uuid.uuid4()),
             "type": "trace-create",
-            "body": trace_body.dict(exclude_none=True),
+            "body": trace_body,
         }

-        self.log.debug(f"Creating trace {event}...")
+        self.log.debug(f"Creating trace {_filter_io_from_event_body(trace_dict)}...")

         self.task_manager.add_task(event)

     def join(self):
@@ -2087,7 +2089,9 @@ def generation(
                 "body": new_body.dict(exclude_none=True, exclude_unset=False),
             }

-            self.log.debug(f"Creating generation {new_body}...")
+            self.log.debug(
+                f"Creating generation {_filter_io_from_event_body(generation_body)}..."
+            )

             self.task_manager.add_task(event)

         except Exception as e:
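Note on the logging changes above: f-string arguments to `self.log.debug(...)` are evaluated eagerly, even when the DEBUG level is disabled, so stringifying full event bodies was paid for on every call. Routing the payload through `_filter_io_from_event_body` (added at the bottom of `langfuse/client.py` in this diff) keeps the large `input`/`output`/`metadata` fields out of that work. A minimal illustration of the helper's behavior, with a made-up event body:

```python
# Made-up event body; the comprehension mirrors the logic of the
# _filter_io_from_event_body helper added in this diff.
body = {
    "id": "span-123",
    "name": "retrieval",
    "input": {"query": "...potentially huge payload..."},
    "output": {"documents": ["..."]},
    "metadata": {"user_id": "u-42"},
}

filtered = {k: v for k, v in body.items() if k not in ("input", "output", "metadata")}
assert filtered == {"id": "span-123", "name": "retrieval"}
```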
@@ -2165,7 +2169,7 @@ def span(
                 **kwargs,
             }

-            self.log.debug(f"Creating span {span_body}...")
+            self.log.debug(f"Creating span {_filter_io_from_event_body(span_body)}...")

             new_dict = self._add_state_to_event(span_body)
             new_body = self._add_default_values(new_dict)
@@ -2175,7 +2179,7 @@ def span(
             event = {
                 "id": str(uuid.uuid4()),
                 "type": "span-create",
-                "body": event.dict(exclude_none=True),
+                "body": event,
             }

             self.task_manager.add_task(event)
@@ -2284,7 +2288,7 @@ def score(
             event = {
                 "id": str(uuid.uuid4()),
                 "type": "score-create",
-                "body": request.dict(exclude_none=True),
+                "body": request,
             }

             self.task_manager.add_task(event)
@@ -2369,10 +2373,12 @@ def event(
             event = {
                 "id": str(uuid.uuid4()),
                 "type": "event-create",
-                "body": request.dict(exclude_none=True),
+                "body": request,
             }

-            self.log.debug(f"Creating event {event}...")
+            self.log.debug(
+                f"Creating event {_filter_io_from_event_body(event_body)}..."
+            )
             self.task_manager.add_task(event)

         except Exception as e:
@@ -2497,7 +2503,9 @@ def update(
                 **kwargs,
             }

-            self.log.debug(f"Update generation {generation_body}...")
+            self.log.debug(
+                f"Update generation {_filter_io_from_event_body(generation_body)}..."
+            )

             request = UpdateGenerationBody(**generation_body)
@@ -2507,7 +2515,9 @@ def update(
                 "body": request.dict(exclude_none=True, exclude_unset=False),
             }

-            self.log.debug(f"Update generation {event}...")
+            self.log.debug(
+                f"Update generation {_filter_io_from_event_body(generation_body)}..."
+            )
             self.task_manager.add_task(event)

         except Exception as e:
@@ -2684,14 +2694,14 @@ def update(
                 "end_time": end_time,
                 **kwargs,
             }

-            self.log.debug(f"Update span {span_body}...")
+            self.log.debug(f"Update span {_filter_io_from_event_body(span_body)}...")

             request = UpdateSpanBody(**span_body)

             event = {
                 "id": str(uuid.uuid4()),
                 "type": "span-update",
-                "body": request.dict(exclude_none=True),
+                "body": request,
             }

             self.task_manager.add_task(event)
@@ -2888,14 +2898,14 @@ def update(
                 "tags": tags,
                 **kwargs,
             }

-            self.log.debug(f"Update trace {trace_body}...")
+            self.log.debug(f"Update trace {_filter_io_from_event_body(trace_body)}...")

             request = TraceBody(**trace_body)

             event = {
                 "id": str(uuid.uuid4()),
                 "type": "trace-create",
-                "body": request.dict(exclude_none=True),
+                "body": request,
             }

             self.task_manager.add_task(event)
@@ -3350,3 +3360,9 @@ def __init__(self, dataset: Dataset, items: typing.List[DatasetItemClient]):
         self.created_at = dataset.created_at
         self.updated_at = dataset.updated_at
         self.items = items
+
+
+def _filter_io_from_event_body(event_body: Dict[str, Any]):
+    return {
+        k: v for k, v in event_body.items() if k not in ("input", "output", "metadata")
+    }
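The `"body": request` changes above defer pydantic serialization: the caller now enqueues the model object itself, and the consumer thread converts it (see the `_next` changes in `task_manager.py` below). A sketch of that hand-off, using an illustrative model rather than the SDK's own body classes:

```python
from typing import Optional

import pydantic

# Illustrative stand-in for CreateSpanBody and friends; not the SDK's class.
class DemoBody(pydantic.BaseModel):
    name: str
    input: Optional[str] = None

# Producer side (client.py after this diff): enqueue the model as-is.
event = {"id": "evt-1", "type": "span-create", "body": DemoBody(name="my-span")}

# Consumer side (task_manager.py after this diff): convert off the caller's
# thread, right before sampling, masking, and serialization.
if isinstance(event["body"], pydantic.BaseModel):
    event["body"] = event["body"].dict(exclude_none=True)

assert event["body"] == {"name": "my-span"}
```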
diff --git a/langfuse/task_manager.py b/langfuse/task_manager.py
index def8105a..a507785f 100644
--- a/langfuse/task_manager.py
+++ b/langfuse/task_manager.py
@@ -58,6 +58,8 @@ class Consumer(threading.Thread):
     _sdk_name: str
     _sdk_version: str
     _sdk_integration: str
+    _mask: Optional[MaskFunction]
+    _sampler: Sampler

     def __init__(
         self,
@@ -71,6 +73,8 @@ def __init__(
         sdk_name: str,
         sdk_version: str,
         sdk_integration: str,
+        sample_rate: float,
+        mask: Optional[MaskFunction] = None,
     ):
         """Create a consumer thread."""
         threading.Thread.__init__(self)
@@ -91,6 +95,8 @@ def __init__(
         self._sdk_name = sdk_name
         self._sdk_version = sdk_version
         self._sdk_integration = sdk_integration
+        self._mask = mask
+        self._sampler = Sampler(sample_rate)

     def _next(self):
         """Return the next batch of items to upload."""
@@ -107,13 +113,37 @@ def _next(self):
             try:
                 item = queue.get(block=True, timeout=self._flush_interval - elapsed)

+                # convert pydantic models to dicts
+                if "body" in item and isinstance(item["body"], pydantic.BaseModel):
+                    item["body"] = item["body"].dict(exclude_none=True)
+
+                # sample event
+                if not self._sampler.sample_event(item):
+                    queue.task_done()
+
+                    continue
+
+                # truncate item if it exceeds size limit
                 item_size = self._truncate_item_in_place(
                     item=item,
                     max_size=MAX_MSG_SIZE,
                     log_message="",
                 )
+
+                # apply mask
+                self._apply_mask_in_place(item)
+
+                # check for serialization errors
+                try:
+                    json.dumps(item, cls=EventSerializer)
+                except Exception as e:
+                    self._log.error(f"Error serializing item, skipping: {e}")
+                    queue.task_done()
+
+                    continue
+
                 items.append(item)
+
                 total_size += item_size
                 if total_size >= BATCH_SIZE_LIMIT:
                     self._log.debug("hit batch size limit (size: %d)", total_size)
@@ -190,6 +220,20 @@ def _get_item_size(self, item: typing.Any) -> int:
         """Return the size of the item in bytes."""
         return len(json.dumps(item, cls=EventSerializer).encode())

+    def _apply_mask_in_place(self, event: dict):
+        """Apply the mask function to the event. This is done in place."""
+        if not self._mask:
+            return
+
+        body = event["body"] if "body" in event else {}
+        for key in ("input", "output"):
+            if key in body:
+                try:
+                    body[key] = self._mask(data=body[key])
+                except Exception as e:
+                    self._log.error(f"Mask function failed with error: {e}")
+                    body[key] = ""
+
     def run(self):
         """Runs the consumer."""
         self._log.debug("consumer is running...")
@@ -261,7 +305,7 @@ class TaskManager(object):
     _sdk_name: str
     _sdk_version: str
     _sdk_integration: str
-    _sampler: Sampler
+    _sample_rate: float
    _mask: Optional[MaskFunction]

     def __init__(
@@ -293,7 +337,7 @@ def __init__(
         self._sdk_version = sdk_version
         self._sdk_integration = sdk_integration
         self._enabled = enabled
-        self._sampler = Sampler(sample_rate)
+        self._sample_rate = sample_rate
         self._mask = mask

         self.init_resources()
@@ -314,6 +358,8 @@ def init_resources(self):
                 sdk_name=self._sdk_name,
                 sdk_version=self._sdk_version,
                 sdk_integration=self._sdk_integration,
+                sample_rate=self._sample_rate,
+                mask=self._mask,
             )
             consumer.start()
             self._consumers.append(consumer)
@@ -323,12 +369,6 @@ def add_task(self, event: dict):
             return

         try:
-            if not self._sampler.sample_event(event):
-                return  # event was sampled out
-
-            self._apply_mask_in_place(event)
-
-            json.dumps(event, cls=EventSerializer)
             event["timestamp"] = _get_timestamp()

             self._queue.put(event, block=False)
@@ -340,20 +380,6 @@ def add_task(self, event: dict):

         return False

-    def _apply_mask_in_place(self, event: dict):
-        """Apply the mask function to the event. This is done in place."""
-        if not self._mask:
-            return
-
-        body = event["body"] if "body" in event else {}
-        for key in ("input", "output"):
-            if key in body:
-                try:
-                    body[key] = self._mask(data=body[key])
-                except Exception as e:
-                    self._log.error(f"Mask function failed with error: {e}")
-                    body[key] = ""
-
     def flush(self):
         """Force a flush from the internal queue to the server."""
         self._log.debug("flushing queue")
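With the block above, sampling and masking now run on the consumer thread instead of inside `add_task`, so `add_task` does little more than timestamp and enqueue. One behavioral consequence: sampled-out events are now enqueued and discarded in the consumer rather than rejected up front. The mask is invoked as `self._mask(data=body[key])` for the `input` and `output` keys, so a user-supplied mask must accept `data` as a keyword argument. A sketch, assuming the public `Langfuse` constructor forwards `mask` and `sample_rate` to the `TaskManager` as the wiring in `init_resources` above suggests:

```python
from langfuse import Langfuse

def redact_emails(*, data, **kwargs):
    # Naive illustration: blank out any string that looks like it holds an email.
    if isinstance(data, str) and "@" in data:
        return "[redacted]"
    return data

# Both knobs end up on each Consumer via init_resources() above.
langfuse = Langfuse(mask=redact_emails, sample_rate=0.2)
```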
This is done in place.""" - if not self._mask: - return - - body = event["body"] if "body" in event else {} - for key in ("input", "output"): - if key in body: - try: - body[key] = self._mask(data=body[key]) - except Exception as e: - self._log.error(f"Mask function failed with error: {e}") - body[key] = "" - def flush(self): """Force a flush from the internal queue to the server.""" self._log.debug("flushing queue") diff --git a/tests/test_langchain.py b/tests/test_langchain.py index 97f73b79..9f4f9093 100644 --- a/tests/test_langchain.py +++ b/tests/test_langchain.py @@ -1,5 +1,9 @@ import os -from typing import Any, List, Mapping, Optional +import random +import string +import time + +from typing import Any, List, Mapping, Optional, Dict import pytest from langchain_community.llms.anthropic import Anthropic @@ -14,6 +18,7 @@ ConversationChain, ) from langchain_core.tools import StructuredTool +from langchain_core.runnables.base import RunnableLambda from langchain.chains.openai_functions import create_openai_fn_chain from langchain.chains.summarize import load_summarize_chain from langchain_community.document_loaders import TextLoader @@ -2158,3 +2163,51 @@ class GetWeather(BaseModel): ) assert generation.output is not None + + +def test_langfuse_overhead(): + def _generate_random_dict(n: int, key_length: int = 8) -> Dict[str, Any]: + result = {} + value_generators = [ + lambda: "".join( + random.choices(string.ascii_letters, k=random.randint(3, 15)) + ), + lambda: random.randint(0, 1000), + lambda: round(random.uniform(0, 100), 2), + lambda: [random.randint(0, 100) for _ in range(random.randint(1, 5))], + lambda: random.choice([True, False]), + ] + while len(result) < n: + key = "".join( + random.choices(string.ascii_letters + string.digits, k=key_length) + ) + if key in result: + continue + value = random.choice(value_generators)() + result[key] = value + return result + + # Test performance overhead of langfuse tracing + inputs = _generate_random_dict(10000, 20000) + test_chain = RunnableLambda(lambda x: None) + + start = time.monotonic() + test_chain.invoke(inputs) + duration_without_langfuse = (time.monotonic() - start) * 1000 + + start = time.monotonic() + handler = CallbackHandler() + test_chain.invoke(inputs, config={"callbacks": [handler]}) + duration_with_langfuse = (time.monotonic() - start) * 1000 + + overhead = duration_with_langfuse - duration_without_langfuse + print(f"Langfuse overhead: {overhead}ms") + + assert overhead < 50, f"Langfuse tracing overhead of {overhead}ms exceeds threshold" + + handler.flush() + + duration_full = (time.monotonic() - start) * 1000 + print(f"Full execution took {duration_full}ms") + + assert duration_full > 1000, "Full execution should take longer than 1 second" diff --git a/tests/test_sdk_setup.py b/tests/test_sdk_setup.py index 850edace..cca83929 100644 --- a/tests/test_sdk_setup.py +++ b/tests/test_sdk_setup.py @@ -217,7 +217,7 @@ def test_callback_sampling(): os.environ["LANGFUSE_SAMPLE_RATE"] = "0.2" handler = CallbackHandler() - assert handler.langfuse.task_manager._sampler.sample_rate == 0.2 + assert handler.langfuse.task_manager._sample_rate == 0.2 os.environ.pop("LANGFUSE_SAMPLE_RATE") @@ -436,7 +436,7 @@ def test_openai_configured(httpserver: HTTPServer): assert modifier._langfuse.client._client_wrapper._password == "sk-lf-asdfghjkl" assert modifier._langfuse.client._client_wrapper._base_url == host assert modifier._langfuse.task_manager._client._base_url == host - assert modifier._langfuse.task_manager._sampler.sample_rate == 0.2 
diff --git a/tests/test_sdk_setup.py b/tests/test_sdk_setup.py
index 850edace..cca83929 100644
--- a/tests/test_sdk_setup.py
+++ b/tests/test_sdk_setup.py
@@ -217,7 +217,7 @@ def test_callback_sampling():
     os.environ["LANGFUSE_SAMPLE_RATE"] = "0.2"
     handler = CallbackHandler()

-    assert handler.langfuse.task_manager._sampler.sample_rate == 0.2
+    assert handler.langfuse.task_manager._sample_rate == 0.2

     os.environ.pop("LANGFUSE_SAMPLE_RATE")

@@ -436,7 +436,7 @@ def test_openai_configured(httpserver: HTTPServer):
     assert modifier._langfuse.client._client_wrapper._password == "sk-lf-asdfghjkl"
     assert modifier._langfuse.client._client_wrapper._base_url == host
     assert modifier._langfuse.task_manager._client._base_url == host
-    assert modifier._langfuse.task_manager._sampler.sample_rate == 0.2
+    assert modifier._langfuse.task_manager._sample_rate == 0.2

     os.environ["LANGFUSE_PUBLIC_KEY"] = public_key
     os.environ["LANGFUSE_SECRET_KEY"] = secret_key
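The updated assertions reflect that `TaskManager` now stores the plain `sample_rate` float while each `Consumer` builds its own `Sampler` from it; the user-facing configuration is unchanged. For reference, the env-var path exercised by `test_callback_sampling`, assuming `LANGFUSE_*` credentials are set as in the surrounding tests:

```python
import os

from langfuse.callback import CallbackHandler

# Sampling is still configured the same way; only its internal storage moved.
os.environ["LANGFUSE_SAMPLE_RATE"] = "0.2"
handler = CallbackHandler()

# _sample_rate is a private attribute; asserted here only the way the tests do.
assert handler.langfuse.task_manager._sample_rate == 0.2
```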