Skip to content

Commit

Permalink
fix: stop attaching context for callback based instrumentors (#192)
Browse files Browse the repository at this point in the history
  • Loading branch information
RogerHYang authored Feb 12, 2024
1 parent 1d2cc04 commit c05ab06
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
endpoint = "http://127.0.0.1:6006/v1/traces"
tracer_provider = trace_sdk.TracerProvider()
trace_api.set_tracer_provider(tracer_provider)
tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint=endpoint)))
tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint)))
tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))

LangChainInstrumentor().instrument()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
endpoint = "http://127.0.0.1:6006/v1/traces"
tracer_provider = trace_sdk.TracerProvider()
trace_api.set_tracer_provider(tracer_provider)
tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint=endpoint)))
tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint)))
tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))

LangChainInstrumentor().instrument()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ test = [
"langchain_openai == 0.0.2",
"langchain-community == 0.0.10",
"opentelemetry-sdk",
"openinference-instrumentation-openai",
"respx",
]
type-check = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
class _Run(NamedTuple):
    """Per-run bookkeeping: the OTEL span opened for a LangChain run plus the
    context (and attach token) needed to parent and later finish it."""
    span: trace_api.Span  # span created in _start_trace, ended in _end_trace
    context: context_api.Context  # context with `span` set, used to parent children
    token: object  # token returned by context_api.attach(); passed to detach() at end


class OpenInferenceTracer(BaseTracer):
Expand All @@ -66,13 +65,17 @@ def _start_trace(self, run: Run) -> None:
else None,
)
context = trace_api.set_span_in_context(span)
token = context_api.attach(context)
self._runs[run.id] = _Run(span=span, context=context, token=token)
# The following line of code is commented out to serve as a reminder that in a system
# of callbacks, attaching the context can be hazardous because there is no guarantee
# that the context will be detached. An error could happen between callbacks leaving
# the context attached forever, and all future spans will use it as parent. What's
# worse is that the error could have also prevented the span from being exported,
# leaving all future spans as orphans. That is a very bad scenario.
# token = context_api.attach(context)
self._runs[run.id] = _Run(span=span, context=context)

def _end_trace(self, run: Run) -> None:
if event_data := self._runs.pop(run.id, None):
# FIXME: find out why sometimes token fails to detach, e.g. when it's async
context_api.detach(event_data.token)
span = event_data.span
try:
_update_span(span, run)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from openinference.instrumentation.langchain import LangChainInstrumentor
from openinference.instrumentation.openai import OpenAIInstrumentor
from openinference.semconv.trace import (
DocumentAttributes,
EmbeddingAttributes,
Expand Down Expand Up @@ -218,17 +217,6 @@ def test_callback_llm(
) == "BadRequestError"
assert oai_attributes == {}

# The remaining span is from the openai instrumentor.
openai_span = spans_by_name.popitem()[1]
assert openai_span.parent is not None
if is_async:
# FIXME: it's unclear why the context fails to propagate.
assert openai_span.parent.span_id == llm_span.context.span_id
assert openai_span.context.trace_id == llm_span.context.trace_id
else:
assert openai_span.parent.span_id == oai_span.context.span_id
assert openai_span.context.trace_id == oai_span.context.trace_id

assert spans_by_name == {}


Expand Down Expand Up @@ -301,9 +289,7 @@ def instrument(
in_memory_span_exporter: InMemorySpanExporter,
) -> Generator[None, None, None]:
LangChainInstrumentor().instrument(tracer_provider=tracer_provider)
OpenAIInstrumentor().instrument(tracer_provider=tracer_provider)
yield
OpenAIInstrumentor().uninstrument()
LangChainInstrumentor().uninstrument()
in_memory_span_exporter.clear()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,24 @@
from llama_index.storage.storage_context import StorageContext
from llama_index.vector_stores import ChromaVectorStore
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
from openinference.instrumentation.openai import OpenAIInstrumentor
from opentelemetry import trace as trace_api
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

resource = Resource(attributes={})
tracer_provider = trace_sdk.TracerProvider(resource=resource)
span_exporter = OTLPSpanExporter(endpoint="http://127.0.0.1:6006/v1/traces")
span_processor = SimpleSpanProcessor(span_exporter=span_exporter)
tracer_provider.add_span_processor(span_processor=span_processor)
trace_api.set_tracer_provider(tracer_provider=tracer_provider)
endpoint = "http://127.0.0.1:6006/v1/traces"
tracer_provider = trace_sdk.TracerProvider()
trace_api.set_tracer_provider(tracer_provider)
tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint)))
tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))

LlamaIndexInstrumentor().instrument()
OpenAIInstrumentor().instrument()

chroma_client = chromadb.EphemeralClient()
chroma_collection = chroma_client.create_collection("essays")

documents = SimpleDirectoryReader("./data/paul_graham/").load_data()
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
vector_store = ChromaVectorStore(chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@ llama-index >= 0.9.14
openinference-instrumentation-llama-index
opentelemetry-sdk
opentelemetry-exporter-otlp
openinference-instrumentation-openai
chromadb
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ instruments = [
]
test = [
"llama-index == 0.9.14",
"openai",
"opentelemetry-sdk",
"openinference-instrumentation-openai",
"respx",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
@dataclass
class _EventData:
span: trace_api.Span
token: object
parent_id: str
context: Optional[context_api.Context]
payloads: List[Dict[str, Any]]
Expand Down Expand Up @@ -193,15 +192,13 @@ def _extract_invocation_parameters(serialized: Mapping[str, Any]) -> Dict[str, A
class OpenInferenceTraceCallbackHandler(BaseCallbackHandler):
__slots__ = (
"_tracer",
"_context_api_token_stack",
"_event_data",
"_stragglers",
)

def __init__(self, tracer: trace_api.Tracer) -> None:
super().__init__(event_starts_to_ignore=[], event_ends_to_ignore=[])
self._tracer = tracer
self._context_api_token_stack = _ContextApiTokenStack()
self._event_data: Dict[_EventId, _EventData] = {}
self._stragglers: Dict[_EventId, _EventData] = _Stragglers(capacity=1000)
"""
Expand Down Expand Up @@ -259,11 +256,15 @@ def on_event_start(
)
span.set_attribute(OPENINFERENCE_SPAN_KIND, _get_span_kind(event_type).value)
new_context = trace_api.set_span_in_context(span)
token = context_api.attach(new_context)
self._context_api_token_stack.append(token)
# The following line of code is commented out to serve as a reminder that in a system
# of callbacks, attaching the context can be hazardous because there is no guarantee
# that the context will be detached. An error could happen between callbacks leaving
# the context attached forever, and all future spans will use it as parent. What's
# worse is that the error could have also prevented the span from being exported,
# leaving all future spans as orphans. That is a very bad scenario.
# token = context_api.attach(new_context)
self._event_data[event_id] = _EventData(
span=span,
token=token,
parent_id=parent_id,
context=new_context,
start_time=start_time,
Expand Down Expand Up @@ -295,7 +296,6 @@ def on_event_end(
return

event_data.end_time = time_ns()
self._context_api_token_stack.pop(event_data.token)

if payload is not None:
event_data.payloads.append(payload.copy())
Expand Down Expand Up @@ -366,34 +366,6 @@ def end_trace(self, trace_id: Optional[str] = None, *args: Any, **kwargs: Any) -
dfs_stack.extend(adjacency_list[event_id])


class _ContextApiTokenStack:
"""
Allows popping a token from the middle of the stack, whereby all tokens above
it are also popped. Popping a non-existent token is allowed and has no effect.
"""

__slots__ = ("_stack", "_positions")

def __init__(self) -> None:
self._stack: List[object] = []
self._positions: Dict[int, int] = {}

def append(self, token: object) -> None:
if id(token) in self._positions:
return
position = len(self._stack)
self._positions[id(token)] = position
self._stack.append(token)

def pop(self, token: object) -> None:
if (position := self._positions.get(id(token))) is None:
return
for token in reversed(self._stack[position:]):
self._positions.pop(id(token))
context_api.detach(token)
self._stack = self._stack[:position]


class _Stragglers(OrderedDict[_EventId, _EventData]):
"""
Stragglers are events that have not ended before their traces have ended. If an event is in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from llama_index.response.schema import StreamingResponse
from llama_index.schema import Document, TextNode
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
from openinference.instrumentation.openai import OpenAIInstrumentor
from openinference.semconv.trace import (
DocumentAttributes,
EmbeddingAttributes,
Expand All @@ -36,7 +35,6 @@
)
from opentelemetry import trace as trace_api
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from respx import MockRouter
Expand Down Expand Up @@ -220,21 +218,6 @@ async def task() -> None:
assert chunking_attributes.pop(OPENINFERENCE_SPAN_KIND, None) is not None
assert chunking_attributes == {}

# The remaining span is from the openai instrumentor.
openai_span = spans_by_name.pop(next(iter(spans_by_name.keys())))
if is_stream:
# FIXME: for streaming, the openai call happens after the trace has ended, so it's orphaned.
# https://github.com/run-llama/llama_index/blob/fcfab6486bc6a0eec31a983dd3056ef9cbe8ceb2/llama_index/llms/base.py#L62 # noqa E501
assert openai_span.parent is None
else:
assert openai_span.parent is not None
if status_code == 200:
# FIXME: LlamaIndex doesn't currently capture the LLM span when status_code == 400
# because `on_event_end` never gets called.
assert llm_span is not None
assert openai_span.parent.span_id == llm_span.context.span_id
assert openai_span.context.trace_id == llm_span.context.trace_id

assert spans_by_name == {}


Expand Down Expand Up @@ -267,10 +250,8 @@ def in_memory_span_exporter() -> InMemorySpanExporter:

@pytest.fixture(scope="module")
def tracer_provider(in_memory_span_exporter: InMemorySpanExporter) -> trace_api.TracerProvider:
resource = Resource(attributes={})
tracer_provider = trace_sdk.TracerProvider(resource=resource)
span_processor = SimpleSpanProcessor(span_exporter=in_memory_span_exporter)
tracer_provider.add_span_processor(span_processor=span_processor)
tracer_provider = trace_sdk.TracerProvider()
tracer_provider.add_span_processor(SimpleSpanProcessor(in_memory_span_exporter))
return tracer_provider


Expand All @@ -280,9 +261,7 @@ def instrument(
in_memory_span_exporter: InMemorySpanExporter,
) -> Generator[None, None, None]:
LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider)
OpenAIInstrumentor().instrument(tracer_provider=tracer_provider)
yield
OpenAIInstrumentor().uninstrument()
LlamaIndexInstrumentor().uninstrument()
in_memory_span_exporter.clear()

Expand Down

0 comments on commit c05ab06

Please sign in to comment.