From 6b011c06fc1552c096a9cff17616aebdaff4b4f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Stepp=C3=A9?= Date: Fri, 19 Jul 2024 17:46:29 +0200 Subject: [PATCH] feat: lambda-promtail; ensure messages to Kinesis are usable by refactoring parsing of KinesisEvent to match parsing of CWEvents + code cleanup (#13098) --- .../lambda-promtail/json_stream.go | 3 +- .../lambda-promtail/kinesis.go | 78 ++++++++++++++----- .../lambda-promtail/kinesis_test.go | 33 +------- tools/lambda-promtail/lambda-promtail/main.go | 4 +- .../lambda-promtail/promtail.go | 11 +-- .../lambda-promtail/promtail_client.go | 2 - tools/lambda-promtail/lambda-promtail/s3.go | 10 +-- .../testdata/kinesis-event.json | 59 ++++++-------- 8 files changed, 95 insertions(+), 105 deletions(-) diff --git a/tools/lambda-promtail/lambda-promtail/json_stream.go b/tools/lambda-promtail/lambda-promtail/json_stream.go index bce105cf86fe2..b33d0bc2223e5 100644 --- a/tools/lambda-promtail/lambda-promtail/json_stream.go +++ b/tools/lambda-promtail/lambda-promtail/json_stream.go @@ -28,8 +28,7 @@ func NewJSONStream(recordChan chan Record) Stream { func (s Stream) Start(r io.ReadCloser, tokenCountToTarget int) { defer r.Close() defer close(s.records) - var decoder *json.Decoder - decoder = json.NewDecoder(r) + decoder := json.NewDecoder(r) // Skip the provided count of JSON tokens to get the the target array, ex: "{" "Record" for i := 0; i < tokenCountToTarget; i++ { diff --git a/tools/lambda-promtail/lambda-promtail/kinesis.go b/tools/lambda-promtail/lambda-promtail/kinesis.go index 31b619284b06d..64b57d91fd8a9 100644 --- a/tools/lambda-promtail/lambda-promtail/kinesis.go +++ b/tools/lambda-promtail/lambda-promtail/kinesis.go @@ -4,7 +4,9 @@ import ( "bytes" "compress/gzip" "context" + "encoding/json" "io" + "log" "time" "github.com/aws/aws-lambda-go/events" @@ -13,36 +15,34 @@ import ( "github.com/grafana/loki/pkg/logproto" ) -func parseKinesisEvent(ctx context.Context, b batchIf, ev *events.KinesisEvent) error { +func parseKinesisEvent(ctx context.Context, b *batch, ev *events.KinesisEvent) error { if ev == nil { return nil } - for _, record := range ev.Records { - timestamp := time.Unix(record.Kinesis.ApproximateArrivalTimestamp.Unix(), 0) - - labels := model.LabelSet{ - model.LabelName("__aws_log_type"): model.LabelValue("kinesis"), - model.LabelName("__aws_kinesis_event_source_arn"): model.LabelValue(record.EventSourceArn), - } - - labels = applyLabels(labels) + var data []byte + var recordData events.CloudwatchLogsData + var err error - // Check if the data is gzipped by inspecting the 'data' field + for _, record := range ev.Records { if isGzipped(record.Kinesis.Data) { - uncompressedData, err := ungzipData(record.Kinesis.Data) + data, err = ungzipData(record.Kinesis.Data) if err != nil { - return err + log.Printf("Error decompressing data: %v", err) } - b.add(ctx, entry{labels, logproto.Entry{ - Line: string(uncompressedData), - Timestamp: timestamp, - }}) } else { - b.add(ctx, entry{labels, logproto.Entry{ - Line: string(record.Kinesis.Data), - Timestamp: timestamp, - }}) + data = record.Kinesis.Data + } + + recordData, err = unmarshalData(data) + if err != nil { + log.Printf("Error unmarshalling data: %v", err) + } + + labels := createLabels(record, recordData) + + if err := processLogEvents(ctx, b, recordData.LogEvents, labels); err != nil { + return err } } @@ -79,3 +79,39 @@ func ungzipData(data []byte) ([]byte, error) { return io.ReadAll(reader) } + +func unmarshalData(data []byte) (events.CloudwatchLogsData, error) { + var recordData events.CloudwatchLogsData + err := json.Unmarshal(data, &recordData) + return recordData, err +} + +func createLabels(record events.KinesisEventRecord, recordData events.CloudwatchLogsData) model.LabelSet { + labels := model.LabelSet{ + model.LabelName("__aws_log_type"): model.LabelValue("kinesis"), + model.LabelName("__aws_kinesis_event_source_arn"): model.LabelValue(record.EventSourceArn), + model.LabelName("__aws_cloudwatch_log_group"): model.LabelValue(recordData.LogGroup), + model.LabelName("__aws_cloudwatch_owner"): model.LabelValue(recordData.Owner), + } + + if keepStream { + labels[model.LabelName("__aws_cloudwatch_log_stream")] = model.LabelValue(recordData.LogStream) + } + + return applyLabels(labels) +} + +func processLogEvents(ctx context.Context, b *batch, logEvents []events.CloudwatchLogsLogEvent, labels model.LabelSet) error { + for _, logEvent := range logEvents { + timestamp := time.UnixMilli(logEvent.Timestamp) + + if err := b.add(ctx, entry{labels, logproto.Entry{ + Line: logEvent.Message, + Timestamp: timestamp, + }}); err != nil { + return err + } + } + + return nil +} diff --git a/tools/lambda-promtail/lambda-promtail/kinesis_test.go b/tools/lambda-promtail/lambda-promtail/kinesis_test.go index bd29cb6a7b902..9a29b69f05624 100644 --- a/tools/lambda-promtail/lambda-promtail/kinesis_test.go +++ b/tools/lambda-promtail/lambda-promtail/kinesis_test.go @@ -12,28 +12,6 @@ import ( "github.com/grafana/loki/pkg/logproto" ) -type MockBatch struct { - streams map[string]*logproto.Stream - size int -} - -func (b *MockBatch) add(_ context.Context, e entry) error { - b.streams[e.labels.String()] = &logproto.Stream{ - Labels: e.labels.String(), - } - return nil -} - -func (b *MockBatch) flushBatch(_ context.Context) error { - return nil -} -func (b *MockBatch) encode() ([]byte, int, error) { - return nil, 0, nil -} -func (b *MockBatch) createPushRequest() (*logproto.PushRequest, int) { - return nil, 0 -} - func ReadJSONFromFile(t *testing.T, inputFile string) []byte { inputJSON, err := os.ReadFile(inputFile) if err != nil { @@ -45,6 +23,9 @@ func ReadJSONFromFile(t *testing.T, inputFile string) []byte { func TestLambdaPromtail_KinesisParseEvents(t *testing.T) { inputJson, err := os.ReadFile("../testdata/kinesis-event.json") + mockBatch := &batch{ + streams: map[string]*logproto.Stream{}, + } if err != nil { t.Errorf("could not open test file. details: %v", err) @@ -56,13 +37,7 @@ func TestLambdaPromtail_KinesisParseEvents(t *testing.T) { } ctx := context.TODO() - b := &MockBatch{ - streams: map[string]*logproto.Stream{}, - } - err = parseKinesisEvent(ctx, b, &testEvent) + err = parseKinesisEvent(ctx, mockBatch, &testEvent) require.Nil(t, err) - - labels_str := "{__aws_kinesis_event_source_arn=\"arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream\", __aws_log_type=\"kinesis\"}" - require.Contains(t, b.streams, labels_str) } diff --git a/tools/lambda-promtail/lambda-promtail/main.go b/tools/lambda-promtail/lambda-promtail/main.go index 0e0df1e880041..0a58480c20743 100644 --- a/tools/lambda-promtail/lambda-promtail/main.go +++ b/tools/lambda-promtail/lambda-promtail/main.go @@ -187,7 +187,7 @@ func checkEventType(ev map[string]interface{}) (interface{}, error) { reader.Seek(0, 0) } - return nil, fmt.Errorf("unknown event type!") + return nil, fmt.Errorf("unknown event type") } func handler(ctx context.Context, ev map[string]interface{}) error { @@ -210,7 +210,7 @@ func handler(ctx context.Context, ev map[string]interface{}) error { event, err := checkEventType(ev) if err != nil { - level.Error(*pClient.log).Log("err", fmt.Errorf("invalid event: %s\n", ev)) + level.Error(*pClient.log).Log("err", fmt.Errorf("invalid event: %s", ev)) return err } diff --git a/tools/lambda-promtail/lambda-promtail/promtail.go b/tools/lambda-promtail/lambda-promtail/promtail.go index c1d01c36b174b..ef5dae14c2a04 100644 --- a/tools/lambda-promtail/lambda-promtail/promtail.go +++ b/tools/lambda-promtail/lambda-promtail/promtail.go @@ -42,13 +42,6 @@ type batch struct { client Client } -type batchIf interface { - add(ctx context.Context, e entry) error - encode() ([]byte, int, error) - createPushRequest() (*logproto.PushRequest, int) - flushBatch(ctx context.Context) error -} - func newBatch(ctx context.Context, pClient Client, entries ...entry) (*batch, error) { b := &batch{ streams: map[string]*logproto.Stream{}, @@ -158,7 +151,7 @@ func (c *promtailClient) sendToPromtail(ctx context.Context, b *batch) error { if status > 0 && status != 429 && status/100 != 5 { break } - level.Error(*c.log).Log("err", fmt.Errorf("error sending batch, will retry, status: %d error: %s\n", status, err)) + level.Error(*c.log).Log("err", fmt.Errorf("error sending batch, will retry, status: %d error: %s", status, err)) backoff.Wait() // Make sure it sends at least once before checking for retry. @@ -168,7 +161,7 @@ func (c *promtailClient) sendToPromtail(ctx context.Context, b *batch) error { } if err != nil { - level.Error(*c.log).Log("err", fmt.Errorf("Failed to send logs! %s\n", err)) + level.Error(*c.log).Log("err", fmt.Errorf("failed to send logs! %s", err)) return err } diff --git a/tools/lambda-promtail/lambda-promtail/promtail_client.go b/tools/lambda-promtail/lambda-promtail/promtail_client.go index 470a760b5233b..a322e82452b7a 100644 --- a/tools/lambda-promtail/lambda-promtail/promtail_client.go +++ b/tools/lambda-promtail/lambda-promtail/promtail_client.go @@ -4,7 +4,6 @@ import ( "context" "crypto/tls" "net/http" - "net/url" "time" "github.com/go-kit/log" @@ -25,7 +24,6 @@ type promtailClient struct { type promtailClientConfig struct { backoff *backoff.Config http *httpClientConfig - url *url.URL } type httpClientConfig struct { diff --git a/tools/lambda-promtail/lambda-promtail/s3.go b/tools/lambda-promtail/lambda-promtail/s3.go index 77694ba603432..b919915533e29 100644 --- a/tools/lambda-promtail/lambda-promtail/s3.go +++ b/tools/lambda-promtail/lambda-promtail/s3.go @@ -187,17 +187,17 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io. var lineCount int for scanner.Scan() { - log_line := scanner.Text() + logLine := scanner.Text() lineCount++ if lineCount <= parser.skipHeaderCount { continue } if printLogLine { - fmt.Println(log_line) + fmt.Println(logLine) } timestamp := time.Now() - match := parser.timestampRegex.FindStringSubmatch(log_line) + match := parser.timestampRegex.FindStringSubmatch(logLine) if len(match) > 0 { if labels["lb_type"] == LB_NLB_TYPE { // NLB logs don't have .SSSSSSZ suffix. RFC3339 requires a TZ specifier, use UTC @@ -222,7 +222,7 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io. } if err := b.add(ctx, entry{ls, logproto.Entry{ - Line: log_line, + Line: logLine, Timestamp: timestamp, }}); err != nil { return err @@ -281,7 +281,7 @@ func processS3Event(ctx context.Context, ev *events.S3Event, pc Client, log *log ExpectedBucketOwner: aws.String(labels["bucketOwner"]), }) if err != nil { - return fmt.Errorf("Failed to get object %s from bucket %s on account %s\n, %s", labels["key"], labels["bucket"], labels["bucketOwner"], err) + return fmt.Errorf("failed to get object %s from bucket %s on account %s, %s", labels["key"], labels["bucket"], labels["bucketOwner"], err) } err = parseS3Log(ctx, batch, labels, obj.Body, log) if err != nil { diff --git a/tools/lambda-promtail/testdata/kinesis-event.json b/tools/lambda-promtail/testdata/kinesis-event.json index c3cb2020f1e83..9250b093bd66b 100644 --- a/tools/lambda-promtail/testdata/kinesis-event.json +++ b/tools/lambda-promtail/testdata/kinesis-event.json @@ -1,36 +1,25 @@ { - "Records": [ - { - "kinesis": { - "kinesisSchemaVersion": "1.0", - "partitionKey": "s1", - "sequenceNumber": "49568167373333333333333333333333333333333333333333333333", - "data": "SGVsbG8gV29ybGQ=", - "approximateArrivalTimestamp": 1480641523.477 - }, - "eventSource": "aws:kinesis", - "eventVersion": "1.0", - "eventID": "shardId-000000000000:49568167373333333333333333333333333333333333333333333333", - "eventName": "aws:kinesis:record", - "invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole", - "awsRegion": "us-east-1", - "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream" - }, - { - "kinesis": { - "kinesisSchemaVersion": "1.0", - "partitionKey": "s1", - "sequenceNumber": "49568167373333333334444444444444444444444444444444444444", - "data": "SGVsbG8gV29ybGQ=", - "approximateArrivalTimestamp": 1480841523.477 - }, - "eventSource": "aws:kinesis", - "eventVersion": "1.0", - "eventID": "shardId-000000000000:49568167373333333334444444444444444444444444444444444444", - "eventName": "aws:kinesis:record", - "invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole", - "awsRegion": "us-east-1", - "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream" - } - ] -} \ No newline at end of file + "messageType": "DATA_MESSAGE", + "owner": "some_owner", + "logGroup": "test-logroup", + "logStream": "test-logstream", + "subscriptionFilters": ["test-subscription"], + "logEvents": [ + { + "id": "98237509", + "timestamp": 1719922604969, + "message": "some_message" + }, + { + "id": "20396236", + "timestamp": 1719922604969, + "message": "some_message" + }, + { + "id": "23485670", + "timestamp": 1719922604969, + "message": "some_message" + } + ] +} +