From 7af0ce2f5e602e550d6750bfc1800232f6e05c8b Mon Sep 17 00:00:00 2001 From: Chen Yixiao Date: Thu, 12 Nov 2020 00:17:18 +0800 Subject: [PATCH] fix otlp_log receiver wrong use of trace measurement (#2117) --- obsreport/obsreport_receiver.go | 45 +++++++++++++- obsreport/obsreport_test.go | 60 +++++++++++++++++++ obsreport/obsreporttest/obsreporttest.go | 8 +++ obsreport/obsreporttest/obsreporttest_test.go | 13 ++++ receiver/otlpreceiver/logs/otlp.go | 4 +- 5 files changed, 126 insertions(+), 4 deletions(-) diff --git a/obsreport/obsreport_receiver.go b/obsreport/obsreport_receiver.go index 5f2ca48bf766..33148a033725 100644 --- a/obsreport/obsreport_receiver.go +++ b/obsreport/obsreport_receiver.go @@ -58,6 +58,7 @@ var ( receiverPrefix = ReceiverKey + nameSep receiveTraceDataOperationSuffix = nameSep + "TraceDataReceived" receiverMetricsOperationSuffix = nameSep + "MetricsReceived" + receiverLogsOperationSuffix = nameSep + "LogsReceived" // Receiver metrics. Any count of data items below is in the original format // that they were received, reasoning: reconciliation is easier if measurements @@ -82,11 +83,11 @@ var ( stats.UnitDimensionless) mReceiverAcceptedLogRecords = stats.Int64( receiverPrefix+AcceptedLogRecordsKey, - "Number of log records successfully pushed into the pipeline.", + "Number of log records successfully pushed into the pipeline.", stats.UnitDimensionless) mReceiverRefusedLogRecords = stats.Int64( receiverPrefix+RefusedLogRecordsKey, - "Number of log records that could not be pushed into the pipeline.", + "Number of log records that could not be pushed into the pipeline.", stats.UnitDimensionless) ) @@ -176,6 +177,40 @@ func EndTraceDataReceiveOp( ) } +// StartLogsReceiveOp is called when a request is received from a client. +// The returned context should be used in other calls to the obsreport functions +// dealing with the same receive operation. +func StartLogsReceiveOp( + operationCtx context.Context, + receiver string, + transport string, + opt ...StartReceiveOption, +) context.Context { + return traceReceiveOp( + operationCtx, + receiver, + transport, + receiverLogsOperationSuffix, + opt...) +} + +// EndLogsReceiveOp completes the receive operation that was started with +// StartLogsReceiveOp. +func EndLogsReceiveOp( + receiverCtx context.Context, + format string, + numReceivedLogRecords int, + err error, +) { + endReceiveOp( + receiverCtx, + format, + numReceivedLogRecords, + err, + configmodels.LogsDataType, + ) +} + // StartMetricsReceiveOp is called when a request is received from a client. // The returned context should be used in other calls to the obsreport functions // dealing with the same receive operation. @@ -289,6 +324,9 @@ func endReceiveOp( case configmodels.MetricsDataType: acceptedMeasure = mReceiverAcceptedMetricPoints refusedMeasure = mReceiverRefusedMetricPoints + case configmodels.LogsDataType: + acceptedMeasure = mReceiverAcceptedLogRecords + refusedMeasure = mReceiverRefusedLogRecords } stats.Record( @@ -307,6 +345,9 @@ func endReceiveOp( case configmodels.MetricsDataType: acceptedItemsKey = AcceptedMetricPointsKey refusedItemsKey = RefusedMetricPointsKey + case configmodels.LogsDataType: + acceptedItemsKey = AcceptedLogRecordsKey + refusedItemsKey = RefusedLogRecordsKey } span.AddAttributes( diff --git a/obsreport/obsreport_test.go b/obsreport/obsreport_test.go index f38f0eebccc1..9a6043d02469 100644 --- a/obsreport/obsreport_test.go +++ b/obsreport/obsreport_test.go @@ -147,6 +147,66 @@ func TestReceiveTraceDataOp(t *testing.T) { obsreporttest.CheckReceiverTracesViews(t, receiver, transport, int64(acceptedSpans), int64(refusedSpans)) } +func TestReceiveLogsOp(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + ss := &spanStore{} + trace.RegisterExporter(ss) + defer trace.UnregisterExporter(ss) + + parentCtx, parentSpan := trace.StartSpan(context.Background(), + t.Name(), trace.WithSampler(trace.AlwaysSample())) + defer parentSpan.End() + + receiverCtx := obsreport.ReceiverContext(parentCtx, receiver, transport) + params := []receiveTestParams{ + {transport, errFake}, + {"", nil}, + } + rcvdLogRecords := []int{13, 42} + for i, param := range params { + ctx := obsreport.StartLogsReceiveOp(receiverCtx, receiver, param.transport) + assert.NotNil(t, ctx) + + obsreport.EndLogsReceiveOp( + ctx, + format, + rcvdLogRecords[i], + param.err) + } + + spans := ss.PullAllSpans() + require.Equal(t, len(params), len(spans)) + + var acceptedLogRecords, refusedLogRecords int + for i, span := range spans { + assert.Equal(t, "receiver/"+receiver+"/LogsReceived", span.Name) + switch params[i].err { + case nil: + acceptedLogRecords += rcvdLogRecords[i] + assert.Equal(t, int64(rcvdLogRecords[i]), span.Attributes[obsreport.AcceptedLogRecordsKey]) + assert.Equal(t, int64(0), span.Attributes[obsreport.RefusedLogRecordsKey]) + assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status) + case errFake: + refusedLogRecords += rcvdLogRecords[i] + assert.Equal(t, int64(0), span.Attributes[obsreport.AcceptedLogRecordsKey]) + assert.Equal(t, int64(rcvdLogRecords[i]), span.Attributes[obsreport.RefusedLogRecordsKey]) + assert.Equal(t, params[i].err.Error(), span.Status.Message) + default: + t.Fatalf("unexpected param: %v", params[i]) + } + switch params[i].transport { + case "": + assert.NotContains(t, span.Attributes, obsreport.TransportKey) + default: + assert.Equal(t, params[i].transport, span.Attributes[obsreport.TransportKey]) + } + } + obsreporttest.CheckReceiverLogsViews(t, receiver, transport, int64(acceptedLogRecords), int64(refusedLogRecords)) +} + func TestReceiveMetricsOp(t *testing.T) { doneFn, err := obsreporttest.SetupRecordedMetricsTest() require.NoError(t, err) diff --git a/obsreport/obsreporttest/obsreporttest.go b/obsreport/obsreporttest/obsreporttest.go index 83d641b8357c..b1e1a413ff63 100644 --- a/obsreport/obsreporttest/obsreporttest.go +++ b/obsreport/obsreporttest/obsreporttest.go @@ -114,6 +114,14 @@ func CheckReceiverTracesViews(t *testing.T, receiver, protocol string, acceptedS CheckValueForView(t, receiverTags, droppedSpans, "receiver/refused_spans") } +// CheckReceiverLogsViews checks that for the current exported values for logs receiver views match given values. +// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func CheckReceiverLogsViews(t *testing.T, receiver, protocol string, acceptedLogRecords, droppedLogRecords int64) { + receiverTags := tagsForReceiverView(receiver, protocol) + CheckValueForView(t, receiverTags, acceptedLogRecords, "receiver/accepted_log_records") + CheckValueForView(t, receiverTags, droppedLogRecords, "receiver/refused_log_records") +} + // CheckReceiverMetricsViews checks that for the current exported values for metrics receiver views match given values. // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckReceiverMetricsViews(t *testing.T, receiver, protocol string, acceptedMetricPoints, droppedMetricPoints int64) { diff --git a/obsreport/obsreporttest/obsreporttest_test.go b/obsreport/obsreporttest/obsreporttest_test.go index b2089ab84cae..dc9ae8db4b46 100644 --- a/obsreport/obsreporttest/obsreporttest_test.go +++ b/obsreport/obsreporttest/obsreporttest_test.go @@ -62,6 +62,19 @@ func TestCheckReceiverMetricsViews(t *testing.T) { obsreporttest.CheckReceiverMetricsViews(t, receiver, transport, 7, 0) } +func TestCheckReceiverLogsViews(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport) + ctx := obsreport.StartLogsReceiveOp(receiverCtx, receiver, transport) + assert.NotNil(t, ctx) + obsreport.EndLogsReceiveOp(ctx, format, 7, nil) + + obsreporttest.CheckReceiverLogsViews(t, receiver, transport, 7, 0) +} + func TestCheckExporterTracesViews(t *testing.T) { doneFn, err := obsreporttest.SetupRecordedMetricsTest() require.NoError(t, err) diff --git a/receiver/otlpreceiver/logs/otlp.go b/receiver/otlpreceiver/logs/otlp.go index b89d630face1..149a1c110bda 100644 --- a/receiver/otlpreceiver/logs/otlp.go +++ b/receiver/otlpreceiver/logs/otlp.go @@ -73,9 +73,9 @@ func (r *Receiver) sendToNextConsumer(ctx context.Context, ld pdata.Logs) error ctx = client.NewContext(ctx, c) } - ctx = obsreport.StartTraceDataReceiveOp(ctx, r.instanceName, receiverTransport) + ctx = obsreport.StartLogsReceiveOp(ctx, r.instanceName, receiverTransport) err := r.nextConsumer.ConsumeLogs(ctx, ld) - obsreport.EndTraceDataReceiveOp(ctx, dataFormatProtobuf, numSpans, err) + obsreport.EndLogsReceiveOp(ctx, dataFormatProtobuf, numSpans, err) return err }