From e6936f2736b8df672545ee9278f1c61517a911e4 Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Fri, 18 Oct 2024 15:03:41 +0530 Subject: [PATCH] [chore][receiver/loki] follow receiver contract (#35327) **Description:** Follow receiver contract for `loki`. This also includes an internal errorutil package which will be used by other network receivers as well. **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/5909 **Testing:** Added --- internal/coreinternal/errorutil/grpc.go | 25 ++++++++ internal/coreinternal/go.mod | 2 +- receiver/lokireceiver/go.mod | 2 +- receiver/lokireceiver/loki.go | 8 +++ receiver/lokireceiver/loki_test.go | 76 +++++++++++++++++++++++++ 5 files changed, 111 insertions(+), 2 deletions(-) create mode 100644 internal/coreinternal/errorutil/grpc.go diff --git a/internal/coreinternal/errorutil/grpc.go b/internal/coreinternal/errorutil/grpc.go new file mode 100644 index 000000000000..08b75990f0fc --- /dev/null +++ b/internal/coreinternal/errorutil/grpc.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package errorutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil" + +import ( + "go.opentelemetry.io/collector/consumer/consumererror" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func GrpcError(err error) error { + s, ok := status.FromError(err) + if !ok { + // Default to a retryable error + // https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures + code := codes.Unavailable + if consumererror.IsPermanent(err) { + // non-retryable error + code = codes.Unknown + } + s = status.New(code, err.Error()) + } + return s.Err() +} diff --git a/internal/coreinternal/go.mod b/internal/coreinternal/go.mod index ed4f490e9bcb..8675c13635fe 100644 --- a/internal/coreinternal/go.mod +++ b/internal/coreinternal/go.mod @@ -23,6 +23,7 @@ require ( go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 golang.org/x/text v0.19.0 + google.golang.org/grpc v1.67.1 ) require ( @@ -87,7 +88,6 @@ require ( golang.org/x/sys v0.25.0 // indirect golang.org/x/tools v0.23.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect - google.golang.org/grpc v1.67.1 // indirect google.golang.org/protobuf v1.35.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/receiver/lokireceiver/go.mod b/receiver/lokireceiver/go.mod index 1c5b291f8fc3..d5be9e59e979 100644 --- a/receiver/lokireceiver/go.mod +++ b/receiver/lokireceiver/go.mod @@ -9,7 +9,7 @@ require ( github.com/grafana/loki/pkg/push v0.0.0-20240514112848-a1b1eeb09583 github.com/json-iterator/go v1.1.12 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.111.0 - github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.111.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.111.0 diff --git a/receiver/lokireceiver/loki.go b/receiver/lokireceiver/loki.go index 1d4cc5c3f0c1..a994c0ae121e 100644 --- a/receiver/lokireceiver/loki.go +++ b/receiver/lokireceiver/loki.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver/internal" ) @@ -163,6 +164,9 @@ func (r *lokiReceiver) Push(ctx context.Context, pushRequest *push.PushRequest) logRecordCount := logs.LogRecordCount() err = r.nextConsumer.ConsumeLogs(ctx, logs) r.obsrepGRPC.EndLogsOp(ctx, "protobuf", logRecordCount, err) + if err != nil { + return &push.PushResponse{}, errorutil.GrpcError(err) + } return &push.PushResponse{}, nil } @@ -219,6 +223,10 @@ func handleLogs(resp http.ResponseWriter, req *http.Request, r *lokiReceiver) { logRecordCount := logs.LogRecordCount() err = r.nextConsumer.ConsumeLogs(ctx, logs) r.obsrepHTTP.EndLogsOp(ctx, "json", logRecordCount, err) + if err != nil { + errorutil.HTTPError(resp, err) + return + } resp.WriteHeader(http.StatusNoContent) } diff --git a/receiver/lokireceiver/loki_test.go b/receiver/lokireceiver/loki_test.go index bf208b780d39..1b17bdc11314 100644 --- a/receiver/lokireceiver/loki_test.go +++ b/receiver/lokireceiver/loki_test.go @@ -8,6 +8,7 @@ import ( "compress/gzip" "compress/zlib" "context" + "errors" "fmt" "net" "net/http" @@ -23,6 +24,7 @@ import ( "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -362,6 +364,80 @@ func TestSendingPushRequestToGRPCEndpoint(t *testing.T) { } } +func TestExpectedStatus(t *testing.T) { + + testcases := []struct { + name string + err error + expectedGrpcError string + expectedHTTPError string + }{ + { + name: "permanent-error", + err: consumererror.NewPermanent(errors.New("permanent")), + expectedGrpcError: "rpc error: code = Unknown desc = Permanent error: permanent", + expectedHTTPError: "failed to upload logs; HTTP status code: 400", + }, + { + name: "non-permanent-error", + err: errors.New("non-permanent"), + expectedGrpcError: "rpc error: code = Unavailable desc = non-permanent", + expectedHTTPError: "failed to upload logs; HTTP status code: 503", + }, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + httpAddr := testutil.GetAvailableLocalAddress(t) + config := &Config{ + Protocols: Protocols{ + GRPC: &configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + Transport: confignet.TransportTypeTCP, + }, + }, + HTTP: &confighttp.ServerConfig{ + Endpoint: httpAddr, + }, + }, + KeepTimestamp: true, + } + + consumer := consumertest.NewErr(tt.err) + lr, err := newLokiReceiver(config, consumer, receivertest.NewNopSettings()) + require.NoError(t, err) + + require.NoError(t, lr.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { require.NoError(t, lr.Shutdown(context.Background())) }) + conn, err := grpc.NewClient(config.GRPC.NetAddr.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer conn.Close() + grpcClient := push.NewPusherClient(conn) + + body := &push.PushRequest{ + Streams: []push.Stream{ + { + Labels: "{foo=\"bar\"}", + Entries: []push.Entry{ + { + Timestamp: time.Unix(0, 1676888496000000000), + Line: "logline 1", + }, + }, + }, + }, + } + + _, err = grpcClient.Push(context.Background(), body) + require.EqualError(t, err, tt.expectedGrpcError) + + _, port, _ := net.SplitHostPort(httpAddr) + collectorAddr := fmt.Sprintf("http://localhost:%s/loki/api/v1/push", port) + require.EqualError(t, sendToCollector(collectorAddr, "application/json", "", []byte(`{"streams": [{"stream": {"foo": "bar"},"values": [[ "1676888496000000000", "logline 1" ]]}]}`)), tt.expectedHTTPError) + }) + } +} + type Log struct { Timestamp int64 Body pcommon.Value