
ERROR proto: illegal wireType 7 while unmarshalling metrics request #4700

Closed
cuprumtan opened this issue Dec 13, 2023 · 1 comment
Labels: area: instrumentation (Related to an instrumentation package), bug (Something isn't working), instrumentation: otelhttp

cuprumtan commented Dec 13, 2023

Description

I am working with the client/server otelhttp example. Instead of the example client, I use an opentelemetry-collector built with the native otlphttpexporter and the PostgreSQL receiver from contrib. To read the metrics, I added new logic to the server (mostly copy-pasted from the native otlphttpreceiver); see step 2 in Steps To Reproduce.

However, when calling encoder.unmarshalMetricsRequest(body), I get the error proto: illegal wireType 7.

Environment

  • OS: Ubuntu 22.04.2 LTS
  • Architecture: x86
  • Go Version: 1.21.5
  • otelhttp version: v0.46.1
  • ocb version: v0.89.0
  • otlphttpexporter version: v0.89.0
  • opentelemetry-collector-contrib/receiver/postgresqlreceiver version: v0.88.0

Steps To Reproduce

  1. Build and run opentelemetry-collector locally using these configs:
builder.yaml
dist:
  version: "v0.0.1"
  module: otel-collector
  name: otel-collector-linux-amd64
  description: "Test metrics collector based on OpenTelemetry"
  otelcol_version: 0.89.0
  output_path: ./build/pgpro-otel-collector

receivers:
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver v0.88.0

exporters:
  - gomod: go.opentelemetry.io/collector v0.89.0
    import: go.opentelemetry.io/collector/exporter/otlphttpexporter
collector-config.yaml
receivers:
  postgresql:
    endpoint: localhost:5432
    transport: tcp
    username: postgres
    password: postgres
    databases:
      - postgres
    collection_interval: 10s
    tls:
      insecure: false

exporters:
  otlphttp:
    metrics_endpoint: http://127.0.0.1:7777/hello

service:
  telemetry:
    logs:
      level: debug
      development: false
      encoding: json
    metrics:
      level: detailed
  pipelines:
    metrics:
      receivers:
        - postgresql
      exporters:
        - otlphttp
  2. Run this code locally:
server.go
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"

	"github.com/gogo/protobuf/proto"
	"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/baggage"
	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/trace"
	spb "google.golang.org/genproto/googleapis/rpc/status"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

var fallbackMsg = []byte(`{"code": 13, "message": "failed to marshal error message"}`)

const fallbackContentType = "application/json"
const pbContentType = "application/x-protobuf"

func initMeter() (*sdkmetric.MeterProvider, error) {
	exp, err := stdoutmetric.New()
	if err != nil {
		return nil, err
	}

	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exp)))
	otel.SetMeterProvider(mp)
	return mp, nil
}

type protoEncoder struct{}

func (protoEncoder) unmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) {
	req := pmetricotlp.NewExportRequest()
	err := req.UnmarshalProto(buf)
	fmt.Printf("err %+v\n", err)
	return req, err
}

func (protoEncoder) marshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) {
	return resp.MarshalProto()
}

func (protoEncoder) marshalStatus(resp *spb.Status) ([]byte, error) {
	return proto.Marshal(resp)
}

func (protoEncoder) contentType() string {
	return pbContentType
}
func readAndCloseBody(resp http.ResponseWriter, req *http.Request, encoder protoEncoder) ([]byte, bool) {
	body, err := io.ReadAll(req.Body)
	if err != nil {
		writeError(resp, encoder, err, http.StatusBadRequest)
		return nil, false
	}
	if err = req.Body.Close(); err != nil {
		writeError(resp, encoder, err, http.StatusBadRequest)
		return nil, false
	}
	return body, true
}

func writeError(w http.ResponseWriter, encoder protoEncoder, err error, statusCode int) {
	s, ok := status.FromError(err)
	if !ok {
		s = errorMsgToStatus(err.Error(), statusCode)
	}
	writeStatusResponse(w, encoder, statusCode, s.Proto())
}

func errorMsgToStatus(errMsg string, statusCode int) *status.Status {
	if statusCode == http.StatusBadRequest {
		return status.New(codes.InvalidArgument, errMsg)
	}
	return status.New(codes.Unknown, errMsg)
}

func writeStatusResponse(w http.ResponseWriter, encoder protoEncoder, statusCode int, rsp *spb.Status) {
	msg, err := encoder.marshalStatus(rsp)
	if err != nil {
		writeResponse(w, fallbackContentType, http.StatusInternalServerError, fallbackMsg)
		return
	}

	writeResponse(w, encoder.contentType(), statusCode, msg)
}

func writeResponse(w http.ResponseWriter, contentType string, statusCode int, msg []byte) {
	w.Header().Set("Content-Type", contentType)
	w.WriteHeader(statusCode)
	// Nothing we can do with the error if we cannot write to the response.
	_, _ = w.Write(msg)
}

func main() {
	mp, err := initMeter()
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := mp.Shutdown(context.Background()); err != nil {
			log.Printf("Error shutting down meter provider: %v", err)
		}
	}()

	uk := attribute.Key("username")

	helloHandler := func(w http.ResponseWriter, req *http.Request) {
		encoder := protoEncoder{}

		body, ok := readAndCloseBody(w, req, encoder)
		if !ok {
			return
		}

		otlpReq, err := encoder.unmarshalMetricsRequest(body)
		if err != nil {
			writeError(w, encoder, err, http.StatusBadRequest)
			return
		}

		ctx := req.Context()
		span := trace.SpanFromContext(ctx)
		bag := baggage.FromContext(ctx)
		span.AddEvent("handling this...", trace.WithAttributes(uk.String(bag.Member("username").Value())))

		_, _ = io.WriteString(w, "Hello, world!\n")
	}

	otelHandler := otelhttp.NewHandler(http.HandlerFunc(helloHandler), "Hello")

	http.Handle("/hello", otelHandler)
	err = http.ListenAndServe(":7777", nil) //nolint:gosec // Ignoring G114: Use of net/http serve function that has no support for setting timeouts.
	if err != nil {
		log.Fatal(err)
	}
}
  3. See the error:
{"level":"error","ts":1702476049.2793086,"caller":"exporterhelper/retry_sender.go:126","msg":"Exporting failed. The error is not retryable. Dropping data.","kind":"exporter","data_type":"metrics","name":"otlphttp","error":"Permanent error: error exporting items, request to http://127.0.0.1:7777/hello responded with HTTP Status Code 400, Message=proto: illegal wireType 7, Details=[]","dropped_items":34,"stacktrace":"go.opentelemetry.io/collector/exporter/exporterhelper.(*retrySender).send\n\tgo.opentelemetry.io/collector/exporter@v0.91.0/exporterhelper/retry_sender.go:126\ngo.opentelemetry.io/collector/exporter/exporterhelper.(*metricsSenderWithObservability).send\n\tgo.opentelemetry.io/collector/exporter@v0.91.0/exporterhelper/metrics.go:170\ngo.opentelemetry.io/collector/exporter/exporterhelper.(*queueSender).consume\n\tgo.opentelemetry.io/collector/exporter@v0.91.0/exporterhelper/queue_sender.go:115\ngo.opentelemetry.io/collector/exporter/exporterhelper/internal.(*boundedMemoryQueue[...]).Consume\n\tgo.opentelemetry.io/collector/exporter@v0.91.0/exporterhelper/internal/bounded_memory_queue.go:55\ngo.opentelemetry.io/collector/exporter/exporterhelper/internal.(*QueueConsumers[...]).Start.func1\n\tgo.opentelemetry.io/collector/exporter@v0.91.0/exporterhelper/internal/consumers.go:43"}

Expected behavior

Receive metrics on the server side and unmarshal them correctly.

cuprumtan added the area: instrumentation, bug, and instrumentation: otelhttp labels on Dec 13, 2023
cuprumtan (Author) commented:

UPD: While looking for similar problems I found this one. Based on the information from that issue, I found a workaround: disabling opentelemetry-collector compression.

exporters:
  otlphttp:
    metrics_endpoint: http://127.0.0.1:7777/hello
    compression: ""

With that change, unmarshalling works correctly.
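
An alternative, if compression needs to stay enabled, would be to decompress the body on the server before unmarshalling. Below is a minimal sketch, assuming the exporter gzip-compresses the payload and advertises it with a Content-Encoding: gzip header; decompressBody is a hypothetical helper, not part of otelhttp or the collector, and it needs "bytes" and "compress/gzip" added to the imports in server.go:

func decompressBody(req *http.Request, body []byte) ([]byte, error) {
	// Pass the body through unchanged unless the client declared gzip.
	if req.Header.Get("Content-Encoding") != "gzip" {
		return body, nil
	}
	zr, err := gzip.NewReader(bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	// Return the decompressed protobuf payload.
	return io.ReadAll(zr)
}

In helloHandler this would be called between readAndCloseBody and encoder.unmarshalMetricsRequest, e.g. body, decErr := decompressBody(req, body), writing a 400 via writeError when decErr is non-nil.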

pellared added this to the untracked milestone on Nov 25, 2024