Skip to content

Commit

Permalink
fix otlp_log receiver wrong use of trace measurement (#2117)
Browse files Browse the repository at this point in the history
  • Loading branch information
tensorchen authored Nov 11, 2020
1 parent add864b commit 7af0ce2
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 4 deletions.
45 changes: 43 additions & 2 deletions obsreport/obsreport_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ var (
receiverPrefix = ReceiverKey + nameSep
receiveTraceDataOperationSuffix = nameSep + "TraceDataReceived"
receiverMetricsOperationSuffix = nameSep + "MetricsReceived"
receiverLogsOperationSuffix = nameSep + "LogsReceived"

// Receiver metrics. Any count of data items below is in the original format
// that they were received, reasoning: reconciliation is easier if measurements
Expand All @@ -82,11 +83,11 @@ var (
stats.UnitDimensionless)
mReceiverAcceptedLogRecords = stats.Int64(
receiverPrefix+AcceptedLogRecordsKey,
"Number of log records successfully pushed into the pipeline.",
"Number of log records successfully pushed into the pipeline.",
stats.UnitDimensionless)
mReceiverRefusedLogRecords = stats.Int64(
receiverPrefix+RefusedLogRecordsKey,
"Number of log records that could not be pushed into the pipeline.",
"Number of log records that could not be pushed into the pipeline.",
stats.UnitDimensionless)
)

Expand Down Expand Up @@ -176,6 +177,40 @@ func EndTraceDataReceiveOp(
)
}

// StartLogsReceiveOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func StartLogsReceiveOp(
operationCtx context.Context,
receiver string,
transport string,
opt ...StartReceiveOption,
) context.Context {
return traceReceiveOp(
operationCtx,
receiver,
transport,
receiverLogsOperationSuffix,
opt...)
}

// EndLogsReceiveOp completes the receive operation that was started with
// StartLogsReceiveOp.
func EndLogsReceiveOp(
receiverCtx context.Context,
format string,
numReceivedLogRecords int,
err error,
) {
endReceiveOp(
receiverCtx,
format,
numReceivedLogRecords,
err,
configmodels.LogsDataType,
)
}

// StartMetricsReceiveOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
Expand Down Expand Up @@ -289,6 +324,9 @@ func endReceiveOp(
case configmodels.MetricsDataType:
acceptedMeasure = mReceiverAcceptedMetricPoints
refusedMeasure = mReceiverRefusedMetricPoints
case configmodels.LogsDataType:
acceptedMeasure = mReceiverAcceptedLogRecords
refusedMeasure = mReceiverRefusedLogRecords
}

stats.Record(
Expand All @@ -307,6 +345,9 @@ func endReceiveOp(
case configmodels.MetricsDataType:
acceptedItemsKey = AcceptedMetricPointsKey
refusedItemsKey = RefusedMetricPointsKey
case configmodels.LogsDataType:
acceptedItemsKey = AcceptedLogRecordsKey
refusedItemsKey = RefusedLogRecordsKey
}

span.AddAttributes(
Expand Down
60 changes: 60 additions & 0 deletions obsreport/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,66 @@ func TestReceiveTraceDataOp(t *testing.T) {
obsreporttest.CheckReceiverTracesViews(t, receiver, transport, int64(acceptedSpans), int64(refusedSpans))
}

func TestReceiveLogsOp(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
defer doneFn()

ss := &spanStore{}
trace.RegisterExporter(ss)
defer trace.UnregisterExporter(ss)

parentCtx, parentSpan := trace.StartSpan(context.Background(),
t.Name(), trace.WithSampler(trace.AlwaysSample()))
defer parentSpan.End()

receiverCtx := obsreport.ReceiverContext(parentCtx, receiver, transport)
params := []receiveTestParams{
{transport, errFake},
{"", nil},
}
rcvdLogRecords := []int{13, 42}
for i, param := range params {
ctx := obsreport.StartLogsReceiveOp(receiverCtx, receiver, param.transport)
assert.NotNil(t, ctx)

obsreport.EndLogsReceiveOp(
ctx,
format,
rcvdLogRecords[i],
param.err)
}

spans := ss.PullAllSpans()
require.Equal(t, len(params), len(spans))

var acceptedLogRecords, refusedLogRecords int
for i, span := range spans {
assert.Equal(t, "receiver/"+receiver+"/LogsReceived", span.Name)
switch params[i].err {
case nil:
acceptedLogRecords += rcvdLogRecords[i]
assert.Equal(t, int64(rcvdLogRecords[i]), span.Attributes[obsreport.AcceptedLogRecordsKey])
assert.Equal(t, int64(0), span.Attributes[obsreport.RefusedLogRecordsKey])
assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status)
case errFake:
refusedLogRecords += rcvdLogRecords[i]
assert.Equal(t, int64(0), span.Attributes[obsreport.AcceptedLogRecordsKey])
assert.Equal(t, int64(rcvdLogRecords[i]), span.Attributes[obsreport.RefusedLogRecordsKey])
assert.Equal(t, params[i].err.Error(), span.Status.Message)
default:
t.Fatalf("unexpected param: %v", params[i])
}
switch params[i].transport {
case "":
assert.NotContains(t, span.Attributes, obsreport.TransportKey)
default:
assert.Equal(t, params[i].transport, span.Attributes[obsreport.TransportKey])
}
}
obsreporttest.CheckReceiverLogsViews(t, receiver, transport, int64(acceptedLogRecords), int64(refusedLogRecords))
}

func TestReceiveMetricsOp(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
Expand Down
8 changes: 8 additions & 0 deletions obsreport/obsreporttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ func CheckReceiverTracesViews(t *testing.T, receiver, protocol string, acceptedS
CheckValueForView(t, receiverTags, droppedSpans, "receiver/refused_spans")
}

// CheckReceiverLogsViews checks that for the current exported values for logs receiver views match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func CheckReceiverLogsViews(t *testing.T, receiver, protocol string, acceptedLogRecords, droppedLogRecords int64) {
receiverTags := tagsForReceiverView(receiver, protocol)
CheckValueForView(t, receiverTags, acceptedLogRecords, "receiver/accepted_log_records")
CheckValueForView(t, receiverTags, droppedLogRecords, "receiver/refused_log_records")
}

// CheckReceiverMetricsViews checks that for the current exported values for metrics receiver views match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func CheckReceiverMetricsViews(t *testing.T, receiver, protocol string, acceptedMetricPoints, droppedMetricPoints int64) {
Expand Down
13 changes: 13 additions & 0 deletions obsreport/obsreporttest/obsreporttest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ func TestCheckReceiverMetricsViews(t *testing.T) {
obsreporttest.CheckReceiverMetricsViews(t, receiver, transport, 7, 0)
}

func TestCheckReceiverLogsViews(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
defer doneFn()

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
ctx := obsreport.StartLogsReceiveOp(receiverCtx, receiver, transport)
assert.NotNil(t, ctx)
obsreport.EndLogsReceiveOp(ctx, format, 7, nil)

obsreporttest.CheckReceiverLogsViews(t, receiver, transport, 7, 0)
}

func TestCheckExporterTracesViews(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions receiver/otlpreceiver/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ func (r *Receiver) sendToNextConsumer(ctx context.Context, ld pdata.Logs) error
ctx = client.NewContext(ctx, c)
}

ctx = obsreport.StartTraceDataReceiveOp(ctx, r.instanceName, receiverTransport)
ctx = obsreport.StartLogsReceiveOp(ctx, r.instanceName, receiverTransport)
err := r.nextConsumer.ConsumeLogs(ctx, ld)
obsreport.EndTraceDataReceiveOp(ctx, dataFormatProtobuf, numSpans, err)
obsreport.EndLogsReceiveOp(ctx, dataFormatProtobuf, numSpans, err)

return err
}

0 comments on commit 7af0ce2

Please sign in to comment.