[Perf] Inspector refactoring and optimization #1248

Merged · 4 commits · Oct 24, 2023
150 changes: 69 additions & 81 deletions pkg/inspector/inspector.go
@@ -17,6 +17,7 @@ package inspector
import (
"context"
"sync"
"sync/atomic"

"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/foundation/metrics/measure"
@@ -26,43 +27,14 @@ import (

const DefaultBufferSize = 1000

// Session wraps a channel of records and provides:
// 1. a way to send records to it asynchronously
// 2. a way to know if it's closed or not
// Session represents a single inspector session. Records are continuously sent
// into channel C. If the buffer of C is full, records will be dropped. C will
// be closed once the session is removed from the inspector.
type Session struct {
C chan record.Record

id string
logger log.CtxLogger
onClose func()
once *sync.Once
}

func (s *Session) close() {
// close() can be called multiple times on a session. One example is:
// There's an active inspector session on a component (processor or connector),
// during which the component is deleted.
// The session channel will be closed, which terminate the API request fetching
// record from this session.
// However, the API request termination also closes the session.
s.once.Do(func() {
s.onClose()
close(s.C)
})
}

// send a record to the session's channel.
// If the channel has already reached its capacity,
// the record will be ignored.
func (s *Session) send(ctx context.Context, r record.Record) {
select {
case s.C <- r:
default:
s.logger.
Warn(ctx).
Str(log.InspectorSessionID, s.id).
Msg("session buffer full, record will be dropped")
}
id string
componentID string
}

// Inspector is attached to an inspectable pipeline component
@@ -74,7 +46,12 @@ type Inspector struct {
// keys are sessions IDs.
sessions map[string]*Session
// guards access to sessions
lock sync.Mutex
lock sync.Mutex
// hasSessions is set to true when there are open sessions. This allows us
// to take a shortcut without acquiring the lock in the happy path, when
// there are no sessions.
hasSessions atomic.Bool

logger log.CtxLogger
bufferSize int
}
@@ -87,81 +64,92 @@ func New(logger log.CtxLogger, bufferSize int) *Inspector {
}
}

// NewSession creates a new session in given inspector.
// componentID is the ID of the component being inspected (connector or processor).
// The session will be closed and removed from the inspector when the context is
// closed.
func (i *Inspector) NewSession(ctx context.Context, componentID string) *Session {
s := &Session{
C: make(chan record.Record, i.bufferSize),
id: uuid.NewString(),
componentID: componentID,
}

i.add(s)
go func() {
<-ctx.Done()
i.remove(s.id)
}()

return s
}

// Send the given record to all registered sessions.
// The method does not wait for consumers to get the records.
func (i *Inspector) Send(ctx context.Context, r record.Record) {
// copy metadata, to prevent issues when concurrently accessing the metadata
var meta record.Metadata
if len(r.Metadata) != 0 {
meta = make(record.Metadata, len(r.Metadata))
for k, v := range r.Metadata {
meta[k] = v
}
// shortcut - we don't expect any sessions, so we check the atomic variable
// before acquiring an actual lock
if !i.hasSessions.Load() {
return
}

// todo optimize this, as we have locks for every record.
// clone record only once, the listeners aren't expected to manipulate the records
rClone := r.Clone()

// locks are needed to make sure the `sessions` slice
// is not modified as we're iterating over it
i.lock.Lock()
defer i.lock.Unlock()
for _, s := range i.sessions {
s.send(ctx, record.Record{
Position: r.Position,
Operation: r.Operation,
Metadata: meta,
Key: r.Key,
Payload: r.Payload,
})
select {
case s.C <- rClone:
default:
i.logger.
Warn(ctx).
Str(log.InspectorSessionID, s.id).
Msg("session buffer full, record will be dropped")
}
}
}

// NewSession creates a new session in given inspector.
// componentID is the ID of the component being inspected (connector or processor).
func (i *Inspector) NewSession(ctx context.Context, componentID string) *Session {
id := uuid.NewString()
s := &Session{
C: make(chan record.Record, i.bufferSize),
id: id,
logger: i.logger.WithComponent("inspector.Session"),
onClose: func() {
i.remove(id)
measure.InspectorsGauge.WithValues(componentID).Dec()
},
once: &sync.Once{},
func (i *Inspector) Close() {
for k := range i.sessions {
i.remove(k)
}
measure.InspectorsGauge.WithValues(componentID).Inc()

go func() {
<-ctx.Done()
s.logger.
Debug(context.Background()).
Msgf("context done: %v", ctx.Err())
s.close()
}()
}

// add a session with given ID to this Inspector.
func (i *Inspector) add(s *Session) {
i.lock.Lock()
defer i.lock.Unlock()

i.sessions[id] = s
i.sessions[s.id] = s
i.hasSessions.Store(true)
measure.InspectorsGauge.WithValues(s.componentID).Inc()

i.logger.
Info(context.Background()).
Str(log.InspectorSessionID, id).
Str(log.InspectorSessionID, s.id).
Msg("session created")
return s
}

func (i *Inspector) Close() {
for _, s := range i.sessions {
s.close()
}
}

// remove a session with given ID from this Inspector.
func (i *Inspector) remove(id string) {
i.lock.Lock()
defer i.lock.Unlock()

s, ok := i.sessions[id]
if !ok {
return // session already removed
}

close(s.C)
delete(i.sessions, id)
if len(i.sessions) == 0 {
i.hasSessions.Store(false)
}
measure.InspectorsGauge.WithValues(s.componentID).Dec()

i.logger.
Info(context.Background()).
Str(log.InspectorSessionID, id).
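
The heart of the optimization sits in Send above: an atomic.Bool is consulted before the mutex is touched, so a pipeline with no open inspector sessions pays only an atomic load per record, and the record is cloned once instead of having its metadata copied for every session. Below is a minimal, self-contained sketch of that fast-path pattern; it is illustration only, with invented names (broadcaster, listeners), not the PR's code.

	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"
	)

	// broadcaster is a toy stand-in for the Inspector: a listener map guarded by
	// a mutex, plus an atomic flag that lets send skip the lock when nobody is
	// listening.
	type broadcaster struct {
		mu           sync.Mutex
		listeners    map[string]chan string
		hasListeners atomic.Bool
	}

	func (b *broadcaster) add(id string, ch chan string) {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.listeners[id] = ch
		b.hasListeners.Store(true)
	}

	func (b *broadcaster) remove(id string) {
		b.mu.Lock()
		defer b.mu.Unlock()
		delete(b.listeners, id)
		if len(b.listeners) == 0 {
			b.hasListeners.Store(false)
		}
	}

	func (b *broadcaster) send(msg string) {
		// fast path: a single atomic load, no lock, when there are no listeners
		if !b.hasListeners.Load() {
			return
		}
		b.mu.Lock()
		defer b.mu.Unlock()
		for _, ch := range b.listeners {
			select {
			case ch <- msg:
			default: // non-blocking, mirrors the "buffer full, drop record" behavior
			}
		}
	}

	func main() {
		b := &broadcaster{listeners: map[string]chan string{}}
		b.send("dropped, nobody is listening") // returns after one atomic load

		ch := make(chan string, 1)
		b.add("session-1", ch)
		b.send("hello")
		fmt.Println(<-ch) // prints "hello"
		b.remove("session-1")
	}

As in the PR, the pattern trades strict delivery for throughput: a message sent concurrently with add may be missed, and a full buffer drops messages, which is acceptable for an inspection facility.
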
22 changes: 22 additions & 0 deletions pkg/inspector/inspector_benchmark_test.go
@@ -22,10 +22,32 @@ import (
"github.com/conduitio/conduit/pkg/record"
)

func BenchmarkInspector_NoSession_Send(b *testing.B) {
ins := New(log.Nop(), 10)

b.ResetTimer()
for i := 0; i < b.N; i++ {
ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")})
}
}

func BenchmarkInspector_SingleSession_Send(b *testing.B) {
ins := New(log.Nop(), 10)
ins.NewSession(context.Background(), "test-id")

b.ResetTimer()
for i := 0; i < b.N; i++ {
ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")})
}
}

func BenchmarkInspector_10Sessions_Send(b *testing.B) {
ins := New(log.Nop(), 10)
for i := 0; i < 10; i++ {
ins.NewSession(context.Background(), "test-id")
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")})
}
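
The new benchmarks cover zero, one, and ten open sessions, so the cost of the no-session fast path and of the per-session fan-out can be compared directly. Assuming the standard Go toolchain, they can be run from the repository root with something like:

	go test -run='^$' -bench='BenchmarkInspector' -benchmem ./pkg/inspector/
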
4 changes: 2 additions & 2 deletions pkg/inspector/inspector_test.go
@@ -70,7 +70,7 @@ func TestInspector_Send_SessionClosed(t *testing.T) {
underTest.Send(context.Background(), r)
assertGotRecord(is, s, r)

s.close()
underTest.remove(s.id)
underTest.Send(
context.Background(),
record.Record{
@@ -123,7 +123,7 @@ func TestInspector_Send_SlowConsumer(t *testing.T) {
s := underTest.NewSession(context.Background(), "test-id")

for i := 0; i < bufferSize+1; i++ {
s.send(
underTest.Send(
context.Background(),
record.Record{
Position: record.Position(fmt.Sprintf("test-pos-%v", i)),
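
Taken together, the changes move session lifecycle management into the Inspector itself: NewSession registers the session and ties its removal to the caller's context, Send fans records out without blocking, and remove/Close close the channel. The following is a rough usage sketch based only on the signatures visible in this diff (the import paths come from the hunks above and "my-connector-id" is an invented ID); it is a sketch, not verified against the repository.

	package main

	import (
		"context"
		"fmt"

		"github.com/conduitio/conduit/pkg/foundation/log"
		"github.com/conduitio/conduit/pkg/inspector"
		"github.com/conduitio/conduit/pkg/record"
	)

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		ins := inspector.New(log.Nop(), inspector.DefaultBufferSize)

		// The session is removed, and s.C closed, once ctx is canceled.
		s := ins.NewSession(ctx, "my-connector-id")

		done := make(chan struct{})
		go func() {
			defer close(done)
			for r := range s.C { // ranges until the session is removed
				fmt.Printf("inspected record at position %s\n", r.Position)
			}
		}()

		ins.Send(ctx, record.Record{Position: record.Position("pos-1")})

		cancel() // triggers removal of the session, which closes s.C
		<-done
	}
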