Skip to content

Commit

Permalink
Stateful tracing, revisted
Browse files Browse the repository at this point in the history
  • Loading branch information
rossabaker committed Feb 3, 2023
1 parent 75e4466 commit e3ea876
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 55 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ lazy val examples = project
.settings(
name := "otel4s-examples",
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % FS2Version,
"io.opentelemetry" % "opentelemetry-exporter-otlp" % OpenTelemetryVersion,
"io.opentelemetry" % "opentelemetry-sdk" % OpenTelemetryVersion,
"io.opentelemetry" % "opentelemetry-sdk-extension-autoconfigure" % s"${OpenTelemetryVersion}-alpha"
Expand Down
28 changes: 24 additions & 4 deletions examples/src/main/scala/TracingExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,25 @@ import cats.effect.MonadCancelThrow
import cats.effect.Resource
import cats.effect.std.Console
import cats.syntax.all._
import fs2.Stream
import io.opentelemetry.api.GlobalOpenTelemetry
import org.typelevel.otel4s.Attribute
import org.typelevel.otel4s.java.OtelJava
import org.typelevel.otel4s.trace.Tracer

import scala.concurrent.duration._

trait Work[F[_]] {
def doWork: F[Unit]
def doWork(i: Int): F[Unit]
}

object Work {
def apply[F[_]: MonadCancelThrow: Tracer: Console]: Work[F] =
new Work[F] {
def doWork: F[Unit] =
def doWork(i: Int): F[Unit] =
Tracer[F].span("Work.DoWork").use { span =>
span.addEvent("Starting the work.") *>
span.addAttribute(Attribute("number", i.toLong)) *>
span.addEvent("Starting the work.") *>
doWorkInternal *>
span.addEvent("Finished working.")
}
Expand All @@ -52,7 +57,22 @@ object TracingExample extends IOApp.Simple {

def run: IO[Unit] = {
tracerResource.use { implicit tracer: Tracer[IO] =>
Work[IO].doWork
val resource: Resource[IO, Unit] =
Resource.make(IO.sleep(50.millis))(_ => IO.sleep(100.millis))

def stream(name: String) =
Stream
.resource(tracer.spanBuilder(name).start >> resource)
.flatMap(_ => Stream(1, 2, 3))
.evalMap(Work[IO].doWork)

tracer
.span("root")
.surround(
(stream("uninterrupted") ++ stream(
"interrupted"
).interruptScope).compile.drain
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package org.typelevel.otel4s.java.trace

import cats.effect.IOLocal
import cats.effect.LiftIO
import cats.effect.Ref
import cats.effect.Resource
import cats.effect.Sync
import cats.syntax.flatMap._
import cats.syntax.functor._
import io.opentelemetry.api.trace.{Span => JSpan}
import io.opentelemetry.context.{Context => JContext}
Expand Down Expand Up @@ -52,60 +54,64 @@ private[java] object TraceScope {
): F[TraceScope[F]] = {
val scopeRoot = Scope.Root(default)

IOLocal[Scope](scopeRoot).to[F].map { local =>
new TraceScope[F] {
val root: F[Scope.Root] =
Sync[F].pure(scopeRoot)

def current: F[Scope] =
local.get.to[F]

def makeScope(span: JSpan): Resource[F, Unit] =
for {
current <- Resource.eval(current)
_ <- createScope(nextScope(current, span))
} yield ()

def rootScope: Resource[F, Unit] =
Resource.eval(current).flatMap {
case Scope.Root(_) =>
createScope(scopeRoot)

case Scope.Span(_, _, _) =>
createScope(scopeRoot)

case Scope.Noop =>
createScope(Scope.Noop)
}

def noopScope: Resource[F, Unit] =
createScope(Scope.Noop)

private def createScope(scope: Scope): Resource[F, Unit] =
Resource
.make(local.getAndSet(scope).to[F])(p => local.set(p).to[F])
.void

private def nextScope(scope: Scope, span: JSpan): Scope =
scope match {
case Scope.Root(ctx) =>
Scope.Span(
ctx.`with`(span),
span,
WrappedSpanContext(span.getSpanContext)
)
Ref.of[F, Scope](scopeRoot).flatMap { ref =>
IOLocal[Ref[F, Scope]](ref).to[F].map { local =>
new TraceScope[F] {
val root: F[Scope.Root] =
Sync[F].pure(scopeRoot)

case Scope.Span(ctx, _, _) =>
Scope.Span(
ctx.`with`(span),
span,
WrappedSpanContext(span.getSpanContext)
)
def current: F[Scope] =
local.get.to[F].flatMap(_.get)

def makeScope(span: JSpan): Resource[F, Unit] =
for {
current <- Resource.eval(current)
_ <- createScope(nextScope(current, span))
} yield ()

def rootScope: Resource[F, Unit] =
Resource.eval(current).flatMap {
case Scope.Root(_) =>
createScope(scopeRoot)

case Scope.Noop =>
Scope.Noop
}
case Scope.Span(_, _, _) =>
createScope(scopeRoot)

case Scope.Noop =>
createScope(Scope.Noop)
}

def noopScope: Resource[F, Unit] =
createScope(Scope.Noop)

private def createScope(scope: Scope): Resource[F, Unit] =
Resource
.make(local.get.to[F].flatMap(_.getAndSet(scope)))(p =>
local.get.to[F].flatMap(_.set(p))
)
.void

private def nextScope(scope: Scope, span: JSpan): Scope =
scope match {
case Scope.Root(ctx) =>
Scope.Span(
ctx.`with`(span),
span,
WrappedSpanContext(span.getSpanContext)
)

case Scope.Span(ctx, _, _) =>
Scope.Span(
ctx.`with`(span),
span,
WrappedSpanContext(span.getSpanContext)
)

case Scope.Noop =>
Scope.Noop
}

}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,51 @@ class TracerSuite extends CatsEffectSuite {
}
}

test("propagate trace info over interrupted stream scopes") {
def expected(now: FiniteDuration) =
SpanNode(
"span",
now,
now,
List(
SpanNode("span-3", now, now, Nil),
SpanNode("span-2", now, now, Nil)
)
)

def flow(tracer: Tracer[IO]): Stream[IO, Unit] =
for {
span <- Stream.resource(tracer.span("span"))
_ <- Stream.eval(
tracer.currentSpanContext.assertEquals(Some(span.context))
)
span2 <- Stream.resource(tracer.span("span-2"))
_ <- Stream.eval(
tracer.currentSpanContext.assertEquals(Some(span2.context))
)
span3 <- Stream.resource(
tracer.spanBuilder("span-3").withParent(span.context).start
)
_ <- Stream.eval(
tracer.currentSpanContext.assertEquals(Some(span3.context))
)
} yield ()

TestControl.executeEmbed {
for {
now <- IO.monotonic.delayBy(1.second) // otherwise returns 0
sdk <- makeSdk()
tracer <- sdk.provider.tracer("tracer").get
_ <- tracer.currentSpanContext.assertEquals(None)
_ <- flow(tracer).interruptScope.compile.drain
_ <- tracer.currentSpanContext.assertEquals(None)
spans <- sdk.finishedSpans
tree <- IO.pure(SpanNode.fromSpans(spans))
// _ <- IO.println(tree.map(SpanNode.render).mkString("\n"))
} yield assertEquals(tree, List(expected(now)))
}
}

private def assertIdsNotEqual(s1: Span[IO], s2: Span[IO]): Unit = {
assertNotEquals(s1.context.traceIdHex, s2.context.traceIdHex)
assertNotEquals(s1.context.spanIdHex, s2.context.spanIdHex)
Expand Down

0 comments on commit e3ea876

Please sign in to comment.