Skip to content

Commit

Permalink
feat: lambda-promtail; ensure messages to Kinesis are usable by refac…
Browse files Browse the repository at this point in the history
…toring parsing of KinesisEvent to match parsing of CWEvents + code cleanup (grafana#13098)
  • Loading branch information
HatiCode authored and pull[bot] committed Oct 9, 2024
1 parent 3406d0b commit 6b011c0
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 105 deletions.
3 changes: 1 addition & 2 deletions tools/lambda-promtail/lambda-promtail/json_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ func NewJSONStream(recordChan chan Record) Stream {
func (s Stream) Start(r io.ReadCloser, tokenCountToTarget int) {
defer r.Close()
defer close(s.records)
var decoder *json.Decoder
decoder = json.NewDecoder(r)
decoder := json.NewDecoder(r)

// Skip the provided count of JSON tokens to get the the target array, ex: "{" "Record"
for i := 0; i < tokenCountToTarget; i++ {
Expand Down
78 changes: 57 additions & 21 deletions tools/lambda-promtail/lambda-promtail/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"io"
"log"
"time"

"github.com/aws/aws-lambda-go/events"
Expand All @@ -13,36 +15,34 @@ import (
"github.com/grafana/loki/pkg/logproto"
)

func parseKinesisEvent(ctx context.Context, b batchIf, ev *events.KinesisEvent) error {
func parseKinesisEvent(ctx context.Context, b *batch, ev *events.KinesisEvent) error {
if ev == nil {
return nil
}

for _, record := range ev.Records {
timestamp := time.Unix(record.Kinesis.ApproximateArrivalTimestamp.Unix(), 0)

labels := model.LabelSet{
model.LabelName("__aws_log_type"): model.LabelValue("kinesis"),
model.LabelName("__aws_kinesis_event_source_arn"): model.LabelValue(record.EventSourceArn),
}

labels = applyLabels(labels)
var data []byte
var recordData events.CloudwatchLogsData
var err error

// Check if the data is gzipped by inspecting the 'data' field
for _, record := range ev.Records {
if isGzipped(record.Kinesis.Data) {
uncompressedData, err := ungzipData(record.Kinesis.Data)
data, err = ungzipData(record.Kinesis.Data)
if err != nil {
return err
log.Printf("Error decompressing data: %v", err)
}
b.add(ctx, entry{labels, logproto.Entry{
Line: string(uncompressedData),
Timestamp: timestamp,
}})
} else {
b.add(ctx, entry{labels, logproto.Entry{
Line: string(record.Kinesis.Data),
Timestamp: timestamp,
}})
data = record.Kinesis.Data
}

recordData, err = unmarshalData(data)
if err != nil {
log.Printf("Error unmarshalling data: %v", err)
}

labels := createLabels(record, recordData)

if err := processLogEvents(ctx, b, recordData.LogEvents, labels); err != nil {
return err
}
}

Expand Down Expand Up @@ -79,3 +79,39 @@ func ungzipData(data []byte) ([]byte, error) {

return io.ReadAll(reader)
}

func unmarshalData(data []byte) (events.CloudwatchLogsData, error) {
var recordData events.CloudwatchLogsData
err := json.Unmarshal(data, &recordData)
return recordData, err
}

func createLabels(record events.KinesisEventRecord, recordData events.CloudwatchLogsData) model.LabelSet {
labels := model.LabelSet{
model.LabelName("__aws_log_type"): model.LabelValue("kinesis"),
model.LabelName("__aws_kinesis_event_source_arn"): model.LabelValue(record.EventSourceArn),
model.LabelName("__aws_cloudwatch_log_group"): model.LabelValue(recordData.LogGroup),
model.LabelName("__aws_cloudwatch_owner"): model.LabelValue(recordData.Owner),
}

if keepStream {
labels[model.LabelName("__aws_cloudwatch_log_stream")] = model.LabelValue(recordData.LogStream)
}

return applyLabels(labels)
}

func processLogEvents(ctx context.Context, b *batch, logEvents []events.CloudwatchLogsLogEvent, labels model.LabelSet) error {
for _, logEvent := range logEvents {
timestamp := time.UnixMilli(logEvent.Timestamp)

if err := b.add(ctx, entry{labels, logproto.Entry{
Line: logEvent.Message,
Timestamp: timestamp,
}}); err != nil {
return err
}
}

return nil
}
33 changes: 4 additions & 29 deletions tools/lambda-promtail/lambda-promtail/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,6 @@ import (
"github.com/grafana/loki/pkg/logproto"
)

type MockBatch struct {
streams map[string]*logproto.Stream
size int
}

func (b *MockBatch) add(_ context.Context, e entry) error {
b.streams[e.labels.String()] = &logproto.Stream{
Labels: e.labels.String(),
}
return nil
}

func (b *MockBatch) flushBatch(_ context.Context) error {
return nil
}
func (b *MockBatch) encode() ([]byte, int, error) {
return nil, 0, nil
}
func (b *MockBatch) createPushRequest() (*logproto.PushRequest, int) {
return nil, 0
}

func ReadJSONFromFile(t *testing.T, inputFile string) []byte {
inputJSON, err := os.ReadFile(inputFile)
if err != nil {
Expand All @@ -45,6 +23,9 @@ func ReadJSONFromFile(t *testing.T, inputFile string) []byte {

func TestLambdaPromtail_KinesisParseEvents(t *testing.T) {
inputJson, err := os.ReadFile("../testdata/kinesis-event.json")
mockBatch := &batch{
streams: map[string]*logproto.Stream{},
}

if err != nil {
t.Errorf("could not open test file. details: %v", err)
Expand All @@ -56,13 +37,7 @@ func TestLambdaPromtail_KinesisParseEvents(t *testing.T) {
}

ctx := context.TODO()
b := &MockBatch{
streams: map[string]*logproto.Stream{},
}

err = parseKinesisEvent(ctx, b, &testEvent)
err = parseKinesisEvent(ctx, mockBatch, &testEvent)
require.Nil(t, err)

labels_str := "{__aws_kinesis_event_source_arn=\"arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream\", __aws_log_type=\"kinesis\"}"
require.Contains(t, b.streams, labels_str)
}
4 changes: 2 additions & 2 deletions tools/lambda-promtail/lambda-promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func checkEventType(ev map[string]interface{}) (interface{}, error) {
reader.Seek(0, 0)
}

return nil, fmt.Errorf("unknown event type!")
return nil, fmt.Errorf("unknown event type")
}

func handler(ctx context.Context, ev map[string]interface{}) error {
Expand All @@ -210,7 +210,7 @@ func handler(ctx context.Context, ev map[string]interface{}) error {

event, err := checkEventType(ev)
if err != nil {
level.Error(*pClient.log).Log("err", fmt.Errorf("invalid event: %s\n", ev))
level.Error(*pClient.log).Log("err", fmt.Errorf("invalid event: %s", ev))
return err
}

Expand Down
11 changes: 2 additions & 9 deletions tools/lambda-promtail/lambda-promtail/promtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@ type batch struct {
client Client
}

type batchIf interface {
add(ctx context.Context, e entry) error
encode() ([]byte, int, error)
createPushRequest() (*logproto.PushRequest, int)
flushBatch(ctx context.Context) error
}

func newBatch(ctx context.Context, pClient Client, entries ...entry) (*batch, error) {
b := &batch{
streams: map[string]*logproto.Stream{},
Expand Down Expand Up @@ -158,7 +151,7 @@ func (c *promtailClient) sendToPromtail(ctx context.Context, b *batch) error {
if status > 0 && status != 429 && status/100 != 5 {
break
}
level.Error(*c.log).Log("err", fmt.Errorf("error sending batch, will retry, status: %d error: %s\n", status, err))
level.Error(*c.log).Log("err", fmt.Errorf("error sending batch, will retry, status: %d error: %s", status, err))
backoff.Wait()

// Make sure it sends at least once before checking for retry.
Expand All @@ -168,7 +161,7 @@ func (c *promtailClient) sendToPromtail(ctx context.Context, b *batch) error {
}

if err != nil {
level.Error(*c.log).Log("err", fmt.Errorf("Failed to send logs! %s\n", err))
level.Error(*c.log).Log("err", fmt.Errorf("failed to send logs! %s", err))
return err
}

Expand Down
2 changes: 0 additions & 2 deletions tools/lambda-promtail/lambda-promtail/promtail_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"crypto/tls"
"net/http"
"net/url"
"time"

"github.com/go-kit/log"
Expand All @@ -25,7 +24,6 @@ type promtailClient struct {
type promtailClientConfig struct {
backoff *backoff.Config
http *httpClientConfig
url *url.URL
}

type httpClientConfig struct {
Expand Down
10 changes: 5 additions & 5 deletions tools/lambda-promtail/lambda-promtail/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,17 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.

var lineCount int
for scanner.Scan() {
log_line := scanner.Text()
logLine := scanner.Text()
lineCount++
if lineCount <= parser.skipHeaderCount {
continue
}
if printLogLine {
fmt.Println(log_line)
fmt.Println(logLine)
}

timestamp := time.Now()
match := parser.timestampRegex.FindStringSubmatch(log_line)
match := parser.timestampRegex.FindStringSubmatch(logLine)
if len(match) > 0 {
if labels["lb_type"] == LB_NLB_TYPE {
// NLB logs don't have .SSSSSSZ suffix. RFC3339 requires a TZ specifier, use UTC
Expand All @@ -222,7 +222,7 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
}

if err := b.add(ctx, entry{ls, logproto.Entry{
Line: log_line,
Line: logLine,
Timestamp: timestamp,
}}); err != nil {
return err
Expand Down Expand Up @@ -281,7 +281,7 @@ func processS3Event(ctx context.Context, ev *events.S3Event, pc Client, log *log
ExpectedBucketOwner: aws.String(labels["bucketOwner"]),
})
if err != nil {
return fmt.Errorf("Failed to get object %s from bucket %s on account %s\n, %s", labels["key"], labels["bucket"], labels["bucketOwner"], err)
return fmt.Errorf("failed to get object %s from bucket %s on account %s, %s", labels["key"], labels["bucket"], labels["bucketOwner"], err)
}
err = parseS3Log(ctx, batch, labels, obj.Body, log)
if err != nil {
Expand Down
59 changes: 24 additions & 35 deletions tools/lambda-promtail/testdata/kinesis-event.json
Original file line number Diff line number Diff line change
@@ -1,36 +1,25 @@
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "s1",
"sequenceNumber": "49568167373333333333333333333333333333333333333333333333",
"data": "SGVsbG8gV29ybGQ=",
"approximateArrivalTimestamp": 1480641523.477
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49568167373333333333333333333333333333333333333333333333",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole",
"awsRegion": "us-east-1",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "s1",
"sequenceNumber": "49568167373333333334444444444444444444444444444444444444",
"data": "SGVsbG8gV29ybGQ=",
"approximateArrivalTimestamp": 1480841523.477
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49568167373333333334444444444444444444444444444444444444",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole",
"awsRegion": "us-east-1",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream"
}
]
}
"messageType": "DATA_MESSAGE",
"owner": "some_owner",
"logGroup": "test-logroup",
"logStream": "test-logstream",
"subscriptionFilters": ["test-subscription"],
"logEvents": [
{
"id": "98237509",
"timestamp": 1719922604969,
"message": "some_message"
},
{
"id": "20396236",
"timestamp": 1719922604969,
"message": "some_message"
},
{
"id": "23485670",
"timestamp": 1719922604969,
"message": "some_message"
}
]
}

0 comments on commit 6b011c0

Please sign in to comment.