Skip to content

Commit

Permalink
[exporterhelper] Remove redundant request interfaces (open-telemetry#…
Browse files Browse the repository at this point in the history
…8867)

Pass request+context through the senders pipeline similar to what we do
in the collector's pipeline. Use a helper QueueRequest struct for
passing requests through queues

This changes also moves the experimental Request interface to a separate
package.
  • Loading branch information
dmitryax authored Nov 14, 2023
1 parent ffe1a29 commit a6c0bd4
Show file tree
Hide file tree
Showing 23 changed files with 315 additions and 343 deletions.
30 changes: 30 additions & 0 deletions .chloggen/update-eperimental-request-api.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: The experimental Request API is updated and moved to a separate package.

# One or more tracking issues or pull requests related to the change
issues: [7874]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
- The experimental `Request` interface is moved from exporter/exporterhelper to exporter/exporterhelper/request
package and updated to include ItemsCount() method.
- `RequestItemsCounter` is removed.
- Added the following APIs to exporter/exporterhelper/request package:
- request.ErrorHandler: an optional interface for handling errors that occur during request processing.
- request.Marshaler: a function that can marshal a Request into bytes.
- request.Unmarshaler: a function that can unmarshal bytes into a Request
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
49 changes: 12 additions & 37 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
)

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
component.Component
send(req internal.Request) error
send(context.Context, Request) error
setNextSender(nextSender requestSender)
}

Expand All @@ -30,8 +29,8 @@ type baseRequestSender struct {

var _ requestSender = (*baseRequestSender)(nil)

func (b *baseRequestSender) send(req internal.Request) error {
return b.nextSender.send(req)
func (b *baseRequestSender) send(ctx context.Context, req Request) error {
return b.nextSender.send(ctx, req)
}

func (b *baseRequestSender) setNextSender(nextSender requestSender) {
Expand All @@ -44,40 +43,16 @@ type errorLoggingRequestSender struct {
message string
}

func (l *errorLoggingRequestSender) send(req internal.Request) error {
err := l.baseRequestSender.send(req)
func (l *errorLoggingRequestSender) send(ctx context.Context, req Request) error {
err := l.baseRequestSender.send(ctx, req)
if err != nil {
l.logger.Error(l.message, zap.Int("dropped_items", req.Count()), zap.Error(err))
l.logger.Error(l.message, zap.Int("dropped_items", req.ItemsCount()), zap.Error(err))
}
return err
}

type obsrepSenderFactory func(obsrep *ObsReport) requestSender

// baseRequest is a base implementation for the internal.Request.
type baseRequest struct {
ctx context.Context
processingFinishedCallback func()
}

func (req *baseRequest) Context() context.Context {
return req.ctx
}

func (req *baseRequest) SetContext(ctx context.Context) {
req.ctx = ctx
}

func (req *baseRequest) SetOnProcessingFinished(callback func()) {
req.processingFinishedCallback = callback
}

func (req *baseRequest) OnProcessingFinished() {
if req.processingFinishedCallback != nil {
req.processingFinishedCallback()
}
}

// Option apply changes to baseExporter.
type Option func(*baseExporter)

Expand Down Expand Up @@ -156,8 +131,8 @@ type baseExporter struct {
component.ShutdownFunc

requestExporter bool
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
marshaler RequestMarshaler
unmarshaler RequestUnmarshaler
signal component.DataType

set exporter.CreateSettings
Expand All @@ -178,8 +153,8 @@ type baseExporter struct {
}

// TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones.
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler RequestMarshaler,
unmarshaler RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {

obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
if err != nil {
Expand Down Expand Up @@ -210,8 +185,8 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
}

// send sends the request using the first sender in the chain.
func (be *baseExporter) send(req internal.Request) error {
return be.queueSender.send(req)
func (be *baseExporter) send(ctx context.Context, req Request) error {
return be.queueSender.send(ctx, req)
}

// connectSenders connects the senders in the predefined order.
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestBaseExporterLogging(t *testing.T) {
bs, err := newBaseExporter(set, "", true, nil, nil, newNoopObsrepSender, WithRetry(rCfg))
require.Nil(t, err)
require.True(t, bs.requestExporter)
sendErr := bs.send(newErrorRequest(context.Background()))
sendErr := bs.send(context.Background(), newErrorRequest())
require.Error(t, sendErr)

require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 1)
Expand Down
8 changes: 4 additions & 4 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
type boundedMemoryQueue struct {
stopWG sync.WaitGroup
stopped *atomic.Bool
items chan Request
items chan QueueRequest
numConsumers int
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity. Capacity cannot be 0.
func NewBoundedMemoryQueue(capacity int, numConsumers int) Queue {
return &boundedMemoryQueue{
items: make(chan Request, capacity),
items: make(chan QueueRequest, capacity),
stopped: &atomic.Bool{},
numConsumers: numConsumers,
}
Expand All @@ -52,13 +52,13 @@ func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set Queu
}

// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
func (q *boundedMemoryQueue) Produce(item Request) bool {
func (q *boundedMemoryQueue) Produce(ctx context.Context, req any) bool {
if q.stopped.Load() {
return false
}

select {
case q.items <- item:
case q.items <- newQueueRequest(ctx, req):
return true
default:
return false
Expand Down
55 changes: 27 additions & 28 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,18 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
)

func newNopQueueSettings(callback func(item Request)) QueueSettings {
func newNopQueueSettings(callback func(any)) QueueSettings {
return QueueSettings{
DataType: component.DataTypeMetrics,
Callback: callback,
Callback: func(item QueueRequest) { callback(item.Request) },
}
}

type stringRequest struct {
Request
str string
}

func newStringRequest(str string) Request {
func newStringRequest(str string) stringRequest {
return stringRequest{str: str}
}

Expand All @@ -48,7 +47,7 @@ func TestBoundedQueue(t *testing.T) {
startLock.Lock() // block consumers
consumerState := newConsumerState(t)

assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item any) {
consumerState.record(item.(stringRequest).str)

// block further processing until startLock is released
Expand All @@ -57,7 +56,7 @@ func TestBoundedQueue(t *testing.T) {
startLock.Unlock()
})))

assert.True(t, q.Produce(newStringRequest("a")))
assert.True(t, q.Produce(context.Background(), newStringRequest("a")))

// at this point "a" may or may not have been received by the consumer go-routine
// so let's make sure it has been
Expand All @@ -70,10 +69,10 @@ func TestBoundedQueue(t *testing.T) {
})

// produce two more items. The first one should be accepted, but not consumed.
assert.True(t, q.Produce(newStringRequest("b")))
assert.True(t, q.Produce(context.Background(), newStringRequest("b")))
assert.Equal(t, 1, q.Size())
// the second should be rejected since the queue is full
assert.False(t, q.Produce(newStringRequest("c")))
assert.False(t, q.Produce(context.Background(), newStringRequest("c")))
assert.Equal(t, 1, q.Size())

startLock.Unlock() // unblock consumer
Expand All @@ -89,13 +88,13 @@ func TestBoundedQueue(t *testing.T) {
"b": true,
}
for _, item := range []string{"d", "e", "f"} {
assert.True(t, q.Produce(newStringRequest(item)))
assert.True(t, q.Produce(context.Background(), newStringRequest(item)))
expected[item] = true
consumerState.assertConsumed(expected)
}

assert.NoError(t, q.Shutdown(context.Background()))
assert.False(t, q.Produce(newStringRequest("x")), "cannot push to closed queue")
assert.False(t, q.Produce(context.Background(), newStringRequest("x")), "cannot push to closed queue")
}

// In this test we run a queue with many items and a slow consumer.
Expand All @@ -109,25 +108,25 @@ func TestShutdownWhileNotEmpty(t *testing.T) {

consumerState := newConsumerState(t)

assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item any) {
consumerState.record(item.(stringRequest).str)
time.Sleep(1 * time.Second)
})))

q.Produce(newStringRequest("a"))
q.Produce(newStringRequest("b"))
q.Produce(newStringRequest("c"))
q.Produce(newStringRequest("d"))
q.Produce(newStringRequest("e"))
q.Produce(newStringRequest("f"))
q.Produce(newStringRequest("g"))
q.Produce(newStringRequest("h"))
q.Produce(newStringRequest("i"))
q.Produce(newStringRequest("j"))
q.Produce(context.Background(), newStringRequest("a"))
q.Produce(context.Background(), newStringRequest("b"))
q.Produce(context.Background(), newStringRequest("c"))
q.Produce(context.Background(), newStringRequest("d"))
q.Produce(context.Background(), newStringRequest("e"))
q.Produce(context.Background(), newStringRequest("f"))
q.Produce(context.Background(), newStringRequest("g"))
q.Produce(context.Background(), newStringRequest("h"))
q.Produce(context.Background(), newStringRequest("i"))
q.Produce(context.Background(), newStringRequest("j"))

assert.NoError(t, q.Shutdown(context.Background()))

assert.False(t, q.Produce(newStringRequest("x")), "cannot push to closed queue")
assert.False(t, q.Produce(context.Background(), newStringRequest("x")), "cannot push to closed queue")
consumerState.assertConsumed(map[string]bool{
"a": true,
"b": true,
Expand Down Expand Up @@ -189,12 +188,12 @@ func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int)
b.ReportAllocs()
for i := 0; i < b.N; i++ {
q := NewBoundedMemoryQueue(capacity, numConsumers)
err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {
err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item any) {
time.Sleep(1 * time.Millisecond)
}))
require.NoError(b, err)
for j := 0; j < numberOfItems; j++ {
q.Produce(newStringRequest(fmt.Sprintf("%d", j)))
q.Produce(context.Background(), newStringRequest(fmt.Sprintf("%d", j)))
}
assert.NoError(b, q.Shutdown(context.Background()))
}
Expand Down Expand Up @@ -248,21 +247,21 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
func TestZeroSizeWithConsumers(t *testing.T) {
q := NewBoundedMemoryQueue(0, 1)

err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {}))
err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item any) {}))
assert.NoError(t, err)

assert.True(t, q.Produce(newStringRequest("a"))) // in process
assert.True(t, q.Produce(context.Background(), newStringRequest("a"))) // in process

assert.NoError(t, q.Shutdown(context.Background()))
}

func TestZeroSizeNoConsumers(t *testing.T) {
q := NewBoundedMemoryQueue(0, 0)

err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {}))
err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item any) {}))
assert.NoError(t, err)

assert.False(t, q.Produce(newStringRequest("a"))) // in process
assert.False(t, q.Produce(context.Background(), newStringRequest("a"))) // in process

assert.NoError(t, q.Shutdown(context.Background()))
}
11 changes: 6 additions & 5 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type persistentQueue struct {
storage *persistentContiguousStorage
capacity uint64
numConsumers int
marshaler RequestMarshaler
unmarshaler RequestUnmarshaler
marshaler QueueRequestMarshaler
unmarshaler QueueRequestUnmarshaler
}

// buildPersistentStorageName returns a name that is constructed out of queue name and signal type. This is done
Expand All @@ -46,8 +46,8 @@ func buildPersistentStorageName(name string, signal component.DataType) string {
}

// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, marshaler RequestMarshaler,
unmarshaler RequestUnmarshaler, set exporter.CreateSettings) Queue {
func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, marshaler QueueRequestMarshaler,
unmarshaler QueueRequestUnmarshaler, set exporter.CreateSettings) Queue {
return &persistentQueue{
capacity: uint64(capacity),
numConsumers: numConsumers,
Expand Down Expand Up @@ -85,7 +85,8 @@ func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set Q
}

// Produce adds an item to the queue and returns true if it was accepted
func (pq *persistentQueue) Produce(item Request) bool {
// Request context is currently ignored by the persistent queue.
func (pq *persistentQueue) Produce(_ context.Context, item any) bool {
err := pq.storage.put(item)
return err == nil
}
Expand Down
Loading

0 comments on commit a6c0bd4

Please sign in to comment.