Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream Inspector: Add metrics #971

Merged
merged 13 commits into from
Apr 4, 2023
28 changes: 15 additions & 13 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@ locally, you can get metrics if you run `curl localhost:8080/metrics`.
in [`measure.go`](https://github.com/ConduitIO/conduit/blob/main/pkg/foundation/metrics/measure/measure.go).
Those are:

| Pipeline name | Type | Description |
|------------------------------------------------|-----------|----------------------------------------------------------------------------------------------------------------|
| `conduit_pipelines` | Gauge | Number of pipelines by status. |
| `conduit_connectors` | Gauge | Number of connectors by type (source, destination). |
| `conduit_processors` | Gauge | Number of processors by name and type. |
| `conduit_connector_bytes` | Histogram | Number of bytes* a connector processed by pipeline name, plugin and type (source, destination). |
| `conduit_dlq_bytes` | Histogram | Number of bytes* a DLQ connector processed per pipeline and plugin. |
| `conduit_pipeline_execution_duration_seconds` | Histogram | Amount of time records spent in a pipeline. |
| `conduit_connector_execution_duration_seconds` | Histogram | Amount of time spent reading or writing records per pipeline, plugin and connector type (source, destination). |
| `conduit_processor_execution_duration_seconds` | Histogram | Amount of time spent on processing records per pipeline and processor. |
| `conduit_dlq_execution_duration_seconds` | Histogram | Amount of time spent writing records to DLQ connector per pipeline and plugin. |

\*We calculate bytes based on the JSON representation of the record payload
| Pipeline name | Type | Description |
|------------------------------------------------|-----------|----------------------------------------------------------------------------------------------------------------|
| `conduit_pipelines` | Gauge | Number of pipelines by status. |
| `conduit_connectors` | Gauge | Number of connectors by type (source, destination). |
| `conduit_processors` | Gauge | Number of processors by name and type. |
| `conduit_connector_bytes` | Histogram | Number of bytes* a connector processed by pipeline name, plugin and type (source, destination). |
| `conduit_dlq_bytes` | Histogram | Number of bytes* a DLQ connector processed per pipeline and plugin. |
| `conduit_pipeline_execution_duration_seconds` | Histogram | Amount of time records spent in a pipeline. |
| `conduit_connector_execution_duration_seconds` | Histogram | Amount of time spent reading or writing records per pipeline, plugin and connector type (source, destination). |
| `conduit_processor_execution_duration_seconds` | Histogram | Amount of time spent on processing records per pipeline and processor. |
| `conduit_dlq_execution_duration_seconds` | Histogram | Amount of time spent writing records to DLQ connector per pipeline and plugin. |
| `conduit_dlq_execution_duration_seconds` | Histogram | Amount of time spent writing records to DLQ connector per pipeline and plugin. |
| `conduit_inspector_sessions` | Gauge | Number of inspector sessions by ID of pipeline component (connector or processor) |

*We calculate bytes based on the JSON representation of the record payload
and key.

- **Go runtime metrics**: The default metrics exposed by Prometheus' official Go
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (i *Instance) Init(logger log.CtxLogger, persister *Persister) {
// Inspect returns an inspector.Session which exposes the records
// coming into or out of this connector (depending on the connector type).
func (i *Instance) Inspect(ctx context.Context) *inspector.Session {
return i.inspector.NewSession(ctx)
return i.inspector.NewSession(ctx, i.ID)
}

// Connector fetches a new plugin dispenser and returns a connector that can be
Expand Down
5 changes: 5 additions & 0 deletions pkg/foundation/metrics/measure/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ var (
ProcessorsGauge = metrics.NewLabeledGauge("conduit_processors",
"Number of processors by type.",
[]string{"type"})
InspectorsGauge = metrics.NewLabeledGauge(
"conduit_inspector_sessions",
"Number of inspector sessions by ID of pipeline component (connector or processor)",
[]string{"component_id"},
)

ConnectorBytesHistogram = metrics.NewLabeledHistogram("conduit_connector_bytes",
"Number of bytes a connector processed by pipeline name, plugin and type (source, destination).",
Expand Down
8 changes: 7 additions & 1 deletion pkg/inspector/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"

"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/foundation/metrics/measure"
"github.com/conduitio/conduit/pkg/record"
"github.com/google/uuid"
)
Expand Down Expand Up @@ -114,17 +115,22 @@ func (i *Inspector) Send(ctx context.Context, r record.Record) {
}
}

func (i *Inspector) NewSession(ctx context.Context) *Session {
// NewSession creates a new session in given inspector.
// componentID is the ID of the component being inspected (connector or processor).
func (i *Inspector) NewSession(ctx context.Context, componentID string) *Session {
id := uuid.NewString()
s := &Session{
C: make(chan record.Record, i.bufferSize),
id: id,
logger: i.logger.WithComponent("inspector.Session"),
onClose: func() {
i.remove(id)
measure.InspectorsGauge.WithValues(componentID).Dec()
},
once: &sync.Once{},
}
measure.InspectorsGauge.WithValues(componentID).Inc()

go func() {
<-ctx.Done()
s.logger.
Expand Down
2 changes: 1 addition & 1 deletion pkg/inspector/inspector_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

func BenchmarkInspector_SingleSession_Send(b *testing.B) {
ins := New(log.Nop(), 10)
ins.NewSession(context.Background())
ins.NewSession(context.Background(), "test-id")

for i := 0; i < b.N; i++ {
ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")})
Expand Down
14 changes: 7 additions & 7 deletions pkg/inspector/inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestInspector_Send_NoSessions(*testing.T) {

func TestInspector_Send_SingleSession(t *testing.T) {
underTest := New(log.Nop(), 10)
s := underTest.NewSession(context.Background())
s := underTest.NewSession(context.Background(), "test-id")

r := record.Record{
Position: record.Position("test-pos"),
Expand All @@ -47,8 +47,8 @@ func TestInspector_Send_MultipleSessions(t *testing.T) {
is := is.New(t)

underTest := New(log.Nop(), 10)
s1 := underTest.NewSession(context.Background())
s2 := underTest.NewSession(context.Background())
s1 := underTest.NewSession(context.Background(), "test-id")
s2 := underTest.NewSession(context.Background(), "test-id")

r := record.Record{
Position: record.Position("test-pos"),
Expand All @@ -62,7 +62,7 @@ func TestInspector_Send_SessionClosed(t *testing.T) {
is := is.New(t)

underTest := New(log.Nop(), 10)
s := underTest.NewSession(context.Background())
s := underTest.NewSession(context.Background(), "test-id")

r := record.Record{
Position: record.Position("test-pos"),
Expand All @@ -83,7 +83,7 @@ func TestInspector_Close(t *testing.T) {
is := is.New(t)

underTest := New(log.Nop(), 10)
s := underTest.NewSession(context.Background())
s := underTest.NewSession(context.Background(), "test-id")

underTest.Close()
_, ok := <-s.C
Expand All @@ -95,7 +95,7 @@ func TestInspector_Send_SessionCtxCanceled(t *testing.T) {

underTest := New(log.Nop(), 10)
ctx, cancel := context.WithCancel(context.Background())
s := underTest.NewSession(ctx)
s := underTest.NewSession(ctx, "test-id")

r := record.Record{
Position: record.Position("test-pos"),
Expand All @@ -120,7 +120,7 @@ func TestInspector_Send_SlowConsumer(t *testing.T) {

bufferSize := 10
underTest := New(log.Nop(), bufferSize)
s := underTest.NewSession(context.Background())
s := underTest.NewSession(context.Background(), "test-id")

for i := 0; i < bufferSize+1; i++ {
s.send(
Expand Down
4 changes: 2 additions & 2 deletions pkg/orchestrator/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (p *ProcessorOrchestrator) InspectIn(
return nil, err
}

return proc.Processor.InspectIn(ctx), nil
return proc.Processor.InspectIn(ctx, proc.ID), nil
}

func (p *ProcessorOrchestrator) InspectOut(
Expand All @@ -121,7 +121,7 @@ func (p *ProcessorOrchestrator) InspectOut(
return nil, err
}

return proc.Processor.InspectOut(ctx), nil
return proc.Processor.InspectOut(ctx, proc.ID), nil
}

func (p *ProcessorOrchestrator) Get(ctx context.Context, id string) (*processor.Instance, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/processor/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ type Interface interface {
Process(ctx context.Context, record record.Record) (record.Record, error)

// InspectIn starts an inspection session for input records for this processor.
InspectIn(ctx context.Context) *inspector.Session
InspectIn(ctx context.Context, id string) *inspector.Session
// InspectOut starts an inspection session for output records for this processor.
InspectOut(ctx context.Context) *inspector.Session
InspectOut(ctx context.Context, id string) *inspector.Session

// Close closes this processor and releases any resources
// which may have been used by it.
Expand Down
16 changes: 8 additions & 8 deletions pkg/processor/mock/processor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/processor/procbuiltin/func_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ func (f FuncWrapper) Process(ctx context.Context, inRec record.Record) (record.R
return outRec, nil
}

func (f FuncWrapper) InspectIn(ctx context.Context) *inspector.Session {
return f.inInsp.NewSession(ctx)
func (f FuncWrapper) InspectIn(ctx context.Context, id string) *inspector.Session {
return f.inInsp.NewSession(ctx, id)
}

func (f FuncWrapper) InspectOut(ctx context.Context) *inspector.Session {
return f.outInsp.NewSession(ctx)
func (f FuncWrapper) InspectOut(ctx context.Context, id string) *inspector.Session {
return f.outInsp.NewSession(ctx, id)
}

func (f FuncWrapper) Close() {
Expand Down
10 changes: 5 additions & 5 deletions pkg/processor/procbuiltin/func_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestFuncWrapper_InspectIn(t *testing.T) {
return record.Record{}, tc.err
})

session := underTest.InspectIn(ctx)
session := underTest.InspectIn(ctx, "test-id")
_, _ = underTest.Process(ctx, wantIn)

gotIn, got, err := cchan.ChanOut[record.Record](session.C).RecvTimeout(ctx, 100*time.Millisecond)
Expand All @@ -82,7 +82,7 @@ func TestFuncWrapper_InspectOut_Ok(t *testing.T) {
return wantOut, nil
})

session := underTest.InspectOut(ctx)
session := underTest.InspectOut(ctx, "test-id")
_, _ = underTest.Process(ctx, record.Record{})

gotOut, got, err := cchan.ChanOut[record.Record](session.C).RecvTimeout(ctx, 100*time.Millisecond)
Expand All @@ -106,7 +106,7 @@ func TestFuncWrapper_InspectOut_ProcessingFailed(t *testing.T) {
return wantOut, cerrors.New("shouldn't happen")
})

session := underTest.InspectOut(ctx)
session := underTest.InspectOut(ctx, "test-id")
_, _ = underTest.Process(ctx, record.Record{})

_, _, err := cchan.ChanOut[record.Record](session.C).RecvTimeout(ctx, 100*time.Millisecond)
Expand All @@ -122,8 +122,8 @@ func TestFuncWrapper_Close(t *testing.T) {
return record.Record{}, nil
})

in := underTest.InspectIn(ctx)
out := underTest.InspectOut(ctx)
in := underTest.InspectIn(ctx, "test-id")
out := underTest.InspectOut(ctx, "test-id")
underTest.Close()

// incoming records session should be closed
Expand Down
8 changes: 4 additions & 4 deletions pkg/processor/procjs/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,12 @@ func (p *Processor) Process(ctx context.Context, in record.Record) (record.Recor
return out, nil
}

func (p *Processor) InspectIn(ctx context.Context) *inspector.Session {
return p.inInsp.NewSession(ctx)
func (p *Processor) InspectIn(ctx context.Context, id string) *inspector.Session {
return p.inInsp.NewSession(ctx, id)
}

func (p *Processor) InspectOut(ctx context.Context) *inspector.Session {
return p.outInsp.NewSession(ctx)
func (p *Processor) InspectOut(ctx context.Context, id string) *inspector.Session {
return p.outInsp.NewSession(ctx, id)
}

func (p *Processor) Close() {
Expand Down
8 changes: 4 additions & 4 deletions pkg/processor/procjs/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ func TestJSProcessor_Inspect(t *testing.T) {
underTest, err := New(src, zerolog.Nop())
is.NoErr(err) // expected no error when creating the JS processor

in := underTest.InspectIn(ctx)
out := underTest.InspectOut(ctx)
in := underTest.InspectIn(ctx, "test-id")
out := underTest.InspectOut(ctx, "test-id")

recIn := record.Record{
Position: record.Position("test-pos"),
Expand Down Expand Up @@ -443,8 +443,8 @@ func TestJSProcessor_Close(t *testing.T) {
underTest, err := New(src, zerolog.Nop())
is.NoErr(err) // expected no error when creating the JS processor

in := underTest.InspectIn(ctx)
out := underTest.InspectOut(ctx)
in := underTest.InspectIn(ctx, "test-id")
out := underTest.InspectOut(ctx, "test-id")
underTest.Close()

// incoming records session should be closed
Expand Down
4 changes: 2 additions & 2 deletions pkg/web/api/connector_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestConnectorAPIv1_InspectConnector_SendRecord(t *testing.T) {
assert.Ok(t, err)

ins := inspector.New(log.Nop(), 10)
session := ins.NewSession(ctx)
session := ins.NewSession(ctx, "test-id")

csMock.EXPECT().
Inspect(ctx, id).
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestConnectorAPIv1_InspectConnector_SendErr(t *testing.T) {
id := uuid.NewString()

ins := inspector.New(log.Nop(), 10)
session := ins.NewSession(ctx)
session := ins.NewSession(ctx, "test-id")

csMock.EXPECT().
Inspect(ctx, id).
Expand Down
4 changes: 2 additions & 2 deletions pkg/web/api/processor_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func TestProcessorAPIv1_InspectIn_SendRecord(t *testing.T) {
assert.Ok(t, err)

ins := inspector.New(log.Nop(), 10)
session := ins.NewSession(ctx)
session := ins.NewSession(ctx, "test-id")

orchestrator.EXPECT().
InspectIn(ctx, id).
Expand Down Expand Up @@ -463,7 +463,7 @@ func TestProcessorAPIv1_InspectIn_SendErr(t *testing.T) {
id := uuid.NewString()

ins := inspector.New(log.Nop(), 10)
session := ins.NewSession(ctx)
session := ins.NewSession(ctx, "test-id")

orchestrator.EXPECT().
InspectIn(ctx, id).
Expand Down