From 83da1dec4ff6ee4e7951af8b6078a4c38bbeb79c Mon Sep 17 00:00:00 2001 From: Islam Shehata Date: Wed, 26 Jun 2024 13:19:35 +0300 Subject: [PATCH 1/7] test AWS telemetry API --- Makefile | 1 + extension/extension.go | 4 +- main.go | 12 ++-- server/server.go | 27 +-------- telemetryapi/telemetry.go | 120 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 130 insertions(+), 34 deletions(-) create mode 100644 telemetryapi/telemetry.go diff --git a/Makefile b/Makefile index e9de60b..83a6701 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ GOOS=linux +# GOARCH=arm64 test: GOOS=${GOOS} GOARCH=${GOARCH} go test ./... diff --git a/extension/extension.go b/extension/extension.go index d9eb9d7..e392abc 100644 --- a/extension/extension.go +++ b/extension/extension.go @@ -41,9 +41,9 @@ const ( extensionIdentifierHeader = "Lambda-Extension-Identifier" ) -func New(LogsAPI string) *Client { +func New(telemetryAPI string) *Client { return &Client{ - baseURL: fmt.Sprintf("http://%s/2020-01-01/extension", LogsAPI), + baseURL: fmt.Sprintf("http://%s/2020-01-01/extension", telemetryAPI), httpClient: &http.Client{}, } } diff --git a/main.go b/main.go index b437f65..8e038dc 100644 --- a/main.go +++ b/main.go @@ -14,8 +14,8 @@ import ( "github.com/axiomhq/axiom-lambda-extension/extension" "github.com/axiomhq/axiom-lambda-extension/flusher" - "github.com/axiomhq/axiom-lambda-extension/logsapi" "github.com/axiomhq/axiom-lambda-extension/server" + "github.com/axiomhq/axiom-lambda-extension/telemetryapi" ) var ( @@ -92,22 +92,22 @@ func Run() error { } // LOGS API SUBSCRIPTION - logsClient := logsapi.New(runtimeAPI) + telemetryClient := telemetryapi.New(runtimeAPI) - destination := logsapi.Destination{ + destination := telemetryapi.Destination{ Protocol: "HTTP", - URI: logsapi.URI(fmt.Sprintf("http://sandbox.localdomain:%s/", logsPort)), + URI: telemetryapi.URI(fmt.Sprintf("http://sandbox.localdomain:%s/", logsPort)), HttpMethod: "POST", Encoding: "JSON", } - bufferingCfg := logsapi.BufferingCfg{ + bufferingCfg := telemetryapi.BufferingCfg{ MaxItems: uint32(defaultMaxItems), MaxBytes: uint32(defaultMaxBytes), TimeoutMS: uint32(defaultTimeoutMS), } - _, err = logsClient.Subscribe(ctx, []string{"function", "platform"}, bufferingCfg, destination, extensionClient.ExtensionID) + _, err = telemetryClient.Subscribe(ctx, []string{"function", "platform"}, bufferingCfg, destination, extensionClient.ExtensionID) if err != nil { return err } diff --git a/server/server.go b/server/server.go index f3a9ddf..4b4605b 100644 --- a/server/server.go +++ b/server/server.go @@ -6,7 +6,6 @@ import ( "io" "net/http" "regexp" - "strings" "os" "strconv" @@ -120,29 +119,5 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc // it detects if the record is a json string or a text log line that confirms to AWS log line formatting. func extractEventMessage(e map[string]any) { e["message"] = e["record"] - if recordStr, ok := e["record"].(string); ok && len(recordStr) > 0 { - recordStr = strings.Trim(recordStr, "\n") - // parse the record - // first check if the record is a json object, if not parse it as a text log line - if recordStr[0] == '{' && recordStr[len(recordStr)-1] == '}' { - var record map[string]any - err := json.Unmarshal([]byte(recordStr), &record) - if err != nil { - logger.Error("Error unmarshalling record:", zap.Error(err)) - // do not return, we want to continue processing the event - } else { - if level, ok := record["level"].(string); ok { - record["level"] = strings.ToLower(level) - } - e["level"] = record["level"] - e["record"] = record - } - } else { - matches := logLineRgx.FindStringSubmatch(recordStr) - if len(matches) == 5 { - e["level"] = strings.ToLower(matches[3]) - e["record"] = map[string]any{"requestId": matches[2], "message": matches[4], "timestamp": matches[1], "level": e["level"]} - } - } - } + delete(e, "record") } diff --git a/telemetryapi/telemetry.go b/telemetryapi/telemetry.go new file mode 100644 index 0000000..0fa7bff --- /dev/null +++ b/telemetryapi/telemetry.go @@ -0,0 +1,120 @@ +package telemetryapi + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" +) + +type Client struct { + baseURL string + httpClient *http.Client +} + +type BufferingCfg struct { + MaxItems uint32 `json:"maxItems"` + MaxBytes uint32 `json:"maxBytes"` + TimeoutMS uint32 `json:"timeoutMs"` +} + +// URI is used to set the endpoint where the logs will be sent to +type URI string + +// HttpMethod represents the HTTP method used to receive logs from Logs API +type HttpMethod string + +const ( + //HttpPost is to receive logs through POST. + HttpPost HttpMethod = "POST" + //HttpPUT is to receive logs through PUT. + HttpPut HttpMethod = "PUT" +) + +// HttpProtocol is used to specify the protocol when subscribing to Logs API for HTTP +type HttpProtocol string + +const ( + HttpProto HttpProtocol = "HTTP" +) + +// HttpEncoding denotes what the content is encoded in +type HttpEncoding string + +const ( + JSON HttpEncoding = "JSON" +) + +type Destination struct { + Protocol HttpProtocol `json:"protocol"` + URI URI `json:"URI"` + HttpMethod HttpMethod `json:"method"` + Encoding HttpEncoding `json:"encoding"` +} + +type SubscribeRequest struct { + SchemaVersion string `json:"schemaVersion"` + EventTypes []string `json:"types"` + BufferingCfg BufferingCfg `json:"buffering"` + Destination Destination `json:"destination"` +} + +// SubscribeResponse is the response body that is received from Logs API on subscribe +type SubscribeResponse struct { + body string +} + +const ( + lambdaAgentIdentifierHeaderKey = "Lambda-Extension-Identifier" + SchemaVersion20221213 = "2022-12-13" + SchemaVersionLatest = SchemaVersion20221213 +) + +func New(runtimeAPI string) *Client { + return &Client{ + baseURL: fmt.Sprintf("http://%s/2022-07-01", runtimeAPI), + httpClient: &http.Client{}, + } +} + +func (lc *Client) Subscribe(ctx context.Context, types []string, bufferingCfg BufferingCfg, destination Destination, extensionID string) (*SubscribeResponse, error) { + subscribeEndpoint := lc.baseURL + "/telemetry" + + subReq, err := json.Marshal( + &SubscribeRequest{ + SchemaVersion: SchemaVersionLatest, + EventTypes: types, + BufferingCfg: bufferingCfg, + Destination: destination, + }, + ) + if err != nil { + return nil, fmt.Errorf("marshaling subscribeRequest failed") + } + + httpReq, err := http.NewRequestWithContext(ctx, "PUT", subscribeEndpoint, bytes.NewBuffer(subReq)) + if err != nil { + return nil, err + } + + httpReq.Header.Set(lambdaAgentIdentifierHeaderKey, extensionID) + httpRes, err := lc.httpClient.Do(httpReq) + if err != nil { + return nil, err + } + + defer httpRes.Body.Close() + body, err := io.ReadAll(httpRes.Body) + if err != nil { + return nil, err + } + + if httpRes.StatusCode != 200 { + return nil, fmt.Errorf("subscription request failed with status %s", httpRes.Status) + } + return &SubscribeResponse{ + body: string(body), + }, nil +} From 7fb3eed8a99dc4228b94a65a129483dbd1888f3d Mon Sep 17 00:00:00 2001 From: Islam Shehata Date: Mon, 1 Jul 2024 14:15:39 +0300 Subject: [PATCH 2/7] remove un-used regex --- server/server.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/server.go b/server/server.go index 4b4605b..609f31b 100644 --- a/server/server.go +++ b/server/server.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "net/http" - "regexp" "os" "strconv" @@ -37,8 +36,6 @@ var ( axiomMetaInfo = map[string]string{} ) -var logLineRgx, _ = regexp.Compile(`^([0-9.:TZ-]{20,})\s+([0-9a-f-]{36})\s+(ERROR|INFO|WARN|DEBUG|TRACE)\s+(?s:(.*))`) - func init() { logger, _ = zap.NewProduction() From e9c7e6498e6ad028c819d8e8866d61c51de4102b Mon Sep 17 00:00:00 2001 From: Islam Shehata Date: Mon, 1 Jul 2024 14:36:24 +0300 Subject: [PATCH 3/7] remove message extraction --- server/server.go | 11 ++--- server/server_test.go | 98 ------------------------------------------- 2 files changed, 3 insertions(+), 106 deletions(-) delete mode 100644 server/server_test.go diff --git a/server/server.go b/server/server.go index 609f31b..cdc43af 100644 --- a/server/server.go +++ b/server/server.go @@ -87,7 +87,9 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc e["_time"], e["time"] = e["time"], nil if e["type"] == "function" { - extractEventMessage(e) + // extract the message from the record field and puts it in the message field + e["message"] = e["record"] + delete(e, "record") } // decide if the handler should notify the extension that the runtime is done @@ -111,10 +113,3 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc } } } - -// extractEventMessage extracts the message from the record field and puts it in the message field -// it detects if the record is a json string or a text log line that confirms to AWS log line formatting. -func extractEventMessage(e map[string]any) { - e["message"] = e["record"] - delete(e, "record") -} diff --git a/server/server_test.go b/server/server_test.go deleted file mode 100644 index 4211e5d..0000000 --- a/server/server_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package server - -import ( - "testing" -) - -func TestMessageExtraction(t *testing.T) { - testCases := []struct { - name string - input string - expected map[string]any - }{ - { - name: "error messages on multiple lines", - input: "2024-01-16T08:53:51.919Z 4b995efa-75f8-4fdc-92af-0882c79f47a1 ERROR testing sending an error\nand this is a new line inside the error \n and a new line \n bye", - expected: map[string]any{ - "level": "error", - "message": "SAME_AS_INPUT_NO_NEED_TO_DUPLICATE_INPUT_HERE", - "record": map[string]any{"requestId": "4b995efa-75f8-4fdc-92af-0882c79f47a1", "message": "testing sending an error\nand this is a new line inside the error \n and a new line \n bye", "timestamp": "2024-01-16T08:53:51.919Z", "level": "error"}, - }, - }, - { - name: "info messages", - input: "2024-01-16T08:53:51.919Z 4b995efa-75f8-4fdc-92af-0882c79f47a2 INFO Hello, world!", - expected: map[string]any{ - "level": "info", - "message": "SAME_AS_INPUT_NO_NEED_TO_DUPLICATE_INPUT_HERE", - "record": map[string]any{"requestId": "4b995efa-75f8-4fdc-92af-0882c79f47a2", "message": "Hello, world!", "timestamp": "2024-01-16T08:53:51.919Z", "level": "info"}, - }, - }, - { - name: "warn messages", - input: "2024-01-16T08:53:51.919Z 4b995efa-75f8-4fdc-92af-0882c79f47a3 WARN head my warning", - expected: map[string]any{ - "level": "warn", - "message": "SAME_AS_INPUT_NO_NEED_TO_DUPLICATE_INPUT_HERE", - "record": map[string]any{"requestId": "4b995efa-75f8-4fdc-92af-0882c79f47a3", "message": "head my warning", "timestamp": "2024-01-16T08:53:51.919Z", "level": "warn"}, - }, - }, - { - name: "trace messages", - input: "2024-01-16T08:53:51.919Z 4b995efa-75f8-4fdc-92af-0882c79f47a4 TRACE this is a trace \n with information on a new line.", - expected: map[string]any{ - "level": "trace", - "message": "SAME_AS_INPUT_NO_NEED_TO_DUPLICATE_INPUT_HERE", - "record": map[string]any{"requestId": "4b995efa-75f8-4fdc-92af-0882c79f47a4", "message": "this is a trace \n with information on a new line.", "timestamp": "2024-01-16T08:53:51.919Z", "level": "trace"}, - }, - }, - { - name: "debug messages", - input: "2024-01-16T08:53:51.919Z 4b995efa-75f8-4fdc-92af-0882c79f47a5 DEBUG Debugging is fun!", - expected: map[string]any{ - "level": "debug", - "message": "SAME_AS_INPUT_NO_NEED_TO_DUPLICATE_INPUT_HERE", - "record": map[string]any{"requestId": "4b995efa-75f8-4fdc-92af-0882c79f47a5", "message": "Debugging is fun!", "timestamp": "2024-01-16T08:53:51.919Z", "level": "debug"}, - }, - }, - { - name: "testing json messages", - input: `{"timestamp":"2024-01-08T16:48:45.316Z","level":"INFO","requestId":"de126cf0-6124-426c-818a-174983fbfc4b","message":"foo != bar"}`, - expected: map[string]any{ - "level": "info", - "message": "SAME_AS_INPUT_NO_NEED_TO_DUPLICATE_INPUT_HERE", - "record": map[string]any{"requestId": "de126cf0-6124-426c-818a-174983fbfc4b", "message": "foo != bar", "timestamp": "2024-01-08T16:48:45.316Z", "level": "info"}, - }, - }, - } - - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - e := make(map[string]any) - e["record"] = testCase.input - extractEventMessage(e) - if e["level"] != testCase.expected["level"] { - t.Errorf("Expected level to be %s, got %s", testCase.expected["level"], e["level"]) - } - if e["message"] != testCase.input { // the message field should contain the original input - t.Errorf("Expected message to be %s, got %s", testCase.input, e["message"]) - } - - expectedRecord := testCase.expected["record"].(map[string]any) - outputRecord := e["record"].(map[string]any) - - if outputRecord["timestamp"] != expectedRecord["timestamp"] { - t.Errorf("Expected timestamp to be %s, got %s", testCase.expected["timestamp"], e["timestamp"]) - } - if outputRecord["level"] != expectedRecord["level"] { - t.Errorf("Expected record.level to be %s, got %s", expectedRecord["level"], outputRecord["level"]) - } - if outputRecord["requestId"] != expectedRecord["requestId"] { - t.Errorf("Expected record.requestId to be %s, got %s", expectedRecord["requestId"], outputRecord["requestId"]) - } - if outputRecord["message"] != expectedRecord["message"] { - t.Errorf("Expected record.message to be %s, got %s", expectedRecord["message"], outputRecord["message"]) - } - }) - } -} From 238dc17e6caa57d7ea94501149f463be06df9090 Mon Sep 17 00:00:00 2001 From: Islam Shehata Date: Tue, 2 Jul 2024 15:22:44 +0300 Subject: [PATCH 4/7] remove logsapi implementation --- logsapi/logs.go | 120 ------------------------------------------------ 1 file changed, 120 deletions(-) delete mode 100644 logsapi/logs.go diff --git a/logsapi/logs.go b/logsapi/logs.go deleted file mode 100644 index 8fb380b..0000000 --- a/logsapi/logs.go +++ /dev/null @@ -1,120 +0,0 @@ -package logsapi - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" -) - -type Client struct { - baseURL string - httpClient *http.Client -} - -type BufferingCfg struct { - MaxItems uint32 `json:"maxItems"` - MaxBytes uint32 `json:"maxBytes"` - TimeoutMS uint32 `json:"timeoutMs"` -} - -// URI is used to set the endpoint where the logs will be sent to -type URI string - -// HttpMethod represents the HTTP method used to receive logs from Logs API -type HttpMethod string - -const ( - //HttpPost is to receive logs through POST. - HttpPost HttpMethod = "POST" - //HttpPUT is to receive logs through PUT. - HttpPut HttpMethod = "PUT" -) - -// HttpProtocol is used to specify the protocol when subscribing to Logs API for HTTP -type HttpProtocol string - -const ( - HttpProto HttpProtocol = "HTTP" -) - -// HttpEncoding denotes what the content is encoded in -type HttpEncoding string - -const ( - JSON HttpEncoding = "JSON" -) - -type Destination struct { - Protocol HttpProtocol `json:"protocol"` - URI URI `json:"URI"` - HttpMethod HttpMethod `json:"method"` - Encoding HttpEncoding `json:"encoding"` -} - -type SubscribeRequest struct { - SchemaVersion string `json:"schemaVersion"` - EventTypes []string `json:"types"` - BufferingCfg BufferingCfg `json:"buffering"` - Destination Destination `json:"destination"` -} - -// SubscribeResponse is the response body that is received from Logs API on subscribe -type SubscribeResponse struct { - body string -} - -const ( - lambdaAgentIdentifierHeaderKey = "Lambda-Extension-Identifier" - SchemaVersion20210318 = "2021-03-18" - SchemaVersionLatest = SchemaVersion20210318 -) - -func New(LogsAPI string) *Client { - return &Client{ - baseURL: fmt.Sprintf("http://%s/2020-08-15", LogsAPI), - httpClient: &http.Client{}, - } -} - -func (lc *Client) Subscribe(ctx context.Context, types []string, bufferingCfg BufferingCfg, destination Destination, extensionID string) (*SubscribeResponse, error) { - subscribeEndpoint := lc.baseURL + "/logs" - - subReq, err := json.Marshal( - &SubscribeRequest{ - SchemaVersion: SchemaVersionLatest, - EventTypes: types, - BufferingCfg: bufferingCfg, - Destination: destination, - }, - ) - if err != nil { - return nil, fmt.Errorf("marshaling subscribeRequest failed") - } - - httpReq, err := http.NewRequestWithContext(ctx, "PUT", subscribeEndpoint, bytes.NewBuffer(subReq)) - if err != nil { - return nil, err - } - - httpReq.Header.Set(lambdaAgentIdentifierHeaderKey, extensionID) - httpRes, err := lc.httpClient.Do(httpReq) - if err != nil { - return nil, err - } - - defer httpRes.Body.Close() - body, err := io.ReadAll(httpRes.Body) - if err != nil { - return nil, err - } - - if httpRes.StatusCode != 200 { - return nil, fmt.Errorf("subscription request failed with status %s", httpRes.Status) - } - return &SubscribeResponse{ - body: string(body), - }, nil -} From 80b982195e4ad92f7ca402d94f1ef86e54769aba Mon Sep 17 00:00:00 2001 From: Islam Shehata Date: Mon, 5 Aug 2024 17:01:51 +0300 Subject: [PATCH 5/7] ensure setting record.requestId for function events --- Makefile | 3 +-- server/server.go | 20 +++++++++++++++++--- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 83a6701..e55ecfa 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,4 @@ GOOS=linux -# GOARCH=arm64 test: GOOS=${GOOS} GOARCH=${GOARCH} go test ./... @@ -12,7 +11,7 @@ package: build cd bin && zip -r extension.zip extensions publish: package - aws lambda publish-layer-version --layer-name axiom-development-lambda-extension-go --region eu-west-1 --zip-file "fileb://bin/extension.zip" --compatible-architectures ${GOARCH} --description 'axiom lambda extension to push lambda logs to https://axiom.co' + aws lambda publish-layer-version --layer-name axiom-development-lambda-extension-go --region eu-west-1 --zip-file "fileb://bin/extension.zip" --compatible-architectures ${GOARCH} --description 'axiom lambda extension to push lambda logs to https://axiom.co' arch: echo ${GOARCH} diff --git a/server/server.go b/server/server.go index cdc43af..baef861 100644 --- a/server/server.go +++ b/server/server.go @@ -78,8 +78,15 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc } notifyRuntimeDone := false + requestId := "" for _, e := range events { + // capture requestId on each start event + if rec, ok := e["record"]; ok && e["type"] == "platform.start" { + record := rec.(map[string]any) + requestId = record["requestId"].(string) + } + // attach the lambda information to the event e["lambda"] = lambdaMetaInfo e["axiom"] = axiomMetaInfo @@ -87,9 +94,16 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc e["_time"], e["time"] = e["time"], nil if e["type"] == "function" { - // extract the message from the record field and puts it in the message field - e["message"] = e["record"] - delete(e, "record") + // extract the message from the record field + if rec, ok := e["record"].(map[string]any); ok { + e["message"] = rec["message"] + } else { + e["message"] = e["record"] + e["record"] = map[string]string{ + "requestId": requestId, + } + } + } // decide if the handler should notify the extension that the runtime is done From f266652ef6ad30b13fa257ab512a2adf35bfd375 Mon Sep 17 00:00:00 2001 From: Islam Shehata Date: Mon, 5 Aug 2024 19:11:45 +0300 Subject: [PATCH 6/7] lint: fixed lint errors --- server/server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/server.go b/server/server.go index baef861..d52be81 100644 --- a/server/server.go +++ b/server/server.go @@ -103,7 +103,6 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc "requestId": requestId, } } - } // decide if the handler should notify the extension that the runtime is done From 226ff0909eafef63258cb46d1bc707cccb5822a9 Mon Sep 17 00:00:00 2001 From: Islam Shehata Date: Tue, 6 Aug 2024 12:51:34 +0300 Subject: [PATCH 7/7] simplify capturing requestId and message from record --- server/server.go | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/server/server.go b/server/server.go index d52be81..499e4ca 100644 --- a/server/server.go +++ b/server/server.go @@ -78,13 +78,22 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc } notifyRuntimeDone := false - requestId := "" + requestID := "" for _, e := range events { - // capture requestId on each start event - if rec, ok := e["record"]; ok && e["type"] == "platform.start" { - record := rec.(map[string]any) - requestId = record["requestId"].(string) + e["message"] = "" + // if reocrd key exists, extract the requestId and message from it + if rec, ok := e["record"]; ok { + if record, ok := rec.(map[string]any); ok { + // capture requestId and set it if it exists + if reqID, ok := record["requestId"]; ok { + requestID = reqID.(string) + } + if e["type"] == "function" { + // set message + e["message"] = record["message"].(string) + } + } } // attach the lambda information to the event @@ -93,15 +102,12 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc // replace the time field with axiom's _time e["_time"], e["time"] = e["time"], nil - if e["type"] == "function" { - // extract the message from the record field - if rec, ok := e["record"].(map[string]any); ok { - e["message"] = rec["message"] - } else { - e["message"] = e["record"] - e["record"] = map[string]string{ - "requestId": requestId, - } + // If we didn't find a message in record field, move the record to message + // and assign requestId + if e["type"] == "function" && e["message"] == "" { + e["message"] = e["record"] + e["record"] = map[string]string{ + "requestId": requestID, } }