Skip to content

Commit

Permalink
add actionQueue for calls because NewClient() is done
Browse files Browse the repository at this point in the history
Signed-off-by: Eliott Bouhana <eliott.bouhana@datadoghq.com>
  • Loading branch information
eliottness committed Jan 23, 2025
1 parent bce2138 commit 8a6e71c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
5 changes: 2 additions & 3 deletions internal/newtelemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,9 @@ type client struct {
Payload() transport.Payload
}

flushTicker *internal.Ticker
heartbeatTicker *internal.Ticker
flushTicker *internal.Ticker

// flushMapper is the transformer to use for the next flush on the gather payloads on this tick
// flushMapper is the transformer to use for the next flush on the gathered payloads on this tick
flushMapper mapper.Mapper
flushMapperMu sync.Mutex

Expand Down
33 changes: 32 additions & 1 deletion internal/newtelemetry/globalclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ import (
"sync/atomic"

globalinternal "gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/types"
)

var (
globalClient atomic.Pointer[Client]

// actionQueue contains all actions done on the global client done before StartApp() with an actual client object is called
actionQueue = internal.NewRingQueue[func(Client)](16, 512)
)

// StartApp starts the telemetry client with the given client send the app-started telemetry and sets it as the global (*client).
Expand All @@ -22,8 +26,15 @@ func StartApp(client Client) {
return
}

client.appStart()
SwapClient(client)

actions := actionQueue.GetBuffer()
defer actionQueue.ReleaseBuffer(actions)
for _, action := range actions {
action(client)
}

client.appStart()
}

// SwapClient swaps the global client with the given client and Flush the old (*client).
Expand Down Expand Up @@ -129,6 +140,10 @@ func ProductStarted(product types.Namespace) {

if client := globalClient.Load(); client != nil && *client != nil {
(*client).ProductStarted(product)
} else {
actionQueue.Enqueue(func(client Client) {
client.ProductStarted(product)
})
}
}

Expand All @@ -140,6 +155,10 @@ func ProductStopped(product types.Namespace) {

if client := globalClient.Load(); client != nil && *client != nil {
(*client).ProductStopped(product)
} else {
actionQueue.Enqueue(func(client Client) {
client.ProductStopped(product)
})
}
}

Expand All @@ -151,6 +170,10 @@ func ProductStartError(product types.Namespace, err error) {

if client := globalClient.Load(); client != nil && *client != nil {
(*client).ProductStartError(product, err)
} else {
actionQueue.Enqueue(func(client Client) {
client.ProductStartError(product, err)
})
}
}

Expand All @@ -163,6 +186,10 @@ func AddAppConfig(key string, value any, origin types.Origin) {

if client := globalClient.Load(); client != nil && *client != nil {
(*client).AddAppConfig(key, value, origin)
} else {
actionQueue.Enqueue(func(client Client) {
client.AddAppConfig(key, value, origin)
})
}
}

Expand All @@ -175,5 +202,9 @@ func AddBulkAppConfig(kvs map[string]any, origin types.Origin) {

if client := globalClient.Load(); client != nil && *client != nil {
(*client).AddBulkAppConfig(kvs, origin)
} else {
actionQueue.Enqueue(func(client Client) {
client.AddBulkAppConfig(kvs, origin)
})
}
}

0 comments on commit 8a6e71c

Please sign in to comment.