diff --git a/pkg/inspector/inspector.go b/pkg/inspector/inspector.go index 2cdfa78bb..0b8a3bace 100644 --- a/pkg/inspector/inspector.go +++ b/pkg/inspector/inspector.go @@ -17,6 +17,7 @@ package inspector import ( "context" "sync" + "sync/atomic" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/foundation/metrics/measure" @@ -26,43 +27,14 @@ import ( const DefaultBufferSize = 1000 -// Session wraps a channel of records and provides: -// 1. a way to send records to it asynchronously -// 2. a way to know if it's closed or not +// Session represents a single inspector session. Records are continuously sent +// into channel C. If the buffer of C is full, records will be dropped. C will +// be closed once the session is removed from the inspector. type Session struct { C chan record.Record - id string - logger log.CtxLogger - onClose func() - once *sync.Once -} - -func (s *Session) close() { - // close() can be called multiple times on a session. One example is: - // There's an active inspector session on a component (processor or connector), - // during which the component is deleted. - // The session channel will be closed, which terminate the API request fetching - // record from this session. - // However, the API request termination also closes the session. - s.once.Do(func() { - s.onClose() - close(s.C) - }) -} - -// send a record to the session's channel. -// If the channel has already reached its capacity, -// the record will be ignored. -func (s *Session) send(ctx context.Context, r record.Record) { - select { - case s.C <- r: - default: - s.logger. - Warn(ctx). - Str(log.InspectorSessionID, s.id). - Msg("session buffer full, record will be dropped") - } + id string + componentID string } // Inspector is attached to an inspectable pipeline component @@ -74,7 +46,12 @@ type Inspector struct { // keys are sessions IDs. sessions map[string]*Session // guards access to sessions - lock sync.Mutex + lock sync.Mutex + // hasSessions is set to true when there are open sessions. This allows us + // to take a shortcut without acquiring the lock in the happy path, when + // there are no sessions. + hasSessions atomic.Bool + logger log.CtxLogger bufferSize int } @@ -87,73 +64,73 @@ func New(logger log.CtxLogger, bufferSize int) *Inspector { } } +// NewSession creates a new session in given inspector. +// componentID is the ID of the component being inspected (connector or processor). +// The session will be closed and removed from the inspector when the context is +// closed. +func (i *Inspector) NewSession(ctx context.Context, componentID string) *Session { + s := &Session{ + C: make(chan record.Record, i.bufferSize), + id: uuid.NewString(), + componentID: componentID, + } + + i.add(s) + go func() { + <-ctx.Done() + i.remove(s.id) + }() + + return s +} + // Send the given record to all registered sessions. // The method does not wait for consumers to get the records. func (i *Inspector) Send(ctx context.Context, r record.Record) { - // copy metadata, to prevent issues when concurrently accessing the metadata - var meta record.Metadata - if len(r.Metadata) != 0 { - meta = make(record.Metadata, len(r.Metadata)) - for k, v := range r.Metadata { - meta[k] = v - } + // shortcut - we don't expect any sessions, so we check the atomic variable + // before acquiring an actual lock + if !i.hasSessions.Load() { + return } - // todo optimize this, as we have locks for every record. + // clone record only once, the listeners aren't expected to manipulate the records + rClone := r.Clone() + // locks are needed to make sure the `sessions` slice // is not modified as we're iterating over it i.lock.Lock() defer i.lock.Unlock() for _, s := range i.sessions { - s.send(ctx, record.Record{ - Position: r.Position, - Operation: r.Operation, - Metadata: meta, - Key: r.Key, - Payload: r.Payload, - }) + select { + case s.C <- rClone: + default: + i.logger. + Warn(ctx). + Str(log.InspectorSessionID, s.id). + Msg("session buffer full, record will be dropped") + } } } -// NewSession creates a new session in given inspector. -// componentID is the ID of the component being inspected (connector or processor). -func (i *Inspector) NewSession(ctx context.Context, componentID string) *Session { - id := uuid.NewString() - s := &Session{ - C: make(chan record.Record, i.bufferSize), - id: id, - logger: i.logger.WithComponent("inspector.Session"), - onClose: func() { - i.remove(id) - measure.InspectorsGauge.WithValues(componentID).Dec() - }, - once: &sync.Once{}, +func (i *Inspector) Close() { + for k := range i.sessions { + i.remove(k) } - measure.InspectorsGauge.WithValues(componentID).Inc() - - go func() { - <-ctx.Done() - s.logger. - Debug(context.Background()). - Msgf("context done: %v", ctx.Err()) - s.close() - }() +} +// add a session with given ID to this Inspector. +func (i *Inspector) add(s *Session) { i.lock.Lock() defer i.lock.Unlock() - i.sessions[id] = s + i.sessions[s.id] = s + i.hasSessions.Store(true) + measure.InspectorsGauge.WithValues(s.componentID).Inc() + i.logger. Info(context.Background()). - Str(log.InspectorSessionID, id). + Str(log.InspectorSessionID, s.id). Msg("session created") - return s -} - -func (i *Inspector) Close() { - for _, s := range i.sessions { - s.close() - } } // remove a session with given ID from this Inspector. @@ -161,7 +138,18 @@ func (i *Inspector) remove(id string) { i.lock.Lock() defer i.lock.Unlock() + s, ok := i.sessions[id] + if !ok { + return // session already removed + } + + close(s.C) delete(i.sessions, id) + if len(i.sessions) == 0 { + i.hasSessions.Store(false) + } + measure.InspectorsGauge.WithValues(s.componentID).Dec() + i.logger. Info(context.Background()). Str(log.InspectorSessionID, id). diff --git a/pkg/inspector/inspector_benchmark_test.go b/pkg/inspector/inspector_benchmark_test.go index 8636570eb..5ca369ed7 100644 --- a/pkg/inspector/inspector_benchmark_test.go +++ b/pkg/inspector/inspector_benchmark_test.go @@ -22,10 +22,32 @@ import ( "github.com/conduitio/conduit/pkg/record" ) +func BenchmarkInspector_NoSession_Send(b *testing.B) { + ins := New(log.Nop(), 10) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")}) + } +} + func BenchmarkInspector_SingleSession_Send(b *testing.B) { ins := New(log.Nop(), 10) ins.NewSession(context.Background(), "test-id") + b.ResetTimer() + for i := 0; i < b.N; i++ { + ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")}) + } +} + +func BenchmarkInspector_10Sessions_Send(b *testing.B) { + ins := New(log.Nop(), 10) + for i := 0; i < 10; i++ { + ins.NewSession(context.Background(), "test-id") + } + + b.ResetTimer() for i := 0; i < b.N; i++ { ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")}) } diff --git a/pkg/inspector/inspector_test.go b/pkg/inspector/inspector_test.go index f28616ca4..577464dfd 100644 --- a/pkg/inspector/inspector_test.go +++ b/pkg/inspector/inspector_test.go @@ -70,7 +70,7 @@ func TestInspector_Send_SessionClosed(t *testing.T) { underTest.Send(context.Background(), r) assertGotRecord(is, s, r) - s.close() + underTest.remove(s.id) underTest.Send( context.Background(), record.Record{ @@ -123,7 +123,7 @@ func TestInspector_Send_SlowConsumer(t *testing.T) { s := underTest.NewSession(context.Background(), "test-id") for i := 0; i < bufferSize+1; i++ { - s.send( + underTest.Send( context.Background(), record.Record{ Position: record.Position(fmt.Sprintf("test-pos-%v", i)),