Another example log
"}` + assert.Regexp(t, regex, body) + assert.Equal(t, "key=value", req.Header.Get("X-Sumo-Fields")) assert.Equal(t, "otelcol", req.Header.Get("X-Sumo-Client")) assert.Equal(t, "application/x-www-form-urlencoded", req.Header.Get("Content-Type")) }, }) - defer func() { test.srv.Close() }() test.s.config.LogFormat = JSONFormat - test.s.logBuffer = exampleTwoLogs() - _, err := test.s.sendLogs(context.Background(), fieldsFromMap(map[string]string{"key": "value"})) + rls := plog.NewResourceLogs() + slgs := rls.ScopeLogs().AppendEmpty() + log := slgs.LogRecords().AppendEmpty() + + log.Body().SetStr("Example log") + log.Attributes().PutStr("key1", "value1") + log.Attributes().PutStr("key2", "value2") + + log = slgs.LogRecords().AppendEmpty() + log.Body().SetStr("Another example log
") + log.Attributes().PutStr("key1", "value1") + log.Attributes().PutStr("key2", "value2") + + _, err := test.s.sendNonOTLPLogs(context.Background(), + rls, + fieldsFromMap(map[string]string{"key": "value"}), + ) assert.NoError(t, err) + + assert.EqualValues(t, 1, *test.reqCounter) } func TestSendLogsJsonMultitype(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(_ http.ResponseWriter, req *http.Request) { body := extractBody(t, req) - expected := `{"key1":"value1","key2":"value2","log":{"lk1":"lv1","lk2":13}} -{"key1":"value1","key2":"value2","log":["lv2",13]}` - assert.Equal(t, expected, body) + var regex string + regex += `{"key1":"value1","key2":"value2","log":{"lk1":"lv1","lk2":13}}` + regex += `\n` + regex += `{"key1":"value1","key2":"value2","log":\["lv2",13\]}` + assert.Regexp(t, regex, body) + assert.Equal(t, "key=value", req.Header.Get("X-Sumo-Fields")) assert.Equal(t, "otelcol", req.Header.Get("X-Sumo-Client")) assert.Equal(t, "application/x-www-form-urlencoded", req.Header.Get("Content-Type")) }, }) - defer func() { test.srv.Close() }() test.s.config.LogFormat = JSONFormat - test.s.logBuffer = exampleMultitypeLogs() - _, err := test.s.sendLogs(context.Background(), fieldsFromMap(map[string]string{"key": "value"})) + rls := plog.NewResourceLogs() + slgs := rls.ScopeLogs().AppendEmpty() + + attVal := pcommon.NewValueMap() + attMap := attVal.Map() + attMap.PutStr("lk1", "lv1") + attMap.PutInt("lk2", 13) + + log := slgs.LogRecords().AppendEmpty() + attVal.CopyTo(log.Body()) + + log.Attributes().PutStr("key1", "value1") + log.Attributes().PutStr("key2", "value2") + + log = slgs.LogRecords().AppendEmpty() + + attVal = pcommon.NewValueSlice() + attArr := attVal.Slice() + strVal := pcommon.NewValueStr("lv2") + intVal := pcommon.NewValueInt(13) + + strVal.CopyTo(attArr.AppendEmpty()) + intVal.CopyTo(attArr.AppendEmpty()) + + attVal.CopyTo(log.Body()) + log.Attributes().PutStr("key1", "value1") + log.Attributes().PutStr("key2", "value2") + + _, err := test.s.sendNonOTLPLogs(context.Background(), + rls, + fieldsFromMap(map[string]string{"key": "value"}), + ) assert.NoError(t, err) + + assert.EqualValues(t, 1, *test.reqCounter) } func TestSendLogsJsonSplit(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(_ http.ResponseWriter, req *http.Request) { body := extractBody(t, req) - assert.Equal(t, `{"key1":"value1","key2":"value2","log":"Example log"}`, body) + var regex string + regex += `{"key1":"value1","key2":"value2","log":"Example log"}` + assert.Regexp(t, regex, body) }, func(_ http.ResponseWriter, req *http.Request) { body := extractBody(t, req) - assert.Equal(t, `{"key1":"value1","key2":"value2","log":"Another example log"}`, body) + var regex string + regex += `{"key1":"value1","key2":"value2","log":"Another example log"}` + assert.Regexp(t, regex, body) }, }) - defer func() { test.srv.Close() }() test.s.config.LogFormat = JSONFormat test.s.config.MaxRequestBodySize = 10 - test.s.logBuffer = exampleTwoLogs() - _, err := test.s.sendLogs(context.Background(), newFields(pcommon.NewMap())) + rls := plog.NewResourceLogs() + slgs := rls.ScopeLogs().AppendEmpty() + log := slgs.LogRecords().AppendEmpty() + + log.Body().SetStr("Example log") + log.Attributes().PutStr("key1", "value1") + log.Attributes().PutStr("key2", "value2") + + log = slgs.LogRecords().AppendEmpty() + log.Body().SetStr("Another example log") + log.Attributes().PutStr("key1", "value1") + log.Attributes().PutStr("key2", "value2") + + _, err := test.s.sendNonOTLPLogs(context.Background(), + rls, + fieldsFromMap(map[string]string{"key": "value"}), + ) assert.NoError(t, err) + + assert.EqualValues(t, 2, *test.reqCounter) } func TestSendLogsJsonSplitFailedOne(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) body := extractBody(t, req) - assert.Equal(t, `{"key1":"value1","key2":"value2","log":"Example log"}`, body) + + var regex string + regex += `{"key1":"value1","key2":"value2","log":"Example log"}` + assert.Regexp(t, regex, body) }, func(_ http.ResponseWriter, req *http.Request) { body := extractBody(t, req) - assert.Equal(t, `{"key1":"value1","key2":"value2","log":"Another example log"}`, body) + + var regex string + regex += `{"key1":"value1","key2":"value2","log":"Another example log"}` + assert.Regexp(t, regex, body) }, }) - defer func() { test.srv.Close() }() test.s.config.LogFormat = JSONFormat test.s.config.MaxRequestBodySize = 10 - test.s.logBuffer = exampleTwoLogs() - dropped, err := test.s.sendLogs(context.Background(), newFields(pcommon.NewMap())) + rls := plog.NewResourceLogs() + slgs := rls.ScopeLogs().AppendEmpty() + log := slgs.LogRecords().AppendEmpty() + + log.Body().SetStr("Example log") + log.Attributes().PutStr("key1", "value1") + log.Attributes().PutStr("key2", "value2") + + log = slgs.LogRecords().AppendEmpty() + log.Body().SetStr("Another example log") + log.Attributes().PutStr("key1", "value1") + log.Attributes().PutStr("key2", "value2") + + dropped, err := test.s.sendNonOTLPLogs(context.Background(), + rls, + fieldsFromMap(map[string]string{"key": "value"}), + ) assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") - assert.Equal(t, test.s.logBuffer[0:1], dropped) + assert.Len(t, dropped, 1) + + assert.EqualValues(t, 2, *test.reqCounter) } func TestSendLogsJsonSplitFailedAll(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) body := extractBody(t, req) - assert.Equal(t, `{"key1":"value1","key2":"value2","log":"Example log"}`, body) + + var regex string + regex += `{"key1":"value1","key2":"value2","log":"Example log"}` + assert.Regexp(t, regex, body) }, func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(404) body := extractBody(t, req) - assert.Equal(t, `{"key1":"value1","key2":"value2","log":"Another example log"}`, body) + + var regex string + regex += `{"key1":"value1","key2":"value2","log":"Another example log"}` + assert.Regexp(t, regex, body) }, }) - defer func() { test.srv.Close() }() test.s.config.LogFormat = JSONFormat test.s.config.MaxRequestBodySize = 10 - test.s.logBuffer = exampleTwoLogs() - dropped, err := test.s.sendLogs(context.Background(), newFields(pcommon.NewMap())) + rls := plog.NewResourceLogs() + slgs := rls.ScopeLogs().AppendEmpty() + log := slgs.LogRecords().AppendEmpty() + + log.Body().SetStr("Example log") + log.Attributes().PutStr("key1", "value1") + log.Attributes().PutStr("key2", "value2") + + log = slgs.LogRecords().AppendEmpty() + log.Body().SetStr("Another example log") + log.Attributes().PutStr("key1", "value1") + log.Attributes().PutStr("key2", "value2") + + dropped, err := test.s.sendNonOTLPLogs(context.Background(), + rls, + fields{}, + ) + assert.EqualError( t, err, "failed sending data: status: 500 Internal Server Error\nfailed sending data: status: 404 Not Found", ) - assert.Equal(t, test.s.logBuffer[0:2], dropped) + assert.Len(t, dropped, 2) + + assert.EqualValues(t, 2, *test.reqCounter) } func TestSendLogsUnexpectedFormat(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(_ http.ResponseWriter, _ *http.Request) { }, }) - defer func() { test.srv.Close() }() test.s.config.LogFormat = "dummy" - logs := exampleTwoLogs() - test.s.logBuffer = logs - dropped, err := test.s.sendLogs(context.Background(), newFields(pcommon.NewMap())) + rls := plog.NewResourceLogs() + slgs := rls.ScopeLogs().AppendEmpty() + log := slgs.LogRecords().AppendEmpty() + log.Body().SetStr("Example log") + + dropped, err := test.s.sendNonOTLPLogs(context.Background(), + rls, + fields{}, + ) assert.Error(t, err) - assert.Equal(t, logs, dropped) + assert.Len(t, dropped, 1) + assert.Equal(t, []plog.LogRecord{log}, dropped) } -func TestOverrideSourceName(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ +func TestSendLogsOTLP(t *testing.T) { + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(_ http.ResponseWriter, req *http.Request) { - assert.Equal(t, "Test source name/test_name", req.Header.Get("X-Sumo-Name")) - }, - }) - defer func() { test.srv.Close() }() - - test.s.sources.name = getTestSourceFormat("Test source name/%{key1}") - test.s.logBuffer = exampleLog() - - _, err := test.s.sendLogs(context.Background(), fieldsFromMap(map[string]string{"key1": "test_name"})) - assert.NoError(t, err) -} + body := extractBody(t, req) + //nolint:lll + assert.Equal(t, "\n\x84\x01\n\x00\x12;\n\x00\x127*\r\n\vExample log2\x10\n\x04key1\x12\b\n\x06value12\x10\n\x04key2\x12\b\n\x06value2J\x00R\x00\x12C\n\x00\x12?*\x15\n\x13Another example log2\x10\n\x04key1\x12\b\n\x06value12\x10\n\x04key2\x12\b\n\x06value2J\x00R\x00", body) -func TestOverrideSourceCategory(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ - func(_ http.ResponseWriter, req *http.Request) { - assert.Equal(t, "Test source category/test_name", req.Header.Get("X-Sumo-Category")) + assert.Equal(t, "otelcol", req.Header.Get("X-Sumo-Client")) + assert.Equal(t, "application/x-protobuf", req.Header.Get("Content-Type")) + + assert.Empty(t, req.Header.Get("X-Sumo-Fields"), + "We should not get X-Sumo-Fields header when sending data with OTLP", + ) + assert.Empty(t, req.Header.Get("X-Sumo-Category"), + "We should not get X-Sumo-Category header when sending data with OTLP", + ) + assert.Empty(t, req.Header.Get("X-Sumo-Name"), + "We should not get X-Sumo-Name header when sending data with OTLP", + ) + assert.Empty(t, req.Header.Get("X-Sumo-Host"), + "We should not get X-Sumo-Host header when sending data with OTLP", + ) }, }) - defer func() { test.srv.Close() }() - test.s.sources.category = getTestSourceFormat("Test source category/%{key1}") - test.s.logBuffer = exampleLog() + test.s.config.LogFormat = "otlp" - _, err := test.s.sendLogs(context.Background(), fieldsFromMap(map[string]string{"key1": "test_name"})) - assert.NoError(t, err) -} + l := plog.NewLogs() + ls := l.ResourceLogs().AppendEmpty() -func TestOverrideSourceHost(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ - func(_ http.ResponseWriter, req *http.Request) { - assert.Equal(t, "Test source host/test_name", req.Header.Get("X-Sumo-Host")) - }, - }) - defer func() { test.srv.Close() }() + logRecords := exampleTwoLogs() + for i := 0; i < len(logRecords); i++ { + logRecords[i].MoveTo(ls.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()) + } - test.s.sources.host = getTestSourceFormat("Test source host/%{key1}") - test.s.logBuffer = exampleLog() + l.MarkReadOnly() - _, err := test.s.sendLogs(context.Background(), fieldsFromMap(map[string]string{"key1": "test_name"})) - assert.NoError(t, err) + assert.NoError(t, test.s.sendOTLPLogs(context.Background(), l)) + assert.EqualValues(t, 1, *test.reqCounter) } -func TestLogsBuffer(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) - defer func() { test.srv.Close() }() - - assert.Equal(t, test.s.countLogs(), 0) - logs := exampleTwoLogs() - - droppedLogs, err := test.s.batchLog(context.Background(), logs[0], newFields(pcommon.NewMap())) - require.NoError(t, err) - assert.Nil(t, droppedLogs) - assert.Equal(t, 1, test.s.countLogs()) - assert.Equal(t, []plog.LogRecord{logs[0]}, test.s.logBuffer) - - droppedLogs, err = test.s.batchLog(context.Background(), logs[1], newFields(pcommon.NewMap())) - require.NoError(t, err) - assert.Nil(t, droppedLogs) - assert.Equal(t, 2, test.s.countLogs()) - assert.Equal(t, logs, test.s.logBuffer) - - test.s.cleanLogsBuffer() - assert.Equal(t, 0, test.s.countLogs()) - assert.Equal(t, []plog.LogRecord{}, test.s.logBuffer) +func TestLogsHandlesReceiverResponses(t *testing.T) { + t.Run("json with too many fields logs a warning", func(t *testing.T) { + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, _ *http.Request) { + fmt.Fprintf(w, `{ + "status" : 200, + "id" : "YBLR1-S2T29-MVXEJ", + "code" : "bad.http.header.fields", + "message" : "X-Sumo-Fields Warning: 14 key-value pairs are dropped as they are exceeding maximum key-value pair number limit 30." + }`) + }, + }, func(c *Config) { + c.LogFormat = JSONFormat + }) + + rls := plog.NewResourceLogs() + rls.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("Example log") + + var buffer bytes.Buffer + writer := bufio.NewWriter(&buffer) + test.s.logger = zap.New( + zapcore.NewCore( + zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), + zapcore.AddSync(writer), + zapcore.DebugLevel, + ), + ) + + _, err := test.s.sendNonOTLPLogs(context.Background(), + rls, + fieldsFromMap( + map[string]string{ + "cluster": "abcaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "code": "4222222222222222222222222222222222222222222222222222222222222222222222222222222222222", + "component": "apiserver", + "endpoint": "httpsaaaaaaaaaaaaaaaaaaa", + "a": "a", + "b": "b", + "c": "c", + "d": "d", + "e": "e", + "f": "f", + "g": "g", + "q": "q", + "w": "w", + "r": "r", + "t": "t", + "y": "y", + "1": "1", + "2": "2", + "3": "3", + "4": "4", + "5": "5", + "6": "6", + "7": "7", + "8": "8", + "9": "9", + "10": "10", + "11": "11", + "12": "12", + "13": "13", + "14": "14", + "15": "15", + "16": "16", + "17": "17", + "18": "18", + "19": "19", + "20": "20", + "21": "21", + "22": "22", + "23": "23", + "24": "24", + "25": "25", + "26": "26", + "27": "27", + "28": "28", + "29": "29", + "_sourceName": "test_source_name", + "_sourceHost": "test_source_host", + "_sourceCategory": "test_source_category", + }), + ) + assert.NoError(t, writer.Flush()) + assert.NoError(t, err) + assert.EqualValues(t, 1, *test.reqCounter) + + assert.Contains(t, + buffer.String(), + `There was an issue sending data {`+ + `"status": "200 OK", `+ + `"id": "YBLR1-S2T29-MVXEJ", `+ + `"code": "bad.http.header.fields", `+ + `"message": "X-Sumo-Fields Warning: 14 key-value pairs are dropped as they are exceeding maximum key-value pair number limit 30."`, + ) + }) } func TestInvalidEndpoint(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) - defer func() { test.srv.Close() }() + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){}) test.s.dataURLLogs = ":" - test.s.logBuffer = exampleLog() - _, err := test.s.sendLogs(context.Background(), newFields(pcommon.NewMap())) + rls := plog.NewResourceLogs() + rls.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("Example log") + + _, err := test.s.sendNonOTLPLogs(context.Background(), rls, fields{}) assert.EqualError(t, err, `parse ":": missing protocol scheme`) } func TestInvalidPostRequest(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) - defer func() { test.srv.Close() }() + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){}) test.s.dataURLLogs = "" - test.s.logBuffer = exampleLog() + rls := plog.NewResourceLogs() + rls.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("Example log") - _, err := test.s.sendLogs(context.Background(), newFields(pcommon.NewMap())) + _, err := test.s.sendNonOTLPLogs(context.Background(), rls, fields{}) assert.EqualError(t, err, `Post "": unsupported protocol scheme ""`) } -func TestLogsBufferOverflow(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) - defer func() { test.srv.Close() }() - - test.s.dataURLLogs = ":" - log := exampleLog() - flds := newFields(pcommon.NewMap()) - - for test.s.countLogs() < maxBufferSize-1 { - _, err := test.s.batchLog(context.Background(), log[0], flds) - require.NoError(t, err) - } - - _, err := test.s.batchLog(context.Background(), log[0], flds) - assert.EqualError(t, err, `parse ":": missing protocol scheme`) - assert.Equal(t, 0, test.s.countLogs()) -} - func TestInvalidMetricFormat(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){}) defer func() { test.srv.Close() }() test.s.config.MetricFormat = "invalid" @@ -531,7 +950,7 @@ func TestInvalidMetricFormat(t *testing.T) { } func TestInvalidPipeline(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){}) defer func() { test.srv.Close() }() err := test.s.send(context.Background(), "invalidPipeline", newCountingReader(0).withString(""), newFields(pcommon.NewMap())) @@ -539,79 +958,91 @@ func TestInvalidPipeline(t *testing.T) { } func TestSendCompressGzip(t *testing.T) { - test := prepareSenderTest(t, []func(res http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, configcompression.TypeGzip, []func(res http.ResponseWriter, req *http.Request){ func(res http.ResponseWriter, req *http.Request) { res.WriteHeader(200) - _, err := res.Write([]byte("")) - require.NoError(t, err) + if _, err := res.Write([]byte("")); err != nil { + res.WriteHeader(http.StatusInternalServerError) + assert.FailNow(t, "err: %v", err) + return + } body := decodeGzip(t, req.Body) assert.Equal(t, "gzip", req.Header.Get("Content-Encoding")) assert.Equal(t, "Some example log", body) }, }) - defer func() { test.srv.Close() }() - - test.s.config.CompressEncoding = "gzip" - - c, err := newCompressor("gzip") - require.NoError(t, err) - test.s.compressor = c reader := newCountingReader(0).withString("Some example log") - err = test.s.send(context.Background(), LogsPipeline, reader, newFields(pcommon.NewMap())) + err := test.s.send(context.Background(), LogsPipeline, reader, fields{}) require.NoError(t, err) } -func TestSendCompressDeflate(t *testing.T) { - test := prepareSenderTest(t, []func(res http.ResponseWriter, req *http.Request){ +func TestSendCompressGzipDeprecated(t *testing.T) { + test := prepareSenderTest(t, "default", []func(res http.ResponseWriter, req *http.Request){ func(res http.ResponseWriter, req *http.Request) { res.WriteHeader(200) - _, err := res.Write([]byte("")) - require.NoError(t, err) - body := decodeDeflate(t, req.Body) - assert.Equal(t, "deflate", req.Header.Get("Content-Encoding")) + if _, err := res.Write([]byte("")); err != nil { + res.WriteHeader(http.StatusInternalServerError) + assert.FailNow(t, "err: %v", err) + return + } + body := decodeGzip(t, req.Body) + assert.Equal(t, "gzip", req.Header.Get("Content-Encoding")) assert.Equal(t, "Some example log", body) }, }) - defer func() { test.srv.Close() }() - - test.s.config.CompressEncoding = "deflate" - c, err := newCompressor("deflate") - require.NoError(t, err) - - test.s.compressor = c reader := newCountingReader(0).withString("Some example log") - err = test.s.send(context.Background(), LogsPipeline, reader, newFields(pcommon.NewMap())) + err := test.s.send(context.Background(), LogsPipeline, reader, fields{}) require.NoError(t, err) } -func TestCompressionError(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) - defer func() { test.srv.Close() }() +func TestSendCompressZstd(t *testing.T) { + test := prepareSenderTest(t, configcompression.TypeZstd, []func(res http.ResponseWriter, req *http.Request){ + func(res http.ResponseWriter, req *http.Request) { + res.WriteHeader(200) + if _, err := res.Write([]byte("")); err != nil { + res.WriteHeader(http.StatusInternalServerError) + assert.FailNow(t, "err: %v", err) + return + } + body := decodeZstd(t, req.Body) + assert.Equal(t, "zstd", req.Header.Get("Content-Encoding")) + assert.Equal(t, "Some example log", body) + }, + }) - test.s.compressor = getTestCompressor(errors.New("read error"), nil) reader := newCountingReader(0).withString("Some example log") - err := test.s.send(context.Background(), LogsPipeline, reader, newFields(pcommon.NewMap())) - assert.EqualError(t, err, "read error") + err := test.s.send(context.Background(), LogsPipeline, reader, fields{}) + require.NoError(t, err) } -func TestInvalidContentEncoding(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) - defer func() { test.srv.Close() }() +func TestSendCompressDeflate(t *testing.T) { + test := prepareSenderTest(t, configcompression.TypeDeflate, []func(res http.ResponseWriter, req *http.Request){ + func(res http.ResponseWriter, req *http.Request) { + res.WriteHeader(200) + if _, err := res.Write([]byte("")); err != nil { + res.WriteHeader(http.StatusInternalServerError) + assert.FailNow(t, "err: %v", err) + return + } + body := decodeZlib(t, req.Body) + assert.Equal(t, "deflate", req.Header.Get("Content-Encoding")) + assert.Equal(t, "Some example log", body) + }, + }) - test.s.config.CompressEncoding = "test" reader := newCountingReader(0).withString("Some example log") - err := test.s.send(context.Background(), LogsPipeline, reader, newFields(pcommon.NewMap())) - assert.EqualError(t, err, "invalid content encoding: test") + err := test.s.send(context.Background(), LogsPipeline, reader, fields{}) + require.NoError(t, err) } func TestSendMetrics(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(_ http.ResponseWriter, req *http.Request) { body := extractBody(t, req) expected := `` + @@ -639,7 +1070,7 @@ func TestSendMetrics(t *testing.T) { } func TestSendMetricsSplit(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(_ http.ResponseWriter, req *http.Request) { body := extractBody(t, req) expected := `` + @@ -667,7 +1098,7 @@ func TestSendMetricsSplit(t *testing.T) { } func TestSendOTLPHistogram(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(_ http.ResponseWriter, req *http.Request) { unmarshaler := pmetric.ProtoUnmarshaler{} body, err := io.ReadAll(req.Body) @@ -698,7 +1129,7 @@ func TestSendOTLPHistogram(t *testing.T) { } func TestSendMetricsSplitBySource(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(_ http.ResponseWriter, req *http.Request) { body := extractBody(t, req) expected := `test.metric.data{test="test_value",test2="second_value",_sourceHost="value1"} 14500 1605534165000` @@ -736,7 +1167,7 @@ func TestSendMetricsSplitBySource(t *testing.T) { } func TestSendMetricsSplitFailedOne(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) @@ -777,7 +1208,7 @@ func TestSendMetricsSplitFailedOne(t *testing.T) { } func TestSendMetricsSplitFailedAll(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) @@ -831,7 +1262,7 @@ func TestSendMetricsSplitFailedAll(t *testing.T) { func TestSendMetricsUnexpectedFormat(t *testing.T) { // Expect no requestes - test := prepareSenderTest(t, nil) + test := prepareSenderTest(t, NoCompression, nil) test.s.config.MetricFormat = "invalid" metricSum, attrs := exampleIntMetric() @@ -846,7 +1277,7 @@ func TestSendMetricsUnexpectedFormat(t *testing.T) { } func TestBadRequestCausesPermanentError(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(res http.ResponseWriter, _ *http.Request) { res.WriteHeader(400) }, From 2502368da65e9eaf1d3228ec205f6ebe6c505564 Mon Sep 17 00:00:00 2001 From: Curtis RobertInput body | Output body |
+ +```json +{ + "timestamp": "", + "body": "{\"log\":\"INFO: log line here\",\"stream\":\"stdout\",\"time\":\"2029-03-30T08:31:20.545192187Z\"}", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" +} +``` + + | ++ +```json +{ + "timestamp": "2024-03-30 08:31:20.545192187 +0000 UTC", + "body": "log line here", + "attributes": { + "time": "2024-03-30T08:31:20.545192187Z", + "log.iostream": "stdout", + "k8s.pod.name": "kube-controller-kind-control-plane", + "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6", + "k8s.container.name": "kube-controller", + "k8s.container.restart_count": "1", + "k8s.namespace.name": "some", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" + } +} +``` + + | +
Input body | Output body |
+ +```json +{ + "timestamp": "", + "body": "2024-04-13T07:59:37.505201169-05:00 stdout F standalone crio line which is awesome", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" +} +``` + + | ++ +```json +{ + "timestamp": "2024-04-13 12:59:37.505201169 +0000 UTC", + "body": "standalone crio line which is awesome", + "attributes": { + "time": "2024-04-13T07:59:37.505201169-05:00", + "logtag": "F", + "log.iostream": "stdout", + "k8s.pod.name": "kube-controller-kind-control-plane", + "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6", + "k8s.container.name": "kube-controller", + "k8s.container.restart_count": "1", + "k8s.namespace.name": "some", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" + } +} +``` + + | +
Input body | Output body |
+ +```json +{ + "timestamp": "", + "body": "2023-06-22T10:27:25.813799277Z stdout F standalone containerd line that is super awesome", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" +} +``` + + | ++ +```json +{ + "timestamp": "2023-06-22 10:27:25.813799277 +0000 UTC", + "body": "standalone containerd line that is super awesome", + "attributes": { + "time": "2023-06-22T10:27:25.813799277Z", + "logtag": "F", + "log.iostream": "stdout", + "k8s.pod.name": "kube-controller-kind-control-plane", + "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6", + "k8s.container.name": "kube-controller", + "k8s.container.restart_count": "1", + "k8s.namespace.name": "some", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" + } +} +``` + + | +
Input body | Output body |
+ +```json +{ + "timestamp": "", + "body": "2023-06-22T10:27:25.813799277Z stdout P multiline containerd line that i", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" +}, +{ + "timestamp": "", + "body": "2023-06-22T10:27:25.813799277Z stdout F s super awesomne", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" +} +``` + + | ++ +```json +{ + "timestamp": "2023-06-22 10:27:25.813799277 +0000 UTC", + "body": "multiline containerd line that is super awesome", + "attributes": { + "time": "2023-06-22T10:27:25.813799277Z", + "logtag": "F", + "log.iostream": "stdout", + "k8s.pod.name": "kube-controller-kind-control-plane", + "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6", + "k8s.container.name": "kube-controller", + "k8s.container.restart_count": "1", + "k8s.namespace.name": "some", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" + } +} +``` + + | +