Skip to content

Commit

Permalink
OTEL in task processing
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos committed Jan 17, 2025
1 parent 72fa968 commit 74afed3
Show file tree
Hide file tree
Showing 25 changed files with 107 additions and 2 deletions.
4 changes: 2 additions & 2 deletions common/telemetry/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ func (c *customServerStatsHandler) HandleRPC(ctx context.Context, stat stats.RPC
var k string
switch logTag.Key() {
case tag.WorkflowIDKey:
k = "temporalWorkflowID"
k = WorkflowIDKey
case tag.WorkflowRunIDKey:
k = "temporalRunID"
k = WorkflowRunIDKey
default:
continue
}
Expand Down
30 changes: 30 additions & 0 deletions common/telemetry/tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package telemetry

// WorkflowIDKey is the attribute key under which a workflow ID is attached
// to telemetry data such as tracing spans.
const WorkflowIDKey = "temporalWorkflowID"

// WorkflowRunIDKey is the attribute key under which a workflow run ID is
// attached to telemetry data such as tracing spans.
const WorkflowRunIDKey = "temporalRunID"
2 changes: 2 additions & 0 deletions service/history/archival_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func newQueueFactoryBase(params ArchivalQueueFactoryParams) QueueFactoryBase {
),
int64(params.Config.ArchivalQueueMaxReaderCount()),
),
Tracer: params.TracerProvider.Tracer("queue.archival"),
}
}

Expand Down Expand Up @@ -182,6 +183,7 @@ func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor q
shard.GetClusterMetadata(),
logger,
metricsHandler,
f.Tracer,
f.DLQWriter,
f.Config.TaskDLQEnabled,
f.Config.TaskDLQUnexpectedErrorAttempts,
Expand Down
2 changes: 2 additions & 0 deletions service/history/archival_queue_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
Expand Down Expand Up @@ -82,6 +83,7 @@ func TestArchivalQueueFactory(t *testing.T) {
TimeSource: clock.NewEventTimeSource(),
MetricsHandler: metricsHandler,
Logger: log.NewNoopLogger(),
TracerProvider: noop.NewTracerProvider(),
},
})
queue := queueFactory.CreateQueue(mockShard, nil)
Expand Down
2 changes: 2 additions & 0 deletions service/history/archival_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
workflowpb "go.temporal.io/api/workflow/v1"
Expand Down Expand Up @@ -535,6 +536,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
mockMetadata,
logger,
metrics.NoopMetricsHandler,
noop.NewTracerProvider().Tracer(""),
)
err := executable.Execute()
if len(p.ExpectedErrorSubstrings) > 0 {
Expand Down
5 changes: 5 additions & 0 deletions service/history/memory_scheduled_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package history

import (
"go.opentelemetry.io/otel/trace"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
Expand All @@ -48,6 +49,7 @@ type (
Config *configs.Config
TimeSource clock.TimeSource
MetricsHandler metrics.Handler
TracerProvider trace.TracerProvider
Logger log.SnTaggedLogger

ExecutorWrapper queues.ExecutorWrapper `optional:"true"`
Expand All @@ -61,6 +63,7 @@ type (
clusterMetadata cluster.Metadata
timeSource clock.TimeSource
metricsHandler metrics.Handler
tracer trace.Tracer
logger log.SnTaggedLogger

executorWrapper queues.ExecutorWrapper
Expand Down Expand Up @@ -88,6 +91,7 @@ func NewMemoryScheduledQueueFactory(
clusterMetadata: params.ClusterMetadata,
timeSource: params.TimeSource,
metricsHandler: metricsHandler,
tracer: params.TracerProvider.Tracer("queue.memory"),
logger: logger,
executorWrapper: params.ExecutorWrapper,
}
Expand Down Expand Up @@ -129,6 +133,7 @@ func (f *memoryScheduledQueueFactory) CreateQueue(
f.clusterMetadata,
f.timeSource,
f.metricsHandler,
f.tracer,
f.logger,
)
}
1 change: 1 addition & 0 deletions service/history/outbound_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func (f *outboundQueueFactory) CreateQueue(
shardContext.GetClusterMetadata(),
logger,
metricsHandler,
f.TracerProvider.Tracer("queue.outbound"),
f.DLQWriter,
f.Config.TaskDLQEnabled,
f.Config.TaskDLQUnexpectedErrorAttempts,
Expand Down
3 changes: 3 additions & 0 deletions service/history/queue_factory_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package history
import (
"context"

"go.opentelemetry.io/otel/trace"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -74,6 +75,7 @@ type (
Config *configs.Config
TimeSource clock.TimeSource
MetricsHandler metrics.Handler
TracerProvider trace.TracerProvider
Logger log.SnTaggedLogger
SchedulerRateLimiter queues.SchedulerRateLimiter
DLQWriter *queues.DLQWriter
Expand All @@ -86,6 +88,7 @@ type (
HostScheduler queues.Scheduler
HostPriorityAssigner queues.PriorityAssigner
HostReaderRateLimiter quotas.RequestRateLimiter
Tracer trace.Tracer
}

QueueFactoriesLifetimeHookParams struct {
Expand Down
22 changes: 22 additions & 0 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/common"
Expand All @@ -50,6 +52,7 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
ctasks "go.temporal.io/server/common/tasks"
"go.temporal.io/server/common/telemetry"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
Expand Down Expand Up @@ -162,6 +165,7 @@ type (
clusterMetadata cluster.Metadata
logger log.Logger
metricsHandler metrics.Handler
tracer trace.Tracer
dlqWriter *DLQWriter

readerID int64
Expand Down Expand Up @@ -201,6 +205,7 @@ func NewExecutable(
clusterMetadata cluster.Metadata,
logger log.Logger,
metricsHandler metrics.Handler,
tracer trace.Tracer,
opts ...ExecutableOption,
) Executable {
params := ExecutableParams{
Expand Down Expand Up @@ -241,6 +246,7 @@ func NewExecutable(
},
),
metricsHandler: metricsHandler,
tracer: tracer,
dlqWriter: params.DLQWriter,
dlqEnabled: params.DLQEnabled,
maxUnexpectedErrorAttempts: params.MaxUnexpectedErrorAttempts,
Expand Down Expand Up @@ -277,6 +283,22 @@ func (e *executableImpl) Execute() (retErr error) {
)
e.Unlock()

ctx, span := e.tracer.Start(
ctx,
fmt.Sprintf("queue.Execute/%v", e.GetType().String()),
trace.WithSpanKind(trace.SpanKindConsumer),
trace.WithAttributes(
attribute.Key(telemetry.WorkflowIDKey).String(e.GetWorkflowID()),
attribute.Key(telemetry.WorkflowRunIDKey).String(e.GetRunID()),
attribute.Key("task-type").String(e.GetType().String()),
attribute.Key("task-id").Int64(e.GetTaskID())))
defer func() {
if retErr != nil {
span.RecordError(retErr)
}
span.End()
}()

defer func() {
if panicObj := recover(); panicObj != nil {
err, ok := panicObj.(error)
Expand Down
5 changes: 5 additions & 0 deletions service/history/queues/executable_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package queues

import (
"go.opentelemetry.io/otel/trace"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -52,6 +53,7 @@ type (
clusterMetadata cluster.Metadata
logger log.Logger
metricsHandler metrics.Handler
tracer trace.Tracer
dlqWriter *DLQWriter
dlqEnabled dynamicconfig.BoolPropertyFn
attemptsBeforeSendingToDlq dynamicconfig.IntPropertyFn
Expand All @@ -74,6 +76,7 @@ func NewExecutableFactory(
clusterMetadata cluster.Metadata,
logger log.Logger,
metricsHandler metrics.Handler,
tracer trace.Tracer,
dlqWriter *DLQWriter,
dlqEnabled dynamicconfig.BoolPropertyFn,
attemptsBeforeSendingToDlq dynamicconfig.IntPropertyFn,
Expand All @@ -90,6 +93,7 @@ func NewExecutableFactory(
clusterMetadata: clusterMetadata,
logger: logger,
metricsHandler: metricsHandler.WithTags(defaultExecutableMetricsTags...),
tracer: tracer,
dlqWriter: dlqWriter,
dlqEnabled: dlqEnabled,
attemptsBeforeSendingToDlq: attemptsBeforeSendingToDlq,
Expand All @@ -111,6 +115,7 @@ func (f *executableFactoryImpl) NewExecutable(task tasks.Task, readerID int64) E
f.clusterMetadata,
f.logger,
f.metricsHandler,
f.tracer,
func(params *ExecutableParams) {
params.DLQEnabled = f.dlqEnabled
params.DLQWriter = f.dlqWriter
Expand Down
2 changes: 2 additions & 0 deletions service/history/queues/executable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.opentelemetry.io/otel/trace/noop"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/common/clock"
Expand Down Expand Up @@ -1079,6 +1080,7 @@ func (s *executableSuite) newTestExecutable(opts ...option) queues.Executable {
s.mockClusterMetadata,
log.NewTestLogger(),
s.metricsHandler,
noop.NewTracerProvider().Tracer(""),
func(params *queues.ExecutableParams) {
params.DLQEnabled = p.dlqEnabled
params.DLQWriter = p.dlqWriter
Expand Down
2 changes: 2 additions & 0 deletions service/history/queues/memory_scheduled_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.opentelemetry.io/otel/trace/noop"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
Expand Down Expand Up @@ -183,6 +184,7 @@ func (s *memoryScheduledQueueSuite) newSpeculativeWorkflowTaskTimeoutTestExecuta
nil,
nil,
nil,
noop.NewTracerProvider().Tracer(""),
),
wttt,
)
Expand Down
2 changes: 2 additions & 0 deletions service/history/queues/queue_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.opentelemetry.io/otel/trace/noop"
enumsspb "go.temporal.io/server/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/cluster"
Expand Down Expand Up @@ -567,6 +568,7 @@ func (s *queueBaseSuite) newQueueBase(
mockShard.GetClusterMetadata(),
s.logger,
s.metricsHandler,
noop.NewTracerProvider().Tracer(""),
nil,
func() bool {
return false
Expand Down
2 changes: 2 additions & 0 deletions service/history/queues/queue_scheduled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.opentelemetry.io/otel/trace/noop"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -139,6 +140,7 @@ func (s *scheduledQueueSuite) SetupTest() {
s.mockShard.GetClusterMetadata(),
logger,
metrics.NoopMetricsHandler,
noop.NewTracerProvider().Tracer(""),
nil,
func() bool {
return false
Expand Down
2 changes: 2 additions & 0 deletions service/history/queues/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.opentelemetry.io/otel/trace/noop"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -88,6 +89,7 @@ func (s *readerSuite) SetupTest() {
nil,
nil,
metrics.NoopMetricsHandler,
noop.NewTracerProvider().Tracer(""),
)
})
s.monitor = newMonitor(tasks.CategoryTypeScheduled, clock.NewRealTimeSource(), &MonitorOptions{
Expand Down
2 changes: 2 additions & 0 deletions service/history/queues/slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.opentelemetry.io/otel/trace/noop"
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/collection"
Expand Down Expand Up @@ -84,6 +85,7 @@ func (s *sliceSuite) SetupTest() {
nil,
nil,
metrics.NoopMetricsHandler,
noop.NewTracerProvider().Tracer(""),
)
})
s.monitor = newMonitor(tasks.CategoryTypeScheduled, clock.NewRealTimeSource(), &MonitorOptions{
Expand Down
Loading

0 comments on commit 74afed3

Please sign in to comment.