diff --git a/docs/metrics.md b/docs/metrics.md index a428219f1..b014edd7b 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -19,19 +19,20 @@ locally, you can get metrics if you run `curl localhost:8080/metrics`. in [`measure.go`](https://github.com/ConduitIO/conduit/blob/main/pkg/foundation/metrics/measure/measure.go). Those are: - | Pipeline name | Type | Description | - |------------------------------------------------|-----------|----------------------------------------------------------------------------------------------------------------| - | `conduit_pipelines` | Gauge | Number of pipelines by status. | - | `conduit_connectors` | Gauge | Number of connectors by type (source, destination). | - | `conduit_processors` | Gauge | Number of processors by name and type. | - | `conduit_connector_bytes` | Histogram | Number of bytes* a connector processed by pipeline name, plugin and type (source, destination). | - | `conduit_dlq_bytes` | Histogram | Number of bytes* a DLQ connector processed per pipeline and plugin. | - | `conduit_pipeline_execution_duration_seconds` | Histogram | Amount of time records spent in a pipeline. | - | `conduit_connector_execution_duration_seconds` | Histogram | Amount of time spent reading or writing records per pipeline, plugin and connector type (source, destination). | - | `conduit_processor_execution_duration_seconds` | Histogram | Amount of time spent on processing records per pipeline and processor. | - | `conduit_dlq_execution_duration_seconds` | Histogram | Amount of time spent writing records to DLQ connector per pipeline and plugin. | - - \*We calculate bytes based on the JSON representation of the record payload + | Pipeline name | Type | Description | + |------------------------------------------------|-----------|----------------------------------------------------------------------------------------------------------------| + | `conduit_pipelines` | Gauge | Number of pipelines by status. | + | `conduit_connectors` | Gauge | Number of connectors by type (source, destination). | + | `conduit_processors` | Gauge | Number of processors by name and type. | + | `conduit_connector_bytes` | Histogram | Number of bytes* a connector processed by pipeline name, plugin and type (source, destination). | + | `conduit_dlq_bytes` | Histogram | Number of bytes* a DLQ connector processed per pipeline and plugin. | + | `conduit_pipeline_execution_duration_seconds` | Histogram | Amount of time records spent in a pipeline. | + | `conduit_connector_execution_duration_seconds` | Histogram | Amount of time spent reading or writing records per pipeline, plugin and connector type (source, destination). | + | `conduit_processor_execution_duration_seconds` | Histogram | Amount of time spent on processing records per pipeline and processor. | + | `conduit_dlq_execution_duration_seconds` | Histogram | Amount of time spent writing records to DLQ connector per pipeline and plugin. | + | `conduit_inspector_sessions` | Gauge | Number of inspector sessions by ID of pipeline component (connector or processor) | + + *We calculate bytes based on the JSON representation of the record payload and key. - **Go runtime metrics**: The default metrics exposed by Prometheus' official Go diff --git a/pkg/connector/instance.go b/pkg/connector/instance.go index 9abacd44b..2dfa7e334 100644 --- a/pkg/connector/instance.go +++ b/pkg/connector/instance.go @@ -104,7 +104,7 @@ func (i *Instance) Init(logger log.CtxLogger, persister *Persister) { // Inspect returns an inspector.Session which exposes the records // coming into or out of this connector (depending on the connector type). func (i *Instance) Inspect(ctx context.Context) *inspector.Session { - return i.inspector.NewSession(ctx) + return i.inspector.NewSession(ctx, i.ID) } // Connector fetches a new plugin dispenser and returns a connector that can be diff --git a/pkg/foundation/metrics/measure/measure.go b/pkg/foundation/metrics/measure/measure.go index ad8099415..f627d68ac 100644 --- a/pkg/foundation/metrics/measure/measure.go +++ b/pkg/foundation/metrics/measure/measure.go @@ -34,6 +34,11 @@ var ( ProcessorsGauge = metrics.NewLabeledGauge("conduit_processors", "Number of processors by type.", []string{"type"}) + InspectorsGauge = metrics.NewLabeledGauge( + "conduit_inspector_sessions", + "Number of inspector sessions by ID of pipeline component (connector or processor)", + []string{"component_id"}, + ) ConnectorBytesHistogram = metrics.NewLabeledHistogram("conduit_connector_bytes", "Number of bytes a connector processed by pipeline name, plugin and type (source, destination).", diff --git a/pkg/inspector/inspector.go b/pkg/inspector/inspector.go index 8d0c20ff1..2cdfa78bb 100644 --- a/pkg/inspector/inspector.go +++ b/pkg/inspector/inspector.go @@ -19,6 +19,7 @@ import ( "sync" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/foundation/metrics/measure" "github.com/conduitio/conduit/pkg/record" "github.com/google/uuid" ) @@ -114,7 +115,9 @@ func (i *Inspector) Send(ctx context.Context, r record.Record) { } } -func (i *Inspector) NewSession(ctx context.Context) *Session { +// NewSession creates a new session in given inspector. +// componentID is the ID of the component being inspected (connector or processor). +func (i *Inspector) NewSession(ctx context.Context, componentID string) *Session { id := uuid.NewString() s := &Session{ C: make(chan record.Record, i.bufferSize), @@ -122,9 +125,12 @@ func (i *Inspector) NewSession(ctx context.Context) *Session { logger: i.logger.WithComponent("inspector.Session"), onClose: func() { i.remove(id) + measure.InspectorsGauge.WithValues(componentID).Dec() }, once: &sync.Once{}, } + measure.InspectorsGauge.WithValues(componentID).Inc() + go func() { <-ctx.Done() s.logger. diff --git a/pkg/inspector/inspector_benchmark_test.go b/pkg/inspector/inspector_benchmark_test.go index 78c41cb5f..8636570eb 100644 --- a/pkg/inspector/inspector_benchmark_test.go +++ b/pkg/inspector/inspector_benchmark_test.go @@ -24,7 +24,7 @@ import ( func BenchmarkInspector_SingleSession_Send(b *testing.B) { ins := New(log.Nop(), 10) - ins.NewSession(context.Background()) + ins.NewSession(context.Background(), "test-id") for i := 0; i < b.N; i++ { ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")}) diff --git a/pkg/inspector/inspector_test.go b/pkg/inspector/inspector_test.go index 2b0a6929a..f28616ca4 100644 --- a/pkg/inspector/inspector_test.go +++ b/pkg/inspector/inspector_test.go @@ -34,7 +34,7 @@ func TestInspector_Send_NoSessions(*testing.T) { func TestInspector_Send_SingleSession(t *testing.T) { underTest := New(log.Nop(), 10) - s := underTest.NewSession(context.Background()) + s := underTest.NewSession(context.Background(), "test-id") r := record.Record{ Position: record.Position("test-pos"), @@ -47,8 +47,8 @@ func TestInspector_Send_MultipleSessions(t *testing.T) { is := is.New(t) underTest := New(log.Nop(), 10) - s1 := underTest.NewSession(context.Background()) - s2 := underTest.NewSession(context.Background()) + s1 := underTest.NewSession(context.Background(), "test-id") + s2 := underTest.NewSession(context.Background(), "test-id") r := record.Record{ Position: record.Position("test-pos"), @@ -62,7 +62,7 @@ func TestInspector_Send_SessionClosed(t *testing.T) { is := is.New(t) underTest := New(log.Nop(), 10) - s := underTest.NewSession(context.Background()) + s := underTest.NewSession(context.Background(), "test-id") r := record.Record{ Position: record.Position("test-pos"), @@ -83,7 +83,7 @@ func TestInspector_Close(t *testing.T) { is := is.New(t) underTest := New(log.Nop(), 10) - s := underTest.NewSession(context.Background()) + s := underTest.NewSession(context.Background(), "test-id") underTest.Close() _, ok := <-s.C @@ -95,7 +95,7 @@ func TestInspector_Send_SessionCtxCanceled(t *testing.T) { underTest := New(log.Nop(), 10) ctx, cancel := context.WithCancel(context.Background()) - s := underTest.NewSession(ctx) + s := underTest.NewSession(ctx, "test-id") r := record.Record{ Position: record.Position("test-pos"), @@ -120,7 +120,7 @@ func TestInspector_Send_SlowConsumer(t *testing.T) { bufferSize := 10 underTest := New(log.Nop(), bufferSize) - s := underTest.NewSession(context.Background()) + s := underTest.NewSession(context.Background(), "test-id") for i := 0; i < bufferSize+1; i++ { s.send( diff --git a/pkg/orchestrator/processors.go b/pkg/orchestrator/processors.go index 069758634..311cad05d 100644 --- a/pkg/orchestrator/processors.go +++ b/pkg/orchestrator/processors.go @@ -109,7 +109,7 @@ func (p *ProcessorOrchestrator) InspectIn( return nil, err } - return proc.Processor.InspectIn(ctx), nil + return proc.Processor.InspectIn(ctx, proc.ID), nil } func (p *ProcessorOrchestrator) InspectOut( @@ -121,7 +121,7 @@ func (p *ProcessorOrchestrator) InspectOut( return nil, err } - return proc.Processor.InspectOut(ctx), nil + return proc.Processor.InspectOut(ctx, proc.ID), nil } func (p *ProcessorOrchestrator) Get(ctx context.Context, id string) (*processor.Instance, error) { diff --git a/pkg/processor/instance.go b/pkg/processor/instance.go index 96246e940..2ec16f864 100644 --- a/pkg/processor/instance.go +++ b/pkg/processor/instance.go @@ -49,9 +49,9 @@ type Interface interface { Process(ctx context.Context, record record.Record) (record.Record, error) // InspectIn starts an inspection session for input records for this processor. - InspectIn(ctx context.Context) *inspector.Session + InspectIn(ctx context.Context, id string) *inspector.Session // InspectOut starts an inspection session for output records for this processor. - InspectOut(ctx context.Context) *inspector.Session + InspectOut(ctx context.Context, id string) *inspector.Session // Close closes this processor and releases any resources // which may have been used by it. diff --git a/pkg/processor/mock/processor.go b/pkg/processor/mock/processor.go index 966693850..6a0499dc1 100644 --- a/pkg/processor/mock/processor.go +++ b/pkg/processor/mock/processor.go @@ -49,31 +49,31 @@ func (mr *ProcessorMockRecorder) Close() *gomock.Call { } // InspectIn mocks base method. -func (m *Processor) InspectIn(arg0 context.Context) *inspector.Session { +func (m *Processor) InspectIn(arg0 context.Context, arg1 string) *inspector.Session { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InspectIn", arg0) + ret := m.ctrl.Call(m, "InspectIn", arg0, arg1) ret0, _ := ret[0].(*inspector.Session) return ret0 } // InspectIn indicates an expected call of InspectIn. -func (mr *ProcessorMockRecorder) InspectIn(arg0 interface{}) *gomock.Call { +func (mr *ProcessorMockRecorder) InspectIn(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InspectIn", reflect.TypeOf((*Processor)(nil).InspectIn), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InspectIn", reflect.TypeOf((*Processor)(nil).InspectIn), arg0, arg1) } // InspectOut mocks base method. -func (m *Processor) InspectOut(arg0 context.Context) *inspector.Session { +func (m *Processor) InspectOut(arg0 context.Context, arg1 string) *inspector.Session { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InspectOut", arg0) + ret := m.ctrl.Call(m, "InspectOut", arg0, arg1) ret0, _ := ret[0].(*inspector.Session) return ret0 } // InspectOut indicates an expected call of InspectOut. -func (mr *ProcessorMockRecorder) InspectOut(arg0 interface{}) *gomock.Call { +func (mr *ProcessorMockRecorder) InspectOut(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InspectOut", reflect.TypeOf((*Processor)(nil).InspectOut), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InspectOut", reflect.TypeOf((*Processor)(nil).InspectOut), arg0, arg1) } // Process mocks base method. diff --git a/pkg/processor/procbuiltin/func_wrapper.go b/pkg/processor/procbuiltin/func_wrapper.go index 9de79610e..18a870ecd 100644 --- a/pkg/processor/procbuiltin/func_wrapper.go +++ b/pkg/processor/procbuiltin/func_wrapper.go @@ -55,12 +55,12 @@ func (f FuncWrapper) Process(ctx context.Context, inRec record.Record) (record.R return outRec, nil } -func (f FuncWrapper) InspectIn(ctx context.Context) *inspector.Session { - return f.inInsp.NewSession(ctx) +func (f FuncWrapper) InspectIn(ctx context.Context, id string) *inspector.Session { + return f.inInsp.NewSession(ctx, id) } -func (f FuncWrapper) InspectOut(ctx context.Context) *inspector.Session { - return f.outInsp.NewSession(ctx) +func (f FuncWrapper) InspectOut(ctx context.Context, id string) *inspector.Session { + return f.outInsp.NewSession(ctx, id) } func (f FuncWrapper) Close() { diff --git a/pkg/processor/procbuiltin/func_wrapper_test.go b/pkg/processor/procbuiltin/func_wrapper_test.go index eee3f07b5..b425e6609 100644 --- a/pkg/processor/procbuiltin/func_wrapper_test.go +++ b/pkg/processor/procbuiltin/func_wrapper_test.go @@ -56,7 +56,7 @@ func TestFuncWrapper_InspectIn(t *testing.T) { return record.Record{}, tc.err }) - session := underTest.InspectIn(ctx) + session := underTest.InspectIn(ctx, "test-id") _, _ = underTest.Process(ctx, wantIn) gotIn, got, err := cchan.ChanOut[record.Record](session.C).RecvTimeout(ctx, 100*time.Millisecond) @@ -82,7 +82,7 @@ func TestFuncWrapper_InspectOut_Ok(t *testing.T) { return wantOut, nil }) - session := underTest.InspectOut(ctx) + session := underTest.InspectOut(ctx, "test-id") _, _ = underTest.Process(ctx, record.Record{}) gotOut, got, err := cchan.ChanOut[record.Record](session.C).RecvTimeout(ctx, 100*time.Millisecond) @@ -106,7 +106,7 @@ func TestFuncWrapper_InspectOut_ProcessingFailed(t *testing.T) { return wantOut, cerrors.New("shouldn't happen") }) - session := underTest.InspectOut(ctx) + session := underTest.InspectOut(ctx, "test-id") _, _ = underTest.Process(ctx, record.Record{}) _, _, err := cchan.ChanOut[record.Record](session.C).RecvTimeout(ctx, 100*time.Millisecond) @@ -122,8 +122,8 @@ func TestFuncWrapper_Close(t *testing.T) { return record.Record{}, nil }) - in := underTest.InspectIn(ctx) - out := underTest.InspectOut(ctx) + in := underTest.InspectIn(ctx, "test-id") + out := underTest.InspectOut(ctx, "test-id") underTest.Close() // incoming records session should be closed diff --git a/pkg/processor/procjs/processor.go b/pkg/processor/procjs/processor.go index e91c9c956..1281eb104 100644 --- a/pkg/processor/procjs/processor.go +++ b/pkg/processor/procjs/processor.go @@ -188,12 +188,12 @@ func (p *Processor) Process(ctx context.Context, in record.Record) (record.Recor return out, nil } -func (p *Processor) InspectIn(ctx context.Context) *inspector.Session { - return p.inInsp.NewSession(ctx) +func (p *Processor) InspectIn(ctx context.Context, id string) *inspector.Session { + return p.inInsp.NewSession(ctx, id) } -func (p *Processor) InspectOut(ctx context.Context) *inspector.Session { - return p.outInsp.NewSession(ctx) +func (p *Processor) InspectOut(ctx context.Context, id string) *inspector.Session { + return p.outInsp.NewSession(ctx, id) } func (p *Processor) Close() { diff --git a/pkg/processor/procjs/processor_test.go b/pkg/processor/procjs/processor_test.go index 2c935e48a..ee8b09852 100644 --- a/pkg/processor/procjs/processor_test.go +++ b/pkg/processor/procjs/processor_test.go @@ -407,8 +407,8 @@ func TestJSProcessor_Inspect(t *testing.T) { underTest, err := New(src, zerolog.Nop()) is.NoErr(err) // expected no error when creating the JS processor - in := underTest.InspectIn(ctx) - out := underTest.InspectOut(ctx) + in := underTest.InspectIn(ctx, "test-id") + out := underTest.InspectOut(ctx, "test-id") recIn := record.Record{ Position: record.Position("test-pos"), @@ -443,8 +443,8 @@ func TestJSProcessor_Close(t *testing.T) { underTest, err := New(src, zerolog.Nop()) is.NoErr(err) // expected no error when creating the JS processor - in := underTest.InspectIn(ctx) - out := underTest.InspectOut(ctx) + in := underTest.InspectIn(ctx, "test-id") + out := underTest.InspectOut(ctx, "test-id") underTest.Close() // incoming records session should be closed diff --git a/pkg/web/api/connector_v1_test.go b/pkg/web/api/connector_v1_test.go index 68057e177..f7d083039 100644 --- a/pkg/web/api/connector_v1_test.go +++ b/pkg/web/api/connector_v1_test.go @@ -221,7 +221,7 @@ func TestConnectorAPIv1_InspectConnector_SendRecord(t *testing.T) { assert.Ok(t, err) ins := inspector.New(log.Nop(), 10) - session := ins.NewSession(ctx) + session := ins.NewSession(ctx, "test-id") csMock.EXPECT(). Inspect(ctx, id). @@ -252,7 +252,7 @@ func TestConnectorAPIv1_InspectConnector_SendErr(t *testing.T) { id := uuid.NewString() ins := inspector.New(log.Nop(), 10) - session := ins.NewSession(ctx) + session := ins.NewSession(ctx, "test-id") csMock.EXPECT(). Inspect(ctx, id). diff --git a/pkg/web/api/processor_v1_test.go b/pkg/web/api/processor_v1_test.go index f3080a71c..ef26864cd 100644 --- a/pkg/web/api/processor_v1_test.go +++ b/pkg/web/api/processor_v1_test.go @@ -432,7 +432,7 @@ func TestProcessorAPIv1_InspectIn_SendRecord(t *testing.T) { assert.Ok(t, err) ins := inspector.New(log.Nop(), 10) - session := ins.NewSession(ctx) + session := ins.NewSession(ctx, "test-id") orchestrator.EXPECT(). InspectIn(ctx, id). @@ -463,7 +463,7 @@ func TestProcessorAPIv1_InspectIn_SendErr(t *testing.T) { id := uuid.NewString() ins := inspector.New(log.Nop(), 10) - session := ins.NewSession(ctx) + session := ins.NewSession(ctx, "test-id") orchestrator.EXPECT(). InspectIn(ctx, id).