-
Notifications
You must be signed in to change notification settings - Fork 39
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: added generic serverless agent (#893)
* feat:added generic serverless agent * review comments
- Loading branch information
1 parent
0c5664e
commit 95341cd
Showing
8 changed files
with
292 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
## Generic Serverless Agent | ||
|
||
To monitor Go applications deployed in a serverless environment like AWS Lambda, or on a server without a host agent, the process is similar to monitoring any other application. Simply instrument your application with the Instana Go Tracer SDK, deploy it to the appropriate environment, and ensure that the following two environment variables are set. | ||
|
||
> **INSTANA_ENDPOINT_URL** - The Instana backend endpoint that your serverless agents connect to. It depends on your region and is different from the host agent backend endpoint. | ||
> **INSTANA_AGENT_KEY** - Your Instana Agent key. The same agent key can be used for host agents and serverless monitoring. | ||
Please note that, in this generic serverless agent setup, only traces are available, metrics are not. However, for certain specific serverless services like AWS Lambda or Fargate, it is possible to correlate infrastructure and collect metrics as well. For more details, please refer to the documentation [here](https://www.ibm.com/docs/en/instana-observability/current?topic=technologies-monitoring-go#platforms). | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
// (c) Copyright IBM Corp. 2024 | ||
|
||
package instana | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"os" | ||
"sync" | ||
"time" | ||
|
||
"github.com/google/uuid" | ||
"github.com/instana/go-sensor/acceptor" | ||
"github.com/instana/go-sensor/autoprofile" | ||
) | ||
|
||
const ( | ||
flushPeriodForGenericInSec = 2 | ||
) | ||
|
||
type genericServerlessAgent struct { | ||
Endpoint string | ||
Key string | ||
PluginName string | ||
PID int | ||
|
||
snapshot serverlessSnapshot | ||
|
||
mu sync.Mutex | ||
spanQueue []Span | ||
|
||
client *http.Client | ||
logger LeveledLogger | ||
} | ||
|
||
func newGenericServerlessAgent(acceptorEndpoint, agentKey string, client *http.Client, logger LeveledLogger) *genericServerlessAgent { | ||
if logger == nil { | ||
logger = defaultLogger | ||
} | ||
|
||
if client == nil { | ||
client = http.DefaultClient | ||
// You can change this timeout by setting the INSTANA_TIMEOUT environment variable. | ||
client.Timeout = 2 * time.Second | ||
} | ||
|
||
logger.Debug("initializing generic serverless agent") | ||
|
||
// Creating a unique serverless host ID. | ||
uniqHostId := "Generic_Serverless_Agent" + uuid.New().String() | ||
|
||
agent := &genericServerlessAgent{ | ||
Endpoint: acceptorEndpoint, | ||
Key: agentKey, | ||
PID: os.Getpid(), | ||
client: client, | ||
logger: logger, | ||
snapshot: serverlessSnapshot{ | ||
Host: uniqHostId, | ||
EntityID: uniqHostId, | ||
}, | ||
} | ||
|
||
go func() { | ||
t := time.NewTicker(flushPeriodForGenericInSec * time.Second) | ||
defer t.Stop() | ||
|
||
for range t.C { | ||
if err := agent.Flush(context.Background()); err != nil { | ||
agent.logger.Error("failed to post collected data: ", err) | ||
} | ||
} | ||
}() | ||
|
||
return agent | ||
} | ||
|
||
func (a *genericServerlessAgent) Ready() bool { return true } | ||
|
||
func (a *genericServerlessAgent) SendMetrics(acceptor.Metrics) error { return nil } | ||
|
||
func (a *genericServerlessAgent) SendEvent(*EventData) error { return nil } | ||
|
||
func (a *genericServerlessAgent) SendSpans(spans []Span) error { | ||
a.enqueueSpans(spans) | ||
return nil | ||
} | ||
|
||
func (a *genericServerlessAgent) SendProfiles([]autoprofile.Profile) error { return nil } | ||
|
||
func (a *genericServerlessAgent) Flush(ctx context.Context) error { | ||
from := newServerlessAgentFromS(a.snapshot.EntityID, "generic_serverless") | ||
|
||
payload := struct { | ||
Spans []Span `json:"spans,omitempty"` | ||
}{} | ||
|
||
a.mu.Lock() | ||
payload.Spans = make([]Span, len(a.spanQueue)) | ||
copy(payload.Spans, a.spanQueue) | ||
a.spanQueue = a.spanQueue[:0] | ||
a.mu.Unlock() | ||
|
||
for i := range payload.Spans { | ||
payload.Spans[i].From = from | ||
} | ||
|
||
buf := bytes.NewBuffer(nil) | ||
if err := json.NewEncoder(buf).Encode(payload); err != nil { | ||
return fmt.Errorf("failed to marshal traces payload: %s", err) | ||
} | ||
|
||
payloadSize := buf.Len() | ||
if payloadSize > maxContentLength { | ||
a.logger.Warn(fmt.Sprintf("failed to send the spans. Payload size: %d exceeded max size: %d", payloadSize, maxContentLength)) | ||
return payloadTooLargeErr | ||
} | ||
|
||
req, err := http.NewRequest(http.MethodPost, a.Endpoint+"/bundle", buf) | ||
if err != nil { | ||
a.enqueueSpans(payload.Spans) | ||
return fmt.Errorf("failed to prepare send traces request: %s", err) | ||
} | ||
|
||
req.Header.Set("Content-Type", "application/json") | ||
|
||
if err := a.sendRequest(req.WithContext(ctx)); err != nil { | ||
a.enqueueSpans(payload.Spans) | ||
return fmt.Errorf("failed to send traces, will retry later: %dsec. Error details: %s", | ||
flushPeriodForGenericInSec, err.Error()) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (a *genericServerlessAgent) enqueueSpans(spans []Span) { | ||
a.mu.Lock() | ||
defer a.mu.Unlock() | ||
|
||
a.spanQueue = append(a.spanQueue, spans...) | ||
} | ||
|
||
func (a *genericServerlessAgent) sendRequest(req *http.Request) error { | ||
req.Header.Set("X-Instana-Host", a.snapshot.Host) | ||
req.Header.Set("X-Instana-Key", a.Key) | ||
|
||
resp, err := a.client.Do(req) | ||
if err != nil { | ||
return fmt.Errorf("failed to send request to the serverless agent: %s", err) | ||
} | ||
|
||
defer resp.Body.Close() | ||
|
||
if resp.StatusCode >= http.StatusBadRequest { | ||
respBody, err := io.ReadAll(resp.Body) | ||
if err != nil { | ||
a.logger.Debug("failed to read serverless agent response: ", err.Error()) | ||
return err | ||
} | ||
|
||
a.logger.Info("serverless agent has responded with ", resp.Status, ": ", string(respBody)) | ||
return err | ||
} | ||
|
||
io.CopyN(io.Discard, resp.Body, 1<<20) | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
// (c) Copyright IBM Corp. 2024 | ||
|
||
//go:build generic_serverless && integration | ||
// +build generic_serverless,integration | ||
|
||
package instana_test | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"log" | ||
"os" | ||
"testing" | ||
"time" | ||
|
||
instana "github.com/instana/go-sensor" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
var agent *serverlessAgent | ||
|
||
func TestMain(m *testing.M) { | ||
teardownInstanaEnv := setupInstanaEnv() | ||
defer teardownInstanaEnv() | ||
|
||
var err error | ||
agent, err = setupServerlessAgent() | ||
if err != nil { | ||
log.Fatalf("failed to initialize serverless agent: %s", err) | ||
} | ||
|
||
os.Exit(m.Run()) | ||
} | ||
|
||
func TestLocalServerlessAgent_SendSpans(t *testing.T) { | ||
defer agent.Reset() | ||
|
||
tracer := instana.NewTracer() | ||
sensor := instana.NewSensorWithTracer(tracer) | ||
defer instana.ShutdownSensor() | ||
|
||
sp := sensor.Tracer().StartSpan("generic_serverless") | ||
sp.Finish() | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), time.Second) | ||
defer cancel() | ||
|
||
require.NoError(t, tracer.Flush(ctx)) | ||
require.Len(t, agent.Bundles, 1) | ||
|
||
var spans []map[string]json.RawMessage | ||
for _, bundle := range agent.Bundles { | ||
var payload struct { | ||
Spans []map[string]json.RawMessage `json:"spans"` | ||
} | ||
|
||
require.NoError(t, json.Unmarshal(bundle.Body, &payload), "%s", string(bundle.Body)) | ||
spans = append(spans, payload.Spans...) | ||
} | ||
|
||
require.Len(t, spans, 1) | ||
} | ||
|
||
func TestLocalServerlessAgent_SendSpans_Error(t *testing.T) { | ||
defer agent.Reset() | ||
|
||
tracer := instana.NewTracer() | ||
sensor := instana.NewSensorWithTracer(tracer) | ||
defer instana.ShutdownSensor() | ||
|
||
sp := sensor.Tracer().StartSpan("http") | ||
sp.SetTag("returnError", "true") | ||
sp.Finish() | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute) | ||
defer cancel() | ||
|
||
require.NoError(t, tracer.Flush(ctx)) | ||
require.Len(t, agent.Bundles, 0) | ||
} | ||
|
||
func setupInstanaEnv() func() { | ||
var teardownFuncs []func() | ||
|
||
teardownFuncs = append(teardownFuncs, restoreEnvVarFunc("INSTANA_AGENT_KEY")) | ||
os.Setenv("INSTANA_AGENT_KEY", "testkey1") | ||
|
||
teardownFuncs = append(teardownFuncs, restoreEnvVarFunc("INSTANA_ZONE")) | ||
os.Setenv("INSTANA_ZONE", "testzone") | ||
|
||
teardownFuncs = append(teardownFuncs, restoreEnvVarFunc("INSTANA_TAGS")) | ||
os.Setenv("INSTANA_TAGS", "key1=value1,key2") | ||
|
||
teardownFuncs = append(teardownFuncs, restoreEnvVarFunc("INSTANA_SECRETS")) | ||
os.Setenv("INSTANA_SECRETS", "contains-ignore-case:key,password,secret,classified") | ||
|
||
teardownFuncs = append(teardownFuncs, restoreEnvVarFunc("CLASSIFIED_DATA")) | ||
os.Setenv("CLASSIFIED_DATA", "classified") | ||
|
||
return func() { | ||
for _, f := range teardownFuncs { | ||
f() | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters