Skip to content

Commit

Permalink
Stream Inspector: Close inspector when connector/processor is deleted (
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Feb 24, 2023
1 parent a3c963a commit 57de7b0
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 3 deletions.
4 changes: 4 additions & 0 deletions pkg/connector/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,7 @@ func (i *Instance) Connector(ctx context.Context, dispenserFetcher PluginDispens
return nil, ErrInvalidConnectorType
}
}

func (i *Instance) Close() {
i.inspector.Close()
}
1 change: 1 addition & 0 deletions pkg/connector/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func (s *Service) Delete(ctx context.Context, id string) error {
return cerrors.Errorf("could not delete connector instance %v from store: %w", id, err)
}
delete(s.connectors, id)
instance.Close()
measure.ConnectorsGauge.WithValues(strings.ToLower(instance.Type.String())).Dec()

return nil
Expand Down
20 changes: 18 additions & 2 deletions pkg/inspector/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,20 @@ type Session struct {
id string
logger log.CtxLogger
onClose func()
once *sync.Once
}

func (s *Session) close() {
s.onClose()
close(s.C)
// close() can be called multiple times on a session. One example is:
// There's an active inspector session on a component (processor or connector),
// during which the component is deleted.
// The session channel will be closed, which terminate the API request fetching
// record from this session.
// However, the API request termination also closes the session.
s.once.Do(func() {
s.onClose()
close(s.C)
})
}

// send a record to the session's channel.
Expand Down Expand Up @@ -114,6 +123,7 @@ func (i *Inspector) NewSession(ctx context.Context) *Session {
onClose: func() {
i.remove(id)
},
once: &sync.Once{},
}
go func() {
<-ctx.Done()
Expand All @@ -134,6 +144,12 @@ func (i *Inspector) NewSession(ctx context.Context) *Session {
return s
}

func (i *Inspector) Close() {
for _, s := range i.sessions {
s.close()
}
}

// remove a session with given ID from this Inspector.
func (i *Inspector) remove(id string) {
i.lock.Lock()
Expand Down
11 changes: 11 additions & 0 deletions pkg/inspector/inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ func TestInspector_Send_SessionClosed(t *testing.T) {
)
}

func TestInspector_Close(t *testing.T) {
is := is.New(t)

underTest := New(log.Nop(), 10)
s := underTest.NewSession(context.Background())

underTest.Close()
_, ok := <-s.C
is.True(!ok)
}

func TestInspector_Send_SessionCtxCanceled(t *testing.T) {
is := is.New(t)

Expand Down
6 changes: 6 additions & 0 deletions pkg/processor/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,15 @@ type Interface interface {
// Process runs the processor function on a record.
Process(ctx context.Context, record record.Record) (record.Record, error)

// InspectIn starts an inspection session for input records for this processor.
InspectIn(ctx context.Context) *inspector.Session

// InspectOut starts an inspection session for output records for this processor.
InspectOut(ctx context.Context) *inspector.Session

// Close closes this processor and releases any resources
// which may have been used by it.
Close()
}

// Instance represents a processor instance.
Expand Down
12 changes: 12 additions & 0 deletions pkg/processor/mock/processor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/processor/procbuiltin/func_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,8 @@ func (f FuncWrapper) InspectIn(ctx context.Context) *inspector.Session {
func (f FuncWrapper) InspectOut(ctx context.Context) *inspector.Session {
return f.outInsp.NewSession(ctx)
}

func (f FuncWrapper) Close() {
f.inInsp.Close()
f.outInsp.Close()
}
24 changes: 24 additions & 0 deletions pkg/processor/procbuiltin/func_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,27 @@ func TestFuncWrapper_InspectOut_ProcessingFailed(t *testing.T) {
_, _, err := cchan.ChanOut[record.Record](session.C).RecvTimeout(ctx, 100*time.Millisecond)
is.True(cerrors.Is(err, context.DeadlineExceeded))
}

func TestFuncWrapper_Close(t *testing.T) {
ctx := context.Background()

is := is.New(t)

underTest := NewFuncWrapper(func(_ context.Context, in record.Record) (record.Record, error) {
return record.Record{}, nil
})

in := underTest.InspectIn(ctx)
out := underTest.InspectOut(ctx)
underTest.Close()

// incoming records session should be closed
_, got, err := cchan.ChanOut[record.Record](in.C).RecvTimeout(ctx, 100*time.Millisecond)
is.NoErr(err)
is.True(!got)

// outgoing records session should be closed
_, got, err = cchan.ChanOut[record.Record](out.C).RecvTimeout(ctx, 100*time.Millisecond)
is.NoErr(err)
is.True(!got)
}
5 changes: 5 additions & 0 deletions pkg/processor/procjs/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ func (p *Processor) InspectOut(ctx context.Context) *inspector.Session {
return p.outInsp.NewSession(ctx)
}

func (p *Processor) Close() {
p.inInsp.Close()
p.outInsp.Close()
}

func (p *Processor) toJSRecord(r record.Record) goja.Value {
convertData := func(d record.Data) interface{} {
switch v := d.(type) {
Expand Down
28 changes: 28 additions & 0 deletions pkg/processor/procjs/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,34 @@ func TestJSProcessor_Inspect(t *testing.T) {
is.True(got)
is.Equal(recOut, inspOut)
}

func TestJSProcessor_Close(t *testing.T) {
is := is.New(t)
ctx := context.Background()
src := `
function process(record) {
record.Key = new RawData();
record.Key.Raw = "foobar";
return record;
}`
underTest, err := New(src, zerolog.Nop())
is.NoErr(err) // expected no error when creating the JS processor

in := underTest.InspectIn(ctx)
out := underTest.InspectOut(ctx)
underTest.Close()

// incoming records session should be closed
_, got, err := cchan.ChanOut[record.Record](in.C).RecvTimeout(ctx, 100*time.Millisecond)
is.NoErr(err)
is.True(!got)

// outgoing records session should be closed
_, got, err = cchan.ChanOut[record.Record](out.C).RecvTimeout(ctx, 100*time.Millisecond)
is.NoErr(err)
is.True(!got)
}

func TestJSProcessor_JavaScriptException(t *testing.T) {
is := is.New(t)

Expand Down
3 changes: 2 additions & 1 deletion pkg/processor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *Service) List(_ context.Context) map[string]*Instance {
}

// Get will return a single processor or an error.
func (s *Service) Get(ctx context.Context, id string) (*Instance, error) {
func (s *Service) Get(_ context.Context, id string) (*Instance, error) {
ins, ok := s.instances[id]
if !ok {
return nil, cerrors.Errorf("%w (ID: %s)", ErrInstanceNotFound, id)
Expand Down Expand Up @@ -167,6 +167,7 @@ func (s *Service) Delete(ctx context.Context, id string) error {
return cerrors.Errorf("could not delete processor instance from store: %w", err)
}
delete(s.instances, id)
instance.Processor.Close()
measure.ProcessorsGauge.WithValues(instance.Type).Dec()

return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/processor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func TestService_Delete_Success(t *testing.T) {

procType := "processor-type"
p := mock.NewProcessor(ctrl)
p.EXPECT().Close()

registry := newTestBuilderRegistry(t, map[string]processor.Interface{procType: p})
service := processor.NewService(log.Nop(), db, registry)
Expand Down

0 comments on commit 57de7b0

Please sign in to comment.