
Add support for @KafkaListener annotations #1001

Closed
danielsouza85 opened this issue Jun 5, 2018 · 18 comments


@danielsouza85

danielsouza85 commented Jun 5, 2018

Hello, I'm new to Kafka and Sleuth, and I'm having problems tracing a producer and a consumer over Kafka.

I have one microservice that puts messages onto a Kafka topic, and the same microservice has a consumer listening on that topic.

The first part works just fine, but when the listener consumes the message, the log shows no traceId or spanId. Everything is empty.

I've enabled debug logging for Sleuth. On the producer side I can see lots of debug messages, but on the consumer side the only thing shown is the log line itself, without any trace information.

Here is the consumer code:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;


@Service
public class Receiver {
    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);
    
    @Value("${app.topic.topic02}")
    private String topic;

    
    @KafkaListener(topics = "${app.topic.topic02}")
    public void listen(@Payload String message, @Headers MessageHeaders messageHeaders) {
        LOG.info("received message = {} to topic = {}", message, topic);
    }

}

And the Logs with debug enabled:

----> Producer Part

2018-06-05 17:13:22.561 DEBUG [apollo11-referencemicroservice,,,] 15046 --- [tp2067180044-14] o.s.c.sleuth.instrument.web.TraceFilter  : Received a request to uri [/sendMessageToKafka] that should not be sampled [false]
2018-06-05 17:13:22.562 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [tp2067180044-14] o.s.c.sleuth.instrument.web.TraceFilter  : No parent span present - creating a new span
2018-06-05 17:13:22.564 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [tp2067180044-14] o.s.c.s.i.web.TraceHandlerInterceptor    : Handling span [Trace: 3071bc3b73453c71, Span: 3071bc3b73453c71, Parent: null, exportable:true]
2018-06-05 17:13:22.564 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [tp2067180044-14] o.s.c.s.i.web.TraceHandlerInterceptor    : Adding a method tag with value [sendMessageToKafka] to a span [Trace: 3071bc3b73453c71, Span: 3071bc3b73453c71, Parent: null, exportable:true]
2018-06-05 17:13:22.564 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [tp2067180044-14] o.s.c.s.i.web.TraceHandlerInterceptor    : Adding a class tag with value [Application] to a span [Trace: 3071bc3b73453c71, Span: 3071bc3b73453c71, Parent: null, exportable:true]
2018-06-05 17:13:22.567 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [-KafkaService-2] ConcurrencyStrategy$HystrixTraceCallable : Continuing span [Trace: 3071bc3b73453c71, Span: 3071bc3b73453c71, Parent: null, exportable:true]
2018-06-05 17:13:22.568  INFO [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [-KafkaService-2] b.com.b3.apollo11.kafka.producer.Sender  : Trying to send message to Kafka:
{"prazo":"LONGO","objetivo":"APTO","perfilInvestidor":"SABER_VALOR_FINAL","formaRemuneracao":"FINAL_INVESTIMENRO","uuid":"123e4567-e89b-12d3-a456-426655440000"}
2018-06-05 17:13:22.568  INFO [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [-KafkaService-2] b.com.b3.apollo11.kafka.producer.Sender  : sending message = {"prazo":"LONGO","objetivo":"APTO","perfilInvestidor":"SABER_VALOR_FINAL","formaRemuneracao":"FINAL_INVESTIMENRO","uuid":"123e4567-e89b-12d3-a456-426655440000"} to topic = operationA
2018-06-05 17:13:22.569 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [-KafkaService-2] ConcurrencyStrategy$HystrixTraceCallable : Detaching span since it was continued [Trace: 3071bc3b73453c71, Span: 3071bc3b73453c71, Parent: null, exportable:true]
2018-06-05 17:13:22.571 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [tp2067180044-14] o.s.c.sleuth.instrument.web.TraceFilter  : Closing the span [Trace: 3071bc3b73453c71, Span: 3071bc3b73453c71, Parent: null, exportable:true] since the response was successful
2018-06-05 17:13:22.572 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [tp2067180044-14] o.s.c.s.zipkin2.DefaultEndpointLocator   : Span will contain serviceName [apollo11-referencemicroservice]

----> Consumer Part

2018-06-05 17:13:22.575  INFO [apollo11-referencemicroservice,,,] 15046 --- [ntainer#0-0-C-1] b.c.b3.apollo11.kafka.consumer.Receiver  : received message = {"prazo":"LONGO","objetivo":"APTO","perfilInvestidor":"SABER_VALOR_FINAL","formaRemuneracao":"FINAL_INVESTIMENRO","uuid":"123e4567-e89b-12d3-a456-426655440000"} to topic = operationA
@marcingrzejszczak
Contributor

I don't recall us supporting @KafkaListener. If you check out what Brave offers - https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients - it works fine when you manually create the listening component. If you use Spring Integration or Spring Cloud Stream, we will automatically continue the trace. I will leave this issue open as an enhancement for future releases of Sleuth to add automated support for @KafkaListener.
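To make that concrete, here is a rough sketch of the manual Brave kafka-clients instrumentation mentioned above. The class is hypothetical, the `tracing` argument is Brave's `Tracing` instance (which Sleuth already exposes as a bean), and the broker name is a placeholder:

```java
import brave.Span;
import brave.Tracing;
import brave.kafka.clients.KafkaTracing;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class TracedReceiver {

    private final KafkaTracing kafkaTracing;

    public TracedReceiver(Tracing tracing) {
        // Wrap the Sleuth-provided Tracing instance once
        this.kafkaTracing = KafkaTracing.newBuilder(tracing)
                .remoteServiceName("my-broker") // placeholder name
                .build();
    }

    public void pollOnce(Consumer<String, String> rawConsumer) {
        // The wrapped consumer extracts trace headers during poll()
        Consumer<String, String> consumer = kafkaTracing.consumer(rawConsumer);
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            // Continue the extracted trace while processing the record
            Span span = kafkaTracing.nextSpan(record).name("process").start();
            try {
                // ... handle record.value() ...
            } finally {
                span.finish();
            }
        }
    }
}
```

The key point is that the consumer is created manually and wrapped, rather than relying on @KafkaListener to do it.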

@danielsouza85
Author

So, I have to implement my own consumer using Brave, is that right?
As I said, I'm new to Kafka and Spring.
Do you have a "for dummies" example of how to do that?

@marcingrzejszczak
Contributor

The easiest approach is to use Spring Cloud Stream https://cloud.spring.io/spring-cloud-stream/

@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@StreamListener(Sink.INPUT)
	public void handle(Person person) {
		System.out.println("Received: " + person);
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

add the spring-cloud-stream-kafka-binder dependency, point it at the proper destination, and that's it. There are plenty of samples here https://github.com/spring-cloud/spring-cloud-stream-samples and on the internet.
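As a rough illustration (destination, group, and broker values below are placeholders for your own), the matching binder configuration could look like:

```properties
# Illustrative Spring Cloud Stream binding for the Sink.INPUT channel
spring.cloud.stream.bindings.input.destination=my-topic
spring.cloud.stream.bindings.input.group=logging-consumer
spring.cloud.stream.kafka.binder.brokers=localhost:9092
```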

@marcingrzejszczak marcingrzejszczak changed the title Sleuth does not propagate span over kafka Add support for @KafkaListener annotations Jun 5, 2018
@danielsouza85
Author

Thank you for the help.
Do you have a simple example of how to use KafkaTracing for a producer and a consumer?
I'm searching for something on the internet, but I haven't found anything I can understand.

@danielsouza85
Author

One more question.
At this part:

kafkaTracing = KafkaTracing.newBuilder(tracing)
.remoteServiceName("my-broker")
.build();

What is this tracing variable? I'm currently using Sleuth to manage the trace and span IDs.
Do I have to implement my own trace, or is there a way to get the trace that is already being used?

@marcingrzejszczak
Contributor

We're already instrumenting that for you. You don't have to create this bean.

We describe in the documentation how we use Brave (https://cloud.spring.io/spring-cloud-static/Finchley.RC2/single/spring-cloud.html#_introduction_to_brave). You can read about what Tracing is in Brave's javadocs https://github.com/openzipkin/brave/blob/master/brave/src/main/java/brave/Tracing.java#L27 . It's the core bean that contains references to all the other tracing components.

My own trace or is there a way to get the trace that is being used?

Please read the documentation. If you check the latest docs and search for what you've asked, you will find this section https://cloud.spring.io/spring-cloud-sleuth/single/spring-cloud-sleuth.html#_current_span
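In practice that section boils down to injecting the Sleuth-managed tracer instead of building your own; a minimal sketch (class name is illustrative):

```java
import brave.Span;
import brave.Tracer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class CurrentTraceLogger {

    // Sleuth creates and manages this bean; never build your own Tracing
    @Autowired
    private Tracer tracer;

    public String currentTraceId() {
        Span span = tracer.currentSpan();
        // May be null when called outside any traced operation
        return span == null ? null : span.context().traceIdString();
    }
}
```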

@danielsouza85
Author

For example, I have code like this:

KafkaProducer<String,String> kafkaProducer;
kafkaProducer.send(new ProducerRecord<>(topic, partitions[0], null, message));

If I want to send trace information through Kafka to a consumer, is this what I have to do?

KafkaTracing kafkaTracing = KafkaTracing.newBuilder(tracing).remoteServiceName("my-broker").build();
kafkaTracing.producer(kafkaProducer).send(new ProducerRecord<>(topic, partitions[0], null, message));

If that's correct, how can I get the current tracing context?

@marcingrzejszczak
Contributor

No, that's completely incorrect.

I wrote to you

We're already instrumenting that for you. You don't have to create this bean.

And you have created the object yourself. Just use the kafkaProducer normally and that's it.

If that's correct, how can I get the current tracing context?

I don't understand this question. If you want the current tracer, just autowire it. I think we describe this in the documentation - can you please read it?

@danielsouza85
Author

Sorry for asking so many questions, but I've been stuck on this for some time, and I'm still in the dark.

I read that these traces can only be propagated with Kafka versions >= 0.11.0. Is that true?
I'm using version 0.10.1.0. Could this be the problem?

@marcingrzejszczak
Contributor

I have no idea what you're doing, but it might be somewhat related to #1005 . With Finchley, the Spring Cloud Stream Kafka binder uses 0.11 and automatically propagates the headers. From what I know, in Kafka 0.10 there was no notion of headers as such. cc @artembilan

@artembilan
Member

That's correct. You need to upgrade to Apache Kafka >= 0.11; that is the version where header support started. Spring Kafka will then propagate them all for you automatically, so these tracing headers will go from the producer over Kafka to the consumer transparently.

I'm afraid we can do nothing for you on that old Kafka version.

See more info in the Spring Kafka Reference Manual: https://docs.spring.io/spring-kafka/docs/2.1.6.RELEASE/reference/html/_reference.html#headers
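For context, record headers are a plain kafka-clients (>= 0.11) feature; here is a small sketch of setting and reading one. The header name and value are made up for illustration and are unrelated to Sleuth's actual header names:

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

public class HeadersDemo {
    public static void main(String[] args) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("operationA", "some-payload");
        // Headers ride alongside the payload; this is what trace
        // propagation relies on, and it does not exist before 0.11
        record.headers().add("demo-trace-id",
                "3071bc3b73453c71".getBytes(StandardCharsets.UTF_8));
        for (Header h : record.headers()) {
            System.out.println(h.key() + " = "
                    + new String(h.value(), StandardCharsets.UTF_8));
        }
    }
}
```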

@danielsouza85
Author

@artembilan and @marcingrzejszczak, thanks for the information.

So, I've updated my Kafka to version 2.12-1.1.0. Now it's ready for headers.

As suggested by @marcingrzejszczak, I've rewritten the producer and the consumer, but I still can't see the traceId being propagated through Kafka.

In my pom, I have the following:

<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>1.5.12.RELEASE</version>
</parent>
<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-dependencies</artifactId>
			<version>Edgware.SR3</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>

<dependencies>
	<dependency>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-starter-zipkin</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
		<version>1.3.5.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-starter-stream-kafka</artifactId>
	</dependency>
</dependencies>

The application.properties is:

# Microservice configuration
server.port=8080
spring.application.name = apollo11-referencemicroservice

# Persistence service configuration
spring.service.persistence.host=cldnp00604d.internalenv.corp
spring.service.persistence.port=8586

# MongoDB integration configuration
spring.data.mongodb.host=cldnp00604d.internalenv.corp
spring.data.mongodb.port=27017
spring.data.mongodb.database=contract
spring.data.mongodb.username=apollo11
spring.data.mongodb.password=apollo11pwd

# Kafka integration configuration
app.topic.topic01=zipkin
app.topic.topic02=operationA
app.consumer.partitions=0
app.sender.partitions=0

# Sleuth + Zipkin over Kafka configuration
spring.sleuth.sampler.percentage=1.0
spring.zipkin.sender.type=kafka
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.kafka.binder.brokers=HOST:9092
spring.cloud.stream.kafka.binder.zkNodes=HOST:2181
spring.cloud.stream.kafka.binder.headers=spanTraceId,spanId,spanParrentSpanId,spanProcessId

spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.output.destination=operationA
spring.cloud.stream.bindings.input.destination=operationA

The producer code is shown below:

@EnableBinding(Source.class)
@RestController
@SpringBootApplication
public class Application {

    private static final Logger LOG = LoggerFactory.getLogger(Application.class);
    
    @Bean
    public RestTemplate rest(RestTemplateBuilder builder) {
        return builder.build();
    }

    @RequestMapping(value = "/sendKafkaComTrace", method = RequestMethod.POST)
    public void sendKafkaComTrace(@RequestBody String message) {
        LOG.info("-------------------Entrando no sendKafkaComTrace");
        // send message to channel
        Message<String> msg = MessageBuilder.withPayload(message).setHeader("TESTEHEADER", "TESTE").build();
        LOG.info("---------- Messagem a Enviar = {}" + msg.toString());

        mysource.output().send(msg);
    }

    public static void main(String[] args) {
        SpringApplication.run (Application.class, args);
    }
}

And the consumer:

@EnableBinding(Sink.class)
@SpringBootApplication
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @Value("${app.topic.topic02}")
    private String topic;

    @Value("${app.consumer.partitions}")
    private String[] partitions;

    public static void main(String[] args) {
        SpringApplication.run(Receiver.class, args);
    }

    @StreamListener(target = Sink.INPUT)
    public void logfast(GenericMessage msg) {
        LOG.info("++++++++++ Header:" + msg.getHeaders().toString());
        LOG.info("++++++++++ Received message = {}" + msg.toString());
        System.out.println(msg);
    }
}

Executing the code, I can see the following logs:

Producer:

2018-06-19 13:35:15.160  INFO [apollo11-referencemicroservice,5072403c0b9773cc,5072403c0b9773cc,true] 13436 --- [tp1830745997-17] br.com.b3.apollo11.Application           : -------------------Entrando no sendKafkaComTrace
2018-06-19 13:35:15.160  INFO [apollo11-referencemicroservice,5072403c0b9773cc,5072403c0b9773cc,true] 13436 --- [tp1830745997-17] br.com.b3.apollo11.Application           : ---------- Messagem a Enviar = {}GenericMessage [payload={"prazo":"LONGO","objetivo":"ESTUDO","perfilInvestidor":"SABER_VALOR_FINAL","formaRemuneracao":"FINAL_INVESTIMENRO","uuid":"123e4567-e89b-12d3-a456-426655440000"}, headers={TESTEHEADER=TESTE, id=1e3ae519-591f-35f2-00ed-aa0509b72d2d, timestamp=1529426115160}]

Consumer:

2018-06-19 13:35:15.170  INFO [apollo11-referencemicroservice,d5e9832e7ffaab3e,d5e9832e7ffaab3e,true] 13436 --- [ consumer-0-C-1] b.c.b.a.p.m.kafka.consumer.Receiver      : ++++++++++ Header:{kafka_timestampType=CREATE_TIME, messageSent=true, kafka_receivedMessageKey=null, kafka_receivedTopic=operationA, spanName=message:input, spanTraceId=d5e9832e7ffaab3e, spanId=d5e9832e7ffaab3e, nativeHeaders={spanTraceId=[d5e9832e7ffaab3e], spanId=[d5e9832e7ffaab3e], spanName=[message:input], spanSampled=[1]}, kafka_offset=196, id=d95da8cb-9fba-40fe-5088-049e5c766795, kafka_receivedPartitionId=0, spanSampled=1, kafka_receivedTimestamp=1529426115163, contentType=application/json}
2018-06-19 13:35:15.170  INFO [apollo11-referencemicroservice,d5e9832e7ffaab3e,d5e9832e7ffaab3e,true] 13436 --- [ consumer-0-C-1] b.c.b.a.p.m.kafka.consumer.Receiver      : ++++++++++ Received message = {}GenericMessage [payload=byte[162], headers={kafka_timestampType=CREATE_TIME, messageSent=true, kafka_receivedMessageKey=null, kafka_receivedTopic=operationA, spanName=message:input, spanTraceId=d5e9832e7ffaab3e, spanId=d5e9832e7ffaab3e, nativeHeaders={spanTraceId=[d5e9832e7ffaab3e], spanId=[d5e9832e7ffaab3e], spanName=[message:input], spanSampled=[1]}, kafka_offset=196, id=d95da8cb-9fba-40fe-5088-049e5c766795, kafka_receivedPartitionId=0, spanSampled=1, kafka_receivedTimestamp=1529426115163, contentType=application/json}]
GenericMessage [payload=byte[162], headers={kafka_timestampType=CREATE_TIME, messageSent=true, kafka_receivedMessageKey=null, kafka_receivedTopic=operationA, spanName=message:input, spanTraceId=d5e9832e7ffaab3e, spanId=d5e9832e7ffaab3e, nativeHeaders={spanTraceId=[d5e9832e7ffaab3e], spanId=[d5e9832e7ffaab3e], spanName=[message:input], spanSampled=[1]}, kafka_offset=196, id=d95da8cb-9fba-40fe-5088-049e5c766795, kafka_receivedPartitionId=0, spanSampled=1, kafka_receivedTimestamp=1529426115163, contentType=application/json}]

So, we can see that the producer and consumer trace IDs are different. And on the consumer side, I can't see anything in the headers, not even the custom header I inserted manually at the producer.
Are there any problems with the spring-kafka, spring-boot, and spring-cloud versions I'm using?

@artembilan
Member

Not sure what your 2.12-1.1.0 is, but it doesn't look like you override anything about Kafka in your POM. Here is the procedure for doing that: https://docs.spring.io/spring-kafka/docs/2.1.6.RELEASE/reference/html/deps-for-11x.html

Note that we rely there on kafka_2.11; there is nothing about 2.12.

Also, you need to be sure that your Kafka broker really is the newer version, not < 0.11. It is not enough to upgrade the dependencies if the target broker is not on the proper version.

Also see the compatibility matrix: https://spring.io/projects/spring-kafka#overview. Spring Kafka 1.3.x is not compatible with Apache Kafka 1.1.x.

I know this is tough, but there is no choice with Kafka if you would like more features.
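For illustration only (the property name and version are assumptions; follow the linked procedure for the authoritative set of overrides), overriding the managed Kafka client version in the POM takes roughly this shape:

```xml
<properties>
    <!-- assumed property name; verify against the linked guide
         for your Spring Boot / Spring Kafka versions -->
    <kafka.version>0.11.0.2</kafka.version>
</properties>
```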

@marcingrzejszczak
Contributor

I'm sorry, but this is a very simple feature that has been working since day one. You have to simplify your example and post it on GitHub after following the guidelines from @artembilan . E.g. your snippets don't even compile (where did mysource come from?). We won't be able to help you otherwise.

@fsopjani

fsopjani commented Jun 26, 2018

@danielsouza85 Remove the sensitive info from your application.properties - you've got hosts, passwords, everything in the configuration properties exposed to the public unnecessarily.

@timtebeek
Contributor

timtebeek commented Aug 17, 2018

Hi! It seems this issue went off course with unrelated problems for a moment; coming back on topic, I would also like to use plain @KafkaListener annotations while propagating trace/span data from my producers. This issue seems to indicate that's not (yet) possible, but in my sample project the tests seem to indicate it works: https://github.com/timtebeek/sleuth-kafkalistener

Now I'm puzzled; can anyone either confirm this works, or point out the flaw in my example project? If there's documentation to be updated, that would also be helpful, although I wouldn't really know what to add or where. Any help appreciated. :)

@marcingrzejszczak
Contributor

It just works #1088
