Kotlin coroutine context propagation #820
Comments
We have this problem. Is there any workaround for this? |
We are using this library from |
cool. you can do a variant of this if you want https://github.com/openzipkin/brave-webmvc-example |
I tried but I kept getting Since I'm not familiar with this project, I could not fix it. Let me create a Spring project with Spring Initializr. https://start.spring.io/ |
https://github.com/shiraji/brave-coroutine Here is the coroutine code with |
OK, thanks. So in this case you want the code in the GlobalScope.launch to have the same span ID, I guess? |
Yes. But since it runs on a different thread, the span ID is different. |
Some other related info on coroutines for whoever addresses this: square/retrofit#2886 For the specific use case, we need to figure out what hooks are possible for GlobalScope.launch, such that doing launch is the same as explicitly saving off currentContext.get() and re-applying it with currentContext.newScope before the work is run. |
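The explicit save-and-re-apply pattern described above can be sketched roughly as follows. This is a minimal sketch, not Brave's own guidance: it assumes Brave's `ThreadLocalCurrentTraceContext`, uses a plain single-thread executor as a stand-in for whatever thread a coroutine might resume on, and the trace and span IDs are made-up values. The `demo` function name is hypothetical.

```kotlin
import brave.propagation.CurrentTraceContext
import brave.propagation.ThreadLocalCurrentTraceContext
import brave.propagation.TraceContext
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Looks up the current trace context on another thread, first without and
// then with re-applying the context saved on the calling thread.
fun demo(): Pair<TraceContext?, TraceContext?> {
    val current: CurrentTraceContext = ThreadLocalCurrentTraceContext.create()
    // Hypothetical IDs, for illustration only.
    val context = TraceContext.newBuilder().traceId(1L).spanId(2L).build()
    val executor = Executors.newSingleThreadExecutor()
    try {
        return current.newScope(context).use {
            // Save off the context on the calling thread.
            val saved: TraceContext? = current.get()
            // Without re-applying, the worker thread sees no context.
            val plain = executor.submit(Callable<TraceContext?> { current.get() }).get()
            // Re-apply the saved context before the work runs, and close the scope after.
            val reapplied = executor.submit(Callable<TraceContext?> {
                current.newScope(saved).use { current.get() }
            }).get()
            Pair(plain, reapplied)
        }
    } finally {
        executor.shutdown()
    }
}

fun main() {
    val (plain, reapplied) = demo()
    println("without re-applying: $plain")
    println("with re-applying: $reapplied")
}
```

A scope opened this way has to be closed on the same thread that opened it. With coroutines that is not guaranteed across a suspension point, which is why the rest of the thread looks for a hook that runs on every resume and suspend.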
The cleanest way to handle this would be with ThreadContextElement. However, I'm currently unsure how to apply it to Brave, mainly how to close() the scope returned by newScope when implementing this interface. Further information is available at An implementation for MDC is available at |
I will try this today |
example which breaks on /async endpoint
@SpringBootApplication
@Controller
open class Demo @Autowired constructor(val ctx: CurrentTraceContext) {
    @GetMapping("/")
    fun sync(): ResponseEntity<String> {
        return ResponseEntity.ok(ctx.get().traceIdString())
    }

    @GetMapping("/async")
    fun async() = runBlocking {
        GlobalScope.async {
            ResponseEntity.ok(ctx.get().traceIdString())
        }.await()
    }
}

fun main(args: Array<String>) {
    runApplication<Demo>(*args)
} |
I've added the
  @GetMapping("/async")
  fun async() = runBlocking {
-     GlobalScope.async {
+     GlobalScope.async(TracingContextElement()) {
          ResponseEntity.ok(ctx.get().traceIdString())
      }.await()
  }

public typealias TraceContextState = TraceContext?

class TracingContextElement(
    public val contextState: TraceContextState = Tracing.current().currentTraceContext().get()
) : ThreadContextElement<TraceContextState>, AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<TracingContextElement>

    override fun updateThreadContext(context: CoroutineContext): TraceContextState {
        val oldState = Tracing.current().currentTraceContext().get()
        setCurrent(contextState)
        return oldState
    }

    /** @suppress */
    override fun restoreThreadContext(context: CoroutineContext, oldState: TraceContextState) {
        setCurrent(oldState)
    }

    /** @suppress */
    private fun setCurrent(contextState: TraceContextState) {
        if (contextState == null) {
            // TODO?
            // we should do something?
        } else {
            Tracing.current().currentTraceContext().newScope(contextState)
        }
    }
} |
@ackintosh thank you. I started thinking about this, too. The problem is that this assumes a thread local is behind the impl, which is almost always the case, but might not be. I'm not saying this isn't the end result, but I have been thinking about the problem more. Maybe there's a way to hook in more directly and cause less of an abstraction break (e.g. scope is the abstraction being broken, as an opened scope should always be closed). |
@ackintosh do you know some way to globally register TracingContextElement so people don't need to add this to their code explicitly? |
Hmm... I'm sorry I don't know that. 🤔 |
very nice link
|
I think this is the way to do it.. can you please verify?
import brave.propagation.CurrentTraceContext
import brave.propagation.CurrentTraceContext.Scope
import brave.propagation.TraceContext
import kotlinx.coroutines.ThreadContextElement
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

class TracingContextElement(
    val currentTraceContext: CurrentTraceContext,
    // Nullable: get() returns null when no trace is in progress.
    val initial: TraceContext? = currentTraceContext.get()
) : ThreadContextElement<Scope>, AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<TracingContextElement>

    // Called each time the coroutine resumes on a thread: apply the captured context.
    override fun updateThreadContext(context: CoroutineContext): Scope {
        return currentTraceContext.maybeScope(initial)
    }

    // Called when the coroutine suspends or completes: revert the thread's context.
    override fun restoreThreadContext(context: CoroutineContext, scope: Scope) {
        scope.close()
    }
} |
@adriancole Thank you! I've tested the Sample application
package io.github.ackintosh.sleuthtest.controller

import brave.Tracing
import brave.propagation.CurrentTraceContext
import io.github.ackintosh.sleuthtest.TracingContextElement
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Controller
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.reactive.function.client.WebClient

@Controller
class GreetingController(val builder: WebClient.Builder, val ctx: CurrentTraceContext) {
    val logger = LoggerFactory.getLogger(GreetingController::class.java)
    val client = builder.build()

    fun sendRequest() {
        logger.info("------> WebClient#get")
        client.get()
            .uri("http://127.0.0.1:9999/")
            .retrieve()
            .bodyToMono(String::class.java)
            .subscribe { logger.info("(response)") }
        logger.info("<------ WebClient#get")
    }

    @GetMapping("/hello")
    fun hello() = runBlocking {
        logger.info("<Start>")
        val deferred1 = GlobalScope.async(TracingContextElement(Tracing.current().currentTraceContext())) {
            logger.info("---> sendRequest (1)")
            sendRequest()
            logger.info("<--- sendRequest (1)")
        }
        val deferred2 = GlobalScope.async(TracingContextElement(Tracing.current().currentTraceContext())) {
            logger.info("---> sendRequest (2)")
            sendRequest()
            logger.info("<--- sendRequest (2)")
        }
        deferred1.await()
        deferred2.await()
        logger.info("<End>")
    }
}
Logs
|
@ackintosh can you use a different http client? I want to make sure there's no bug in the |
ps use |
(Updated) @adriancole I've checked again with Fuel.
Diffs from the sample application
  fun sendRequest() {
      logger.info("------> WebClient#get")
+     // client.get()
+     //     .uri("http://127.0.0.1:9999/")
+     //     .retrieve()
+     //     .bodyToMono(String::class.java)
+     //     .subscribe { logger.info("(response)") }
+     Fuel.get("http://127.0.0.1:9999/").responseString { request, response, result ->
+         when (result) {
+             is Result.Failure -> {
+                 val ex = result.getException()
+                 logger.info("(response)")
+             }
+             is Result.Success -> {
+                 val data = result.get()
+                 logger.info("(response)")
+             }
+         }
+     }
      logger.info("<------ WebClient#get")
  }
(Used Logs
The last two are weird: both the trace ID and the span ID are missing. 🤔
|
Arf, sorry the test above was insufficient... I'll update the comment. |
Updated #820 (comment) |
Could possibly be a bug. Worth trying to isolate into a non-Kotlin test in Sleuth. I will be offline after tomorrow for holiday, FYI. This feature can go into Brave after tests etc., but people can paste for now as it is small code. We have people pasting Hystrix for the same reason. |
Yeah, I'll paste the code to avoid the issue for now. 📝 Enjoy your holiday. 😉 |
FYI We are already using other
withContext(context) {
    foo()
}
To enable brave with
withContext(context + TracingContextElement(Tracing.current().currentTraceContext())) {
    foo()
}
This works perfectly. Thanks guys! |
@shiraji question: do you need to reference like |
We don't have handy reference to the context object. |
Is there any update on this/a preferred solution? |
Last I heard, #820 (comment) works for @shiraji, and the result is unknown for @ackintosh, as it is not clear whether the concern there was the instrumentation or the context impl. @TobiasBales can you try to be a tie breaker? Paste the code and try it; it isn't much code. Anyway, the next step after there's a clear yes/no on this impl would be to add a directory for context-kotlin with tests. |
One problem with the above solution is the following:
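When the coroutine resumes, rather than applying the context of the scoped span that was created in (2), it will apply the original context. The following appears to pass my unit tests:
import brave.Tracing
import brave.propagation.CurrentTraceContext
import brave.propagation.CurrentTraceContext.Scope
import brave.propagation.TraceContext
import kotlinx.coroutines.ThreadContextElement
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

typealias TraceContextHolder = CurrentTraceContext
typealias TraceContextState = TraceContext?

class TracingContextElement(
    private val traceContextHolder: TraceContextHolder = Tracing.current().currentTraceContext(),
    private var context: TraceContextState = traceContextHolder.get()
) : ThreadContextElement<Scope>, AbstractCoroutineContextElement(Key) {
    // The key type must match this class so the element can be looked up in a CoroutineContext.
    companion object Key : CoroutineContext.Key<TracingContextElement>

    // On resume: apply the most recently recorded context to the current thread.
    override fun updateThreadContext(context: CoroutineContext): Scope {
        return traceContextHolder.maybeScope(this.context)
    }

    // On suspend: remember any context change made by the coroutine, then revert the thread.
    override fun restoreThreadContext(context: CoroutineContext, oldState: Scope) {
        // check to see if, during coroutine execution, the context was updated
        traceContextHolder.get()?.let {
            if (it != this.context) {
                this.context = it
            }
        }
        oldState.close()
    }
} |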
I don't know if this applies to Kotlin, but just in case - in Armeria we use a custom implementation of I would imagine something similar could happen for Kotlin, where a |
My guess is we need to focus on this again, as people have a lot more experience with kotlin since when this was first opened. Also docs seem good on integration options. https://kotlinlang.org/docs/reference/coroutines/coroutine-context-and-dispatchers.html#thread-local-data |
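For readers who have not followed the link, the mechanism that page describes can be sketched as below. This is only an illustration of `asContextElement` from kotlinx.coroutines, assuming a plain `ThreadLocal` that stands in for trace state; the `traceId` variable and `observe` function are hypothetical names and nothing here touches Brave.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asContextElement
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical thread local standing in for trace state.
val traceId = ThreadLocal<String?>()

// Records the thread-local value seen inside and outside a child coroutine.
fun observe(): List<String?> = runBlocking {
    traceId.set("main")
    val seen = mutableListOf<String?>()
    // The element sets the value on every resume and restores the old one on suspend.
    val job = launch(Dispatchers.Default + traceId.asContextElement(value = "child")) {
        seen += traceId.get() // value before suspension
        yield()               // may resume on a different worker thread
        seen += traceId.get() // value after resumption
    }
    job.join()
    seen += traceId.get()     // the calling thread keeps its own value
    traceId.remove()
    seen
}

fun main() {
    println(observe())
}
```

As noted earlier in the thread, a thread local is not necessarily behind a `CurrentTraceContext` implementation, which is why the snippets above wrap `maybeScope` and `close()` in a custom `ThreadContextElement` rather than using `asContextElement` directly.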
I played around with this and pinged on gitter.im, which triggered @adriancole's post above. After trying the code above, I found that I did not really need it to propagate traces correctly. However, I needed to add MDCContext to propagate my own MDC field, User, correctly. When I log as part of a Reactor chain, that is, in a WebClient call before I return await, it does not log my MDC context, but it does log the correct traces and spans from Brave. My code is here: https://github.com/bjartek/webflux-playground In order to test start up the application and run Note that I use a custom logback.xml, so my log output is not the same as the above mentions. An example with the standard logback format is: (irrelevant lines removed)
|
For the record I just tried to remove spring-cloud-sleuth from my example and the pattern is still the same for me then.
|
My example https://github.com/bjartek/webflux-playground now works the way I want it. The comment above is resolved in the latest commit. |
@bjartek thanks again for keeping us posted with your research! |
Hi @adriancole : ) . It works perfectly for me when I use the However it doesn't work correctly when I use Anyway, thanks a lot for the snippet. If you want me to do some tests, do not hesitate. Any chance this would be merged into the main code base soon? |
@wlezzar if you are up to it, make a pull request for a new module in the context directory. There will be some other tasks, like a polished README etc, but I'm sure everyone here will ❤️ you for it! |
I would love to contribute. I will try to find the time for it : ) |
I tried the shown ServerInterceptor:
The trace and the span are empty in log statements in the gRPC service. Nothing changes if I replace |
Things suggested in @codefromthecrypt's comment worked for me. @asarkar not sure what you are trying to do with gRPC. I use the LogNet/grpc-spring-boot-starter impl. For the server extending grpc-kotlin coroutines, I register a bean using a
@Bean
@GRpcGlobalInterceptor
fun coroutineTraceInterceptor(): ServerInterceptor {
    return object : CoroutineContextServerInterceptor() {
        override fun coroutineContext(
            call: ServerCall<*, *>,
            headers: Metadata,
        ): CoroutineContext {
            return TracingContextElement(Tracing.current().currentTraceContext())
        }
    }
}
For the client, I use the |
@jiraguha Happy to hear that it worked for you, although that probably means you've a use case that doesn't do one or all of the following:
Unless all of the above cases work in tandem, it can't be said official support for Kotlin gRPC has arrived. |
@asarkar, your supposition is correct: I decided to use coroutines within a narrow scope and interface them with project-reactor by default. I favor the simplicity of functional code.
mono(TracingContextElement(Tracing.current().currentTraceContext())) {
    // calling my grpc coroutine client
}
With this and my snippet above, the trace propagates correctly...
Not sure that all those points are related to this current issue only. Maybe you can create a small project with all the problems; I'm not sure I will have the answers, but it will certainly be interesting. |
@jiraguha openzipkin/zipkin-support#40 (comment) said I interrupted someone's breakfast by reporting the issue. Since the beauty of OSS is that there is always more than one player in the game, I decided to spend my time working with OpenTelemetry, and have no interest in creating a project to try out Brave. I responded to your comment because you mentioned me as if you'd solved the issue I had reported, which you didn't, since your use case is very different and most likely simpler. |
For anybody who is using Spring Boot / Micrometer Tracing (with a bridge to Brave) the topic of traceId propagation in coroutines is also being discussed here: micrometer-metrics/tracing#174 Micrometer basically provides an implementation of |
At LINE, Xu JunJian noted that it may be tricky to propagate async contexts through coroutines. We should investigate this.
https://kotlinlang.org/docs/reference/coroutines-overview.html