Skip to content

Commit

Permalink
Make batcher generic which will allow to make exporterhelper generic
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Jan 14, 2025
1 parent 71aae79 commit 9371abc
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 48 deletions.
48 changes: 25 additions & 23 deletions exporter/internal/queue/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"go.opentelemetry.io/collector/exporter/internal"
)

type batch struct {
type batch[K any] struct {
ctx context.Context
req internal.Request
req K
idxList []uint64
}

Expand All @@ -24,13 +24,14 @@ type Batcher interface {
component.Component
}

type BaseBatcher struct {
type BaseBatcher[K any] struct {
batchCfg exporterbatcher.Config
queue exporterqueue.Queue[internal.Request]
queue exporterqueue.Queue[K]
// TODO: Remove when the -1 hack for testing is removed.
merger Merger[K]
maxWorkers int
workerPool chan bool
exportFunc func(ctx context.Context, req internal.Request) error
exportFunc func(ctx context.Context, req K) error
stopWG sync.WaitGroup
}

Expand All @@ -40,26 +41,27 @@ func NewBatcher(batchCfg exporterbatcher.Config,
maxWorkers int,
) (Batcher, error) {
if !batchCfg.Enabled {
return &DisabledBatcher{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil
return &DisabledBatcher[internal.Request]{BaseBatcher: newBaseBatcher[internal.Request](batchCfg, queue, exportFunc, maxWorkers)}, nil

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / Integration test

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / test-coverage

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / unittest-matrix (ubuntu-latest, ~1.23)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / unittest-matrix (ubuntu-latest, ~1.22)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / windows-unittest (windows-2022)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / windows-unittest (windows-2025)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 44 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])
}
return &DefaultBatcher{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil
return &DefaultBatcher[internal.Request]{BaseBatcher: newBaseBatcher[internal.Request](batchCfg, queue, exportFunc, maxWorkers)}, nil

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / Integration test

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / test-coverage

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / unittest-matrix (ubuntu-latest, ~1.23)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / unittest-matrix (ubuntu-latest, ~1.22)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / windows-unittest (windows-2022)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / windows-unittest (windows-2025)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])

Check failure on line 46 in exporter/internal/queue/batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

invalid operation: cannot index newBaseBatcher (value of type func(batchCfg exporterbatcher.Config, queue exporterqueue.Queue["go.opentelemetry.io/collector/exporter/internal".Request], exportFunc func(ctx context.Context, req "go.opentelemetry.io/collector/exporter/internal".Request) error, maxWorkers int) BaseBatcher["go.opentelemetry.io/collector/exporter/internal".Request])
}

func newBaseBatcher(batchCfg exporterbatcher.Config,
queue exporterqueue.Queue[internal.Request],
exportFunc func(ctx context.Context, req internal.Request) error,
maxWorkers int,
) BaseBatcher {
) BaseBatcher[internal.Request] {
var workerPool chan bool
if maxWorkers > 0 {
workerPool = make(chan bool, maxWorkers)
for i := 0; i < maxWorkers; i++ {
workerPool <- true
}
}
return BaseBatcher{
return BaseBatcher[internal.Request]{
batchCfg: batchCfg,
queue: queue,
merger: requestMerger{},
maxWorkers: maxWorkers,
workerPool: workerPool,
exportFunc: exportFunc,
Expand All @@ -68,19 +70,19 @@ func newBaseBatcher(batchCfg exporterbatcher.Config,
}

// flush starts a goroutine that calls exportFunc. It blocks until a worker is available if necessary.
func (qb *BaseBatcher) flush(batchToFlush batch) {
qb.stopWG.Add(1)
if qb.workerPool != nil {
<-qb.workerPool
func (qb *BaseBatcher[K]) flush(batchToFlush batch[K]) {
err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req)
for _, idx := range batchToFlush.idxList {
qb.queue.OnProcessingFinished(idx, err)
}
go func() {
defer qb.stopWG.Done()
err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req)
for _, idx := range batchToFlush.idxList {
qb.queue.OnProcessingFinished(idx, err)
}
if qb.workerPool != nil {
qb.workerPool <- true
}
}()
}

type Merger[K any] interface {
MergeSplit(src, dst K, cfg exporterbatcher.MaxSizeConfig) ([]K, error)
}

type requestMerger struct{}

func (requestMerger) MergeSplit(src, dst internal.Request, cfg exporterbatcher.MaxSizeConfig) ([]internal.Request, error) {
return src.MergeSplit(context.Background(), cfg, dst)
}
40 changes: 20 additions & 20 deletions exporter/internal/queue/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ import (
)

// DefaultBatcher continuously reads from the queue and flushes asynchronously if size limit is met or on timeout.
type DefaultBatcher struct {
BaseBatcher
type DefaultBatcher[K internal.Request] struct {
BaseBatcher[K]
currentBatchMu sync.Mutex
currentBatch *batch
currentBatch *batch[K]
timer *time.Timer
shutdownCh chan bool
}

func (qb *DefaultBatcher) resetTimer() {
func (qb *DefaultBatcher[K]) resetTimer() {
if qb.batchCfg.FlushTimeout != 0 {
qb.timer.Reset(qb.batchCfg.FlushTimeout)
}
}

// startReadingFlushingGoroutine starts a goroutine that reads and then flushes.
func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
func (qb *DefaultBatcher[K]) startReadingFlushingGoroutine() {
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
Expand All @@ -44,13 +44,13 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
qb.currentBatchMu.Lock()

if qb.batchCfg.MaxSizeItems > 0 {
var reqList []internal.Request
var reqList []K
var mergeSplitErr error
if qb.currentBatch == nil || qb.currentBatch.req == nil {
if qb.currentBatch == nil {
qb.resetTimer()
reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil)
reqList, mergeSplitErr = qb.merger.MergeSplit(req, nil, qb.batchCfg.MaxSizeConfig)

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / Integration test

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / test-coverage

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / windows-unittest (windows-2022)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / windows-unittest (windows-2025)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

cannot use nil as K value in argument to qb.merger.MergeSplit
} else {
reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req)
reqList, mergeSplitErr = qb.merger.MergeSplit(qb.currentBatch.req, req, qb.batchCfg.MaxSizeConfig)
}

if mergeSplitErr != nil || reqList == nil {
Expand All @@ -64,7 +64,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
qb.currentBatch = nil
qb.currentBatchMu.Unlock()
for i := 0; i < len(reqList); i++ {
qb.flush(batch{
qb.flush(batch[K]{
req: reqList[i],
ctx: ctx,
idxList: []uint64{idx},
Expand All @@ -73,30 +73,30 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
}
qb.resetTimer()
} else {
qb.currentBatch = &batch{
qb.currentBatch = &batch[K]{
req: reqList[0],
ctx: ctx,
idxList: []uint64{idx},
}
qb.currentBatchMu.Unlock()
}
} else {
if qb.currentBatch == nil || qb.currentBatch.req == nil {
if qb.currentBatch == nil {
qb.resetTimer()
qb.currentBatch = &batch{
qb.currentBatch = &batch[K]{
req: req,
ctx: ctx,
idxList: []uint64{idx},
}
} else {
// TODO: consolidate implementation for the cases where MaxSizeConfig is specified and the case where it is not specified
mergedReq, mergeErr := qb.currentBatch.req.MergeSplit(qb.currentBatch.ctx, qb.batchCfg.MaxSizeConfig, req)
mergedReq, mergeErr := qb.merger.MergeSplit(qb.currentBatch.req, req, qb.batchCfg.MaxSizeConfig)
if mergeErr != nil {
qb.queue.OnProcessingFinished(idx, mergeErr)
qb.currentBatchMu.Unlock()
continue
}
qb.currentBatch = &batch{
qb.currentBatch = &batch[K]{
req: mergedReq[0],
ctx: qb.currentBatch.ctx,
idxList: append(qb.currentBatch.idxList, idx),
Expand All @@ -120,7 +120,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
}

// startTimeBasedFlushingGoroutine starts a goroutine that flushes on timeout.
func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() {
func (qb *DefaultBatcher[K]) startTimeBasedFlushingGoroutine() {
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
Expand All @@ -136,7 +136,7 @@ func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() {
}

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error {
func (qb *DefaultBatcher[K]) Start(_ context.Context, _ component.Host) error {
// maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics.
if qb.maxWorkers == -1 {
return nil
Expand All @@ -157,9 +157,9 @@ func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error {
}

// flushCurrentBatchIfNecessary sends out the current request batch if it is not nil
func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() {
func (qb *DefaultBatcher[K]) flushCurrentBatchIfNecessary() {
qb.currentBatchMu.Lock()
if qb.currentBatch == nil || qb.currentBatch.req == nil {
if qb.currentBatch == nil {
qb.currentBatchMu.Unlock()
return
}
Expand All @@ -173,7 +173,7 @@ func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() {
}

// Shutdown ensures that queue and all Batcher are stopped.
func (qb *DefaultBatcher) Shutdown(_ context.Context) error {
func (qb *DefaultBatcher[K]) Shutdown(_ context.Context) error {
qb.flushCurrentBatchIfNecessary()
qb.stopWG.Wait()
return nil
Expand Down
10 changes: 5 additions & 5 deletions exporter/internal/queue/disabled_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (

// DisabledBatcher is a special-case of Batcher that has no size limit for sending. Any items read from the queue will
// be sent out (asynchronously) immediately regardless of the size.
type DisabledBatcher struct {
BaseBatcher
type DisabledBatcher[K any] struct {
BaseBatcher[K]
}

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
func (qb *DisabledBatcher[K]) Start(_ context.Context, _ component.Host) error {
// maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics.
if qb.maxWorkers == -1 {
return nil
Expand All @@ -33,7 +33,7 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
if !ok {
return
}
qb.flush(batch{
qb.flush(batch[K]{
req: req,
ctx: context.Background(),
idxList: []uint64{idx},
Expand All @@ -44,7 +44,7 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
}

// Shutdown ensures that queue and all Batcher are stopped.
func (qb *DisabledBatcher) Shutdown(_ context.Context) error {
func (qb *DisabledBatcher[K]) Shutdown(_ context.Context) error {
qb.stopWG.Wait()
return nil
}

0 comments on commit 9371abc

Please sign in to comment.