From c05ab06e4529bf15953715f94bcaf4a616755d90 Mon Sep 17 00:00:00 2001 From: Roger Yang <80478925+RogerHYang@users.noreply.github.com> Date: Mon, 12 Feb 2024 08:53:03 -0800 Subject: [PATCH] fix: stop attaching context for callback based instrumentors (#192) --- .../examples/chain_metadata.py | 2 +- .../examples/openai_chat_stream.py | 2 +- .../pyproject.toml | 1 - .../instrumentation/langchain/_tracer.py | 13 +++--- .../langchain/test_instrumentor.py | 14 ------- .../examples/chroma_query_engine.py | 18 ++++---- .../examples/requirements.txt | 1 - .../pyproject.toml | 2 - .../instrumentation/llama_index/_callback.py | 42 ++++--------------- .../llama_index/test_callback.py | 25 +---------- 10 files changed, 26 insertions(+), 94 deletions(-) diff --git a/python/instrumentation/openinference-instrumentation-langchain/examples/chain_metadata.py b/python/instrumentation/openinference-instrumentation-langchain/examples/chain_metadata.py index 63837a887..43ec2ee99 100644 --- a/python/instrumentation/openinference-instrumentation-langchain/examples/chain_metadata.py +++ b/python/instrumentation/openinference-instrumentation-langchain/examples/chain_metadata.py @@ -10,7 +10,7 @@ endpoint = "http://127.0.0.1:6006/v1/traces" tracer_provider = trace_sdk.TracerProvider() trace_api.set_tracer_provider(tracer_provider) -tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint=endpoint))) +tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint))) tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) LangChainInstrumentor().instrument() diff --git a/python/instrumentation/openinference-instrumentation-langchain/examples/openai_chat_stream.py b/python/instrumentation/openinference-instrumentation-langchain/examples/openai_chat_stream.py index 0f3495124..ed17d5561 100644 --- a/python/instrumentation/openinference-instrumentation-langchain/examples/openai_chat_stream.py +++ b/python/instrumentation/openinference-instrumentation-langchain/examples/openai_chat_stream.py @@ -8,7 +8,7 @@ endpoint = "http://127.0.0.1:6006/v1/traces" tracer_provider = trace_sdk.TracerProvider() trace_api.set_tracer_provider(tracer_provider) -tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint=endpoint))) +tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint))) tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) LangChainInstrumentor().instrument() diff --git a/python/instrumentation/openinference-instrumentation-langchain/pyproject.toml b/python/instrumentation/openinference-instrumentation-langchain/pyproject.toml index 52f5c30da..890ae1c75 100644 --- a/python/instrumentation/openinference-instrumentation-langchain/pyproject.toml +++ b/python/instrumentation/openinference-instrumentation-langchain/pyproject.toml @@ -40,7 +40,6 @@ test = [ "langchain_openai == 0.0.2", "langchain-community == 0.0.10", "opentelemetry-sdk", - "openinference-instrumentation-openai", "respx", ] type-check = [ diff --git a/python/instrumentation/openinference-instrumentation-langchain/src/openinference/instrumentation/langchain/_tracer.py b/python/instrumentation/openinference-instrumentation-langchain/src/openinference/instrumentation/langchain/_tracer.py index 136a1af63..db7d6470f 100644 --- a/python/instrumentation/openinference-instrumentation-langchain/src/openinference/instrumentation/langchain/_tracer.py +++ b/python/instrumentation/openinference-instrumentation-langchain/src/openinference/instrumentation/langchain/_tracer.py @@ -43,7 +43,6 @@ class _Run(NamedTuple): span: trace_api.Span context: context_api.Context - token: object # token for OTEL context API class OpenInferenceTracer(BaseTracer): @@ -66,13 +65,17 @@ def _start_trace(self, run: Run) -> None: else None, ) context = trace_api.set_span_in_context(span) - token = context_api.attach(context) - self._runs[run.id] = _Run(span=span, context=context, token=token) + # The following line of code is commented out to serve as a reminder that in a system + # of callbacks, attaching the context can be hazardous because there is no guarantee + # that the context will be detached. An error could happen between callbacks leaving + # the context attached forever, and all future spans will use it as parent. What's + # worse is that the error could have also prevented the span from being exported, + # leaving all future spans as orphans. That is a very bad scenario. + # token = context_api.attach(context) + self._runs[run.id] = _Run(span=span, context=context) def _end_trace(self, run: Run) -> None: if event_data := self._runs.pop(run.id, None): - # FIXME: find out why sometimes token fails to detach, e.g. when it's async - context_api.detach(event_data.token) span = event_data.span try: _update_span(span, run) diff --git a/python/instrumentation/openinference-instrumentation-langchain/tests/openinference/instrumentation/langchain/test_instrumentor.py b/python/instrumentation/openinference-instrumentation-langchain/tests/openinference/instrumentation/langchain/test_instrumentor.py index c4308ba3b..329da77fb 100644 --- a/python/instrumentation/openinference-instrumentation-langchain/tests/openinference/instrumentation/langchain/test_instrumentor.py +++ b/python/instrumentation/openinference-instrumentation-langchain/tests/openinference/instrumentation/langchain/test_instrumentor.py @@ -16,7 +16,6 @@ from langchain_core.prompts import PromptTemplate from langchain_openai import ChatOpenAI from openinference.instrumentation.langchain import LangChainInstrumentor -from openinference.instrumentation.openai import OpenAIInstrumentor from openinference.semconv.trace import ( DocumentAttributes, EmbeddingAttributes, @@ -218,17 +217,6 @@ def test_callback_llm( ) == "BadRequestError" assert oai_attributes == {} - # The remaining span is from the openai instrumentor. - openai_span = spans_by_name.popitem()[1] - assert openai_span.parent is not None - if is_async: - # FIXME: it's unclear why the context fails to propagate. - assert openai_span.parent.span_id == llm_span.context.span_id - assert openai_span.context.trace_id == llm_span.context.trace_id - else: - assert openai_span.parent.span_id == oai_span.context.span_id - assert openai_span.context.trace_id == oai_span.context.trace_id - assert spans_by_name == {} @@ -301,9 +289,7 @@ def instrument( in_memory_span_exporter: InMemorySpanExporter, ) -> Generator[None, None, None]: LangChainInstrumentor().instrument(tracer_provider=tracer_provider) - OpenAIInstrumentor().instrument(tracer_provider=tracer_provider) yield - OpenAIInstrumentor().uninstrument() LangChainInstrumentor().uninstrument() in_memory_span_exporter.clear() diff --git a/python/instrumentation/openinference-instrumentation-llama-index/examples/chroma_query_engine.py b/python/instrumentation/openinference-instrumentation-llama-index/examples/chroma_query_engine.py index ebcfe7aee..8f2ccd86d 100644 --- a/python/instrumentation/openinference-instrumentation-llama-index/examples/chroma_query_engine.py +++ b/python/instrumentation/openinference-instrumentation-llama-index/examples/chroma_query_engine.py @@ -3,28 +3,24 @@ from llama_index.storage.storage_context import StorageContext from llama_index.vector_stores import ChromaVectorStore from openinference.instrumentation.llama_index import LlamaIndexInstrumentor -from openinference.instrumentation.openai import OpenAIInstrumentor from opentelemetry import trace as trace_api from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk import trace as trace_sdk -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor -resource = Resource(attributes={}) -tracer_provider = trace_sdk.TracerProvider(resource=resource) -span_exporter = OTLPSpanExporter(endpoint="http://127.0.0.1:6006/v1/traces") -span_processor = SimpleSpanProcessor(span_exporter=span_exporter) -tracer_provider.add_span_processor(span_processor=span_processor) -trace_api.set_tracer_provider(tracer_provider=tracer_provider) +endpoint = "http://127.0.0.1:6006/v1/traces" +tracer_provider = trace_sdk.TracerProvider() +trace_api.set_tracer_provider(tracer_provider) +tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint))) +tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) LlamaIndexInstrumentor().instrument() -OpenAIInstrumentor().instrument() chroma_client = chromadb.EphemeralClient() chroma_collection = chroma_client.create_collection("essays") documents = SimpleDirectoryReader("./data/paul_graham/").load_data() -vector_store = ChromaVectorStore(chroma_collection=chroma_collection) +vector_store = ChromaVectorStore(chroma_collection) storage_context = StorageContext.from_defaults(vector_store=vector_store) index = VectorStoreIndex.from_documents(documents, storage_context=storage_context) diff --git a/python/instrumentation/openinference-instrumentation-llama-index/examples/requirements.txt b/python/instrumentation/openinference-instrumentation-llama-index/examples/requirements.txt index d422a41fd..a82533a10 100644 --- a/python/instrumentation/openinference-instrumentation-llama-index/examples/requirements.txt +++ b/python/instrumentation/openinference-instrumentation-llama-index/examples/requirements.txt @@ -2,5 +2,4 @@ llama-index >= 0.9.14 openinference-instrumentation-llama-index opentelemetry-sdk opentelemetry-exporter-otlp -openinference-instrumentation-openai chromadb diff --git a/python/instrumentation/openinference-instrumentation-llama-index/pyproject.toml b/python/instrumentation/openinference-instrumentation-llama-index/pyproject.toml index c54b098ea..ad1697d08 100644 --- a/python/instrumentation/openinference-instrumentation-llama-index/pyproject.toml +++ b/python/instrumentation/openinference-instrumentation-llama-index/pyproject.toml @@ -37,9 +37,7 @@ instruments = [ ] test = [ "llama-index == 0.9.14", - "openai", "opentelemetry-sdk", - "openinference-instrumentation-openai", "respx", ] diff --git a/python/instrumentation/openinference-instrumentation-llama-index/src/openinference/instrumentation/llama_index/_callback.py b/python/instrumentation/openinference-instrumentation-llama-index/src/openinference/instrumentation/llama_index/_callback.py index 75c8ed6f5..5111c8ece 100644 --- a/python/instrumentation/openinference-instrumentation-llama-index/src/openinference/instrumentation/llama_index/_callback.py +++ b/python/instrumentation/openinference-instrumentation-llama-index/src/openinference/instrumentation/llama_index/_callback.py @@ -56,7 +56,6 @@ @dataclass class _EventData: span: trace_api.Span - token: object parent_id: str context: Optional[context_api.Context] payloads: List[Dict[str, Any]] @@ -193,7 +192,6 @@ def _extract_invocation_parameters(serialized: Mapping[str, Any]) -> Dict[str, A class OpenInferenceTraceCallbackHandler(BaseCallbackHandler): __slots__ = ( "_tracer", - "_context_api_token_stack", "_event_data", "_stragglers", ) @@ -201,7 +199,6 @@ class OpenInferenceTraceCallbackHandler(BaseCallbackHandler): def __init__(self, tracer: trace_api.Tracer) -> None: super().__init__(event_starts_to_ignore=[], event_ends_to_ignore=[]) self._tracer = tracer - self._context_api_token_stack = _ContextApiTokenStack() self._event_data: Dict[_EventId, _EventData] = {} self._stragglers: Dict[_EventId, _EventData] = _Stragglers(capacity=1000) """ @@ -259,11 +256,15 @@ def on_event_start( ) span.set_attribute(OPENINFERENCE_SPAN_KIND, _get_span_kind(event_type).value) new_context = trace_api.set_span_in_context(span) - token = context_api.attach(new_context) - self._context_api_token_stack.append(token) + # The following line of code is commented out to serve as a reminder that in a system + # of callbacks, attaching the context can be hazardous because there is no guarantee + # that the context will be detached. An error could happen between callbacks leaving + # the context attached forever, and all future spans will use it as parent. What's + # worse is that the error could have also prevented the span from being exported, + # leaving all future spans as orphans. That is a very bad scenario. + # token = context_api.attach(new_context) self._event_data[event_id] = _EventData( span=span, - token=token, parent_id=parent_id, context=new_context, start_time=start_time, @@ -295,7 +296,6 @@ def on_event_end( return event_data.end_time = time_ns() - self._context_api_token_stack.pop(event_data.token) if payload is not None: event_data.payloads.append(payload.copy()) @@ -366,34 +366,6 @@ def end_trace(self, trace_id: Optional[str] = None, *args: Any, **kwargs: Any) - dfs_stack.extend(adjacency_list[event_id]) -class _ContextApiTokenStack: - """ - Allows popping a token from the middle of the stack, whereby all tokens above - it are also popped. Popping a non-existent token is allowed and has no effect. - """ - - __slots__ = ("_stack", "_positions") - - def __init__(self) -> None: - self._stack: List[object] = [] - self._positions: Dict[int, int] = {} - - def append(self, token: object) -> None: - if id(token) in self._positions: - return - position = len(self._stack) - self._positions[id(token)] = position - self._stack.append(token) - - def pop(self, token: object) -> None: - if (position := self._positions.get(id(token))) is None: - return - for token in reversed(self._stack[position:]): - self._positions.pop(id(token)) - context_api.detach(token) - self._stack = self._stack[:position] - - class _Stragglers(OrderedDict[_EventId, _EventData]): """ Stragglers are events that have not ended before their traces have ended. If an event is in diff --git a/python/instrumentation/openinference-instrumentation-llama-index/tests/openinference/instrumentation/llama_index/test_callback.py b/python/instrumentation/openinference-instrumentation-llama-index/tests/openinference/instrumentation/llama_index/test_callback.py index 641fc9a4a..b3d30ffee 100644 --- a/python/instrumentation/openinference-instrumentation-llama-index/tests/openinference/instrumentation/llama_index/test_callback.py +++ b/python/instrumentation/openinference-instrumentation-llama-index/tests/openinference/instrumentation/llama_index/test_callback.py @@ -25,7 +25,6 @@ from llama_index.response.schema import StreamingResponse from llama_index.schema import Document, TextNode from openinference.instrumentation.llama_index import LlamaIndexInstrumentor -from openinference.instrumentation.openai import OpenAIInstrumentor from openinference.semconv.trace import ( DocumentAttributes, EmbeddingAttributes, @@ -36,7 +35,6 @@ ) from opentelemetry import trace as trace_api from opentelemetry.sdk import trace as trace_sdk -from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from respx import MockRouter @@ -220,21 +218,6 @@ async def task() -> None: assert chunking_attributes.pop(OPENINFERENCE_SPAN_KIND, None) is not None assert chunking_attributes == {} - # The remaining span is from the openai instrumentator. - openai_span = spans_by_name.pop(next(iter(spans_by_name.keys()))) - if is_stream: - # FIXME: for streaming, the openai call happens after the trace has ended, so it's orphaned. - # https://github.com/run-llama/llama_index/blob/fcfab6486bc6a0eec31a983dd3056ef9cbe8ceb2/llama_index/llms/base.py#L62 # noqa E501 - assert openai_span.parent is None - else: - assert openai_span.parent is not None - if status_code == 200: - # FIXME: LlamaIndex doesn't currently capture the LLM span when status_code == 400 - # because `on_event_end` never gets called. - assert llm_span is not None - assert openai_span.parent.span_id == llm_span.context.span_id - assert openai_span.context.trace_id == llm_span.context.trace_id - assert spans_by_name == {} @@ -267,10 +250,8 @@ def in_memory_span_exporter() -> InMemorySpanExporter: @pytest.fixture(scope="module") def tracer_provider(in_memory_span_exporter: InMemorySpanExporter) -> trace_api.TracerProvider: - resource = Resource(attributes={}) - tracer_provider = trace_sdk.TracerProvider(resource=resource) - span_processor = SimpleSpanProcessor(span_exporter=in_memory_span_exporter) - tracer_provider.add_span_processor(span_processor=span_processor) + tracer_provider = trace_sdk.TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(in_memory_span_exporter)) return tracer_provider @@ -280,9 +261,7 @@ def instrument( in_memory_span_exporter: InMemorySpanExporter, ) -> Generator[None, None, None]: LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider) - OpenAIInstrumentor().instrument(tracer_provider=tracer_provider) yield - OpenAIInstrumentor().uninstrument() LlamaIndexInstrumentor().uninstrument() in_memory_span_exporter.clear()