Skip to content

Commit

Permalink
[chore][receiver/loki] follow receiver contract (#35327)
Browse files Browse the repository at this point in the history
**Description:**  
Follow receiver contract for `loki`.
This also includes an internal errorutil package which will be used by
other network receivers as well.

**Link to tracking Issue:**
#5909

**Testing:** Added
  • Loading branch information
VihasMakwana authored Oct 18, 2024
1 parent 1fab9bb commit e6936f2
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 2 deletions.
25 changes: 25 additions & 0 deletions internal/coreinternal/errorutil/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package errorutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil"

import (
"go.opentelemetry.io/collector/consumer/consumererror"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func GrpcError(err error) error {
s, ok := status.FromError(err)
if !ok {
// Default to a retryable error
// https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures
code := codes.Unavailable
if consumererror.IsPermanent(err) {
// non-retryable error
code = codes.Unknown
}
s = status.New(code, err.Error())
}
return s.Err()
}
2 changes: 1 addition & 1 deletion internal/coreinternal/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/text v0.19.0
google.golang.org/grpc v1.67.1
)

require (
Expand Down Expand Up @@ -87,7 +88,6 @@ require (
golang.org/x/sys v0.25.0 // indirect
golang.org/x/tools v0.23.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/protobuf v1.35.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
2 changes: 1 addition & 1 deletion receiver/lokireceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/grafana/loki/pkg/push v0.0.0-20240514112848-a1b1eeb09583
github.com/json-iterator/go v1.1.12
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.111.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.111.0
Expand Down
8 changes: 8 additions & 0 deletions receiver/lokireceiver/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver/internal"
)
Expand Down Expand Up @@ -163,6 +164,9 @@ func (r *lokiReceiver) Push(ctx context.Context, pushRequest *push.PushRequest)
logRecordCount := logs.LogRecordCount()
err = r.nextConsumer.ConsumeLogs(ctx, logs)
r.obsrepGRPC.EndLogsOp(ctx, "protobuf", logRecordCount, err)
if err != nil {
return &push.PushResponse{}, errorutil.GrpcError(err)
}
return &push.PushResponse{}, nil
}

Expand Down Expand Up @@ -219,6 +223,10 @@ func handleLogs(resp http.ResponseWriter, req *http.Request, r *lokiReceiver) {
logRecordCount := logs.LogRecordCount()
err = r.nextConsumer.ConsumeLogs(ctx, logs)
r.obsrepHTTP.EndLogsOp(ctx, "json", logRecordCount, err)
if err != nil {
errorutil.HTTPError(resp, err)
return
}

resp.WriteHeader(http.StatusNoContent)
}
76 changes: 76 additions & 0 deletions receiver/lokireceiver/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"compress/gzip"
"compress/zlib"
"context"
"errors"
"fmt"
"net"
"net/http"
Expand All @@ -23,6 +24,7 @@ import (
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -362,6 +364,80 @@ func TestSendingPushRequestToGRPCEndpoint(t *testing.T) {
}
}

func TestExpectedStatus(t *testing.T) {

testcases := []struct {
name string
err error
expectedGrpcError string
expectedHTTPError string
}{
{
name: "permanent-error",
err: consumererror.NewPermanent(errors.New("permanent")),
expectedGrpcError: "rpc error: code = Unknown desc = Permanent error: permanent",
expectedHTTPError: "failed to upload logs; HTTP status code: 400",
},
{
name: "non-permanent-error",
err: errors.New("non-permanent"),
expectedGrpcError: "rpc error: code = Unavailable desc = non-permanent",
expectedHTTPError: "failed to upload logs; HTTP status code: 503",
},
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
httpAddr := testutil.GetAvailableLocalAddress(t)
config := &Config{
Protocols: Protocols{
GRPC: &configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: testutil.GetAvailableLocalAddress(t),
Transport: confignet.TransportTypeTCP,
},
},
HTTP: &confighttp.ServerConfig{
Endpoint: httpAddr,
},
},
KeepTimestamp: true,
}

consumer := consumertest.NewErr(tt.err)
lr, err := newLokiReceiver(config, consumer, receivertest.NewNopSettings())
require.NoError(t, err)

require.NoError(t, lr.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, lr.Shutdown(context.Background())) })
conn, err := grpc.NewClient(config.GRPC.NetAddr.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()
grpcClient := push.NewPusherClient(conn)

body := &push.PushRequest{
Streams: []push.Stream{
{
Labels: "{foo=\"bar\"}",
Entries: []push.Entry{
{
Timestamp: time.Unix(0, 1676888496000000000),
Line: "logline 1",
},
},
},
},
}

_, err = grpcClient.Push(context.Background(), body)
require.EqualError(t, err, tt.expectedGrpcError)

_, port, _ := net.SplitHostPort(httpAddr)
collectorAddr := fmt.Sprintf("http://localhost:%s/loki/api/v1/push", port)
require.EqualError(t, sendToCollector(collectorAddr, "application/json", "", []byte(`{"streams": [{"stream": {"foo": "bar"},"values": [[ "1676888496000000000", "logline 1" ]]}]}`)), tt.expectedHTTPError)
})
}
}

type Log struct {
Timestamp int64
Body pcommon.Value
Expand Down

0 comments on commit e6936f2

Please sign in to comment.