
Cannot unmarshal Kafka record received #16078

Closed
RobertFloor opened this issue Nov 4, 2022 · 7 comments
Labels
bug Something isn't working needs triage New item requiring triage receiver/kafka

Comments


RobertFloor commented Nov 4, 2022

Component(s)

receiver/kafka

What happened?

We are attempting to read our organization's Confluent Kafka audit topic with the OpenTelemetry Collector. The topic contains audit events that we would like to ingest into the Collector and forward to Splunk. We can connect to Confluent Cloud, but we get a format error. I have tried all of the suggested encoders, but somehow we cannot unmarshal the messages correctly. Do you have an idea how to decode the messages from the audit topic?

Collector version

0.63.1

Environment information

Environment

Docker container (https://hub.docker.com/layers/otel/opentelemetry-collector/latest/images/sha256-5d5669c53025a0a81d725e36e6c60e8ebf502ddc5259a3036ed8f5f8067f4713?context=explore)

OpenTelemetry Collector configuration

extensions:
  health_check:
  pprof:
    endpoint: 0.0.0.0:1777
  zpages:
    endpoint: 0.0.0.0:55679

receivers:
  otlp:
    protocols:
      grpc:
      http:

  kafka:
    brokers:
    - 'xxxxxxx.aws.confluent.cloud:9092'
    topic: 'confluent-audit-log-events'
    protocol_version: 2.6.0
    encoding: zipkin_raw
    auth: 
      sasl:
        username: "xxxxx"
        password: "xxxxxxxxxx"
        mechanism: PLAIN
      tls:
        insecure: false
        insecure_skip_verify: true

  opencensus:

  # Collect own metrics
  prometheus:
    config:
      scrape_configs:
      - job_name: 'otel-collector'
        scrape_interval: 10s
        static_configs:
        - targets: ['0.0.0.0:8888']

  jaeger:
    protocols:
      grpc:
      thrift_binary:
      thrift_compact:
      thrift_http:

  zipkin:

processors:
  batch:

exporters:
  logging:
    logLevel: debug

service:

  pipelines:

    traces:
      receivers: [otlp, opencensus, jaeger, zipkin, kafka]
      processors: [batch]
      exporters: [logging]

    metrics:
      receivers: [otlp, opencensus, prometheus]
      processors: [batch]
      exporters: [logging]

  extensions: [health_check, pprof, zpages]

Log output

otel-collector_1  | 2022-11-04T11:36:36.150Z    error   kafkareceiver@v0.63.0/kafka_receiver.go:419     failed to unmarshal message     {"kind": "receiver", "name": "kafka", "pipeline": "traces", "error": "proto: illegal wireType 6"}
otel-collector_1  | github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver.(*tracesConsumerGroupHandler).ConsumeClaim
otel-collector_1  |     github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver@v0.63.0/kafka_receiver.go:419
otel-collector_1  | github.com/Shopify/sarama.(*consumerGroupSession).consume
otel-collector_1  |     github.com/Shopify/sarama@v1.37.2/consumer_group.go:868
otel-collector_1  | github.com/Shopify/sarama.newConsumerGroupSession.func2
otel-collector_1  |     github.com/Shopify/sarama@v1.37.2/consumer_group.go:793
otel-collector_1  | 2022-11-04T11:36:36.161Z    error   kafkareceiver@v0.63.0/kafka_receiver.go:419     failed to unmarshal message     {"kind": "receiver", "name": "kafka", "pipeline": "traces", "error": "proto: illegal wireType 6"}
otel-collector_1  | github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver.(*tracesConsumerGroupHandler).ConsumeClaim
otel-collector_1  |     github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver@v0.63.0/kafka_receiver.go:419
otel-collector_1  | github.com/Shopify/sarama.(*consumerGroupSession).consume
otel-collector_1  |     github.com/Shopify/sarama@v1.37.2/consumer_group.go:868
otel-collector_1  | github.com/Shopify/sarama.newConsumerGroupSession.func2
otel-collector_1  |     github.com/Shopify/sarama@v1.37.2/consumer_group.go:793
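A note on the error itself: `proto: illegal wireType 6` is a strong hint that the payload is not protobuf at all. A protobuf decoder reads each tag byte as `(field_number << 3) | wire_type`, and only wire types 0-5 are valid, so arbitrary JSON/ASCII text quickly produces an invalid tag. A minimal sketch (plain Python, no protobuf library needed) of how an ordinary ASCII byte decodes to the illegal wire type 6:

```python
# A protobuf tag byte encodes (field_number << 3) | wire_type.
# Valid wire types are 0-5; anything else aborts decoding with
# an "illegal wireType" error like the one in the log above.
def decode_tag(byte: int) -> tuple[int, int]:
    """Split a single tag byte into (field_number, wire_type)."""
    return byte >> 3, byte & 0x07

# Feeding JSON text to a protobuf parser means ASCII bytes get
# read as tags, and many of them carry an invalid wire type:
field, wtype = decode_tag(ord("n"))  # 'n' = 0x6E
print(field, wtype)  # -> 13 6  (wire type 6 is illegal)
```

This is why every encoder fails the same way: the record is JSON text, not a serialized trace payload.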

Additional context

This is an example record from Confluent Kafka which we would like to receive from a Kafka topic:

See comment below

@RobertFloor RobertFloor added bug Something isn't working needs triage New item requiring triage labels Nov 4, 2022

github-actions bot commented Nov 4, 2022

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@MovieStoreGuy

Hey @RobertFloor,

Just taking a look at this now,

We are attempting to read our organization's Confluent Kafka audit topic with the OpenTelemetry Collector. The topic contains audit events that we would like to ingest into the Collector.

So receiver/kafka is intended to receive OpenTelemetry events in a given set of encodings; I suspect these audit events don't match any of the expected encodings.

Are there any docs you could provide regarding the audit logs, just so I can understand what is needed?


RobertFloor commented Nov 7, 2022

Hi, thanks for your reply. This URL provides an example entry:

{
    "id": "889bdcd9-a378-4bfe-8860-180ef8efd208",
    "source": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q",
    "specversion": "1.0",
    "type": "io.confluent.kafka.server/authorization",
    "time": "2019-10-24T16:15:48.355Z",
    "datacontenttype": "application/json",
    "subject": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic",
    "confluentRouting": {
        "route": "confluent-audit-log-events"
    },
    "data": {
        "serviceName": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q",
        "methodName": "kafka.CreateTopics",
        "resourceName": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic",
        "authenticationInfo": {
            "principal": "User:resourceOwner1"
        },
        "authorizationInfo": {
            "granted": true,
            "operation": "Create",
            "resourceType": "Topic",
            "resourceName": "app3-topic",
            "patternType": "LITERAL",
            "rbacAuthorization": {
                "role": "ResourceOwner",
                "scope": {
                    "outerScope": [],
                    "clusters": {
                        "kafka-cluster": "j94C72q3Qpym0MJ9McEufQ"
                    }
                }
            }
        }
    }
}

However, the real Kafka message we see when reading from the topic is as follows:

% Headers: [content-type="application/cloudevents+json; charset=UTF-8"]
{
  "datacontenttype": "application/json",
  "data": {
    "serviceName": "crn://confluent.cloud/",
    "methodName": "mds.Authorize",
    "resourceName": "crn://confluent.cloud/organization=xxxxxx/environment=env-xxxxx/cloud-cluster=lkc-xxxxx/kafka=lkc-xxxxx",
    "authenticationInfo": {
      "principal": "User:sa-xxxx"
    },
    "authorizationInfo": {
      "granted": true,
      "operation": "AccessWithToken",
      "resourceType": "Cluster",
      "resourceName": "kafka-cluster",
      "patternType": "LITERAL",
      "rbacAuthorization": {
        "role": "EnvironmentMetricsViewer",
        "scope": {
          "outerScope": [
            "organization=xxxxxxx",
            "environment=env-xxxxx"
          ]
        }
      }
    },
    "request": {
      "correlation_id": "-1"
    },
    "requestMetadata": {}
  },
  "subject": "crn://confluent.cloud/organization=xxxxxx/environment=env-xxx/cloud-cluster=lkc-xxx/kafka=lkc-xxxx",
  "specversion": "1.0",
  "id": "xxxxxxxxx",
  "source": "crn://confluent.cloud/",
  "time": "2022-10-25T22:30:23.939Z",
  "type": "io.confluent.kafka.server/authorization"
}
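For reference, the record above is a CloudEvents v1.0 JSON envelope, so once the raw bytes are consumed it can be parsed with any JSON library. A minimal sketch (abbreviated record, field names taken from the example above; the exact mapping to Splunk attributes is an assumption):

```python
import json

# Abbreviated version of the audit record shown above (values shortened).
raw = b'''{
  "specversion": "1.0",
  "type": "io.confluent.kafka.server/authorization",
  "time": "2022-10-25T22:30:23.939Z",
  "data": {
    "methodName": "mds.Authorize",
    "authenticationInfo": {"principal": "User:sa-xxxx"},
    "authorizationInfo": {"granted": true, "operation": "AccessWithToken"}
  }
}'''

event = json.loads(raw)
# Pull out the fields one would likely map to Splunk event attributes.
summary = {
    "type": event["type"],
    "method": event["data"]["methodName"],
    "principal": event["data"]["authenticationInfo"]["principal"],
    "granted": event["data"]["authorizationInfo"]["granted"],
}
print(summary)
```

The envelope is plain JSON text, which is why none of the trace encodings can unmarshal it.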

@pavolloffay

The message does not seem to be a supported type that the collector can read.

@RobertFloor this is not a bug, but rather a feature request.
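One possible workaround, sketched here without being verified against v0.63: since the records are plain JSON text, route the topic into a logs pipeline instead of a traces pipeline, with an encoding such as `text` that does not try to interpret the payload (assuming that encoding is available in your collector build):

```yaml
receivers:
  kafka/audit:
    brokers:
      - 'xxxxxxx.aws.confluent.cloud:9092'
    topic: 'confluent-audit-log-events'
    protocol_version: 2.6.0
    # 'text' treats each record as an opaque log line instead of
    # trying to unmarshal it as trace data.
    encoding: text

service:
  pipelines:
    logs:
      receivers: [kafka/audit]
      processors: [batch]
      exporters: [logging]
```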

@RobertFloor

Thank you for your help. Can I find the supported message types somewhere? Maybe we can change the message format?

@RobertFloor

Thanks :)
