Skip to content

Commit

Permalink
Stream Inspector: Add metrics (#971)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Apr 4, 2023
1 parent 166dfe9 commit 0f2ead2
Show file tree
Hide file tree
Showing 15 changed files with 68 additions and 56 deletions.
27 changes: 14 additions & 13 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@ locally, you can get metrics if you run `curl localhost:8080/metrics`.
in [`measure.go`](https://github.com/ConduitIO/conduit/blob/main/pkg/foundation/metrics/measure/measure.go).
Those are:

| Pipeline name | Type | Description |
|------------------------------------------------|-----------|----------------------------------------------------------------------------------------------------------------|
| `conduit_pipelines` | Gauge | Number of pipelines by status. |
| `conduit_connectors` | Gauge | Number of connectors by type (source, destination). |
| `conduit_processors` | Gauge | Number of processors by name and type. |
| `conduit_connector_bytes` | Histogram | Number of bytes* a connector processed by pipeline name, plugin and type (source, destination). |
| `conduit_dlq_bytes` | Histogram | Number of bytes* a DLQ connector processed per pipeline and plugin. |
| `conduit_pipeline_execution_duration_seconds` | Histogram | Amount of time records spent in a pipeline. |
| `conduit_connector_execution_duration_seconds` | Histogram | Amount of time spent reading or writing records per pipeline, plugin and connector type (source, destination). |
| `conduit_processor_execution_duration_seconds` | Histogram | Amount of time spent on processing records per pipeline and processor. |
| `conduit_dlq_execution_duration_seconds` | Histogram | Amount of time spent writing records to DLQ connector per pipeline and plugin. |

\*We calculate bytes based on the JSON representation of the record payload
| Pipeline name | Type | Description |
|------------------------------------------------|-----------|----------------------------------------------------------------------------------------------------------------|
| `conduit_pipelines` | Gauge | Number of pipelines by status. |
| `conduit_connectors` | Gauge | Number of connectors by type (source, destination). |
| `conduit_processors` | Gauge | Number of processors by name and type. |
| `conduit_connector_bytes` | Histogram | Number of bytes* a connector processed by pipeline name, plugin and type (source, destination). |
| `conduit_dlq_bytes` | Histogram | Number of bytes* a DLQ connector processed per pipeline and plugin. |
| `conduit_pipeline_execution_duration_seconds` | Histogram | Amount of time records spent in a pipeline. |
| `conduit_connector_execution_duration_seconds` | Histogram | Amount of time spent reading or writing records per pipeline, plugin and connector type (source, destination). |
| `conduit_processor_execution_duration_seconds` | Histogram | Amount of time spent on processing records per pipeline and processor. |
| `conduit_dlq_execution_duration_seconds` | Histogram | Amount of time spent writing records to DLQ connector per pipeline and plugin. |
| `conduit_inspector_sessions` | Gauge | Number of inspector sessions by ID of pipeline component (connector or processor) |

*We calculate bytes based on the JSON representation of the record payload
and key.

- **Go runtime metrics**: The default metrics exposed by Prometheus' official Go
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (i *Instance) Init(logger log.CtxLogger, persister *Persister) {
// Inspect returns an inspector.Session which exposes the records
// coming into or out of this connector (depending on the connector type).
func (i *Instance) Inspect(ctx context.Context) *inspector.Session {
return i.inspector.NewSession(ctx)
return i.inspector.NewSession(ctx, i.ID)
}

// Connector fetches a new plugin dispenser and returns a connector that can be
Expand Down
5 changes: 5 additions & 0 deletions pkg/foundation/metrics/measure/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ var (
ProcessorsGauge = metrics.NewLabeledGauge("conduit_processors",
"Number of processors by type.",
[]string{"type"})
InspectorsGauge = metrics.NewLabeledGauge(
"conduit_inspector_sessions",
"Number of inspector sessions by ID of pipeline component (connector or processor)",
[]string{"component_id"},
)

ConnectorBytesHistogram = metrics.NewLabeledHistogram("conduit_connector_bytes",
"Number of bytes a connector processed by pipeline name, plugin and type (source, destination).",
Expand Down
8 changes: 7 additions & 1 deletion pkg/inspector/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"

"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/foundation/metrics/measure"
"github.com/conduitio/conduit/pkg/record"
"github.com/google/uuid"
)
Expand Down Expand Up @@ -114,17 +115,22 @@ func (i *Inspector) Send(ctx context.Context, r record.Record) {
}
}

func (i *Inspector) NewSession(ctx context.Context) *Session {
// NewSession creates a new session in given inspector.
// componentID is the ID of the component being inspected (connector or processor).
func (i *Inspector) NewSession(ctx context.Context, componentID string) *Session {
id := uuid.NewString()
s := &Session{
C: make(chan record.Record, i.bufferSize),
id: id,
logger: i.logger.WithComponent("inspector.Session"),
onClose: func() {
i.remove(id)
measure.InspectorsGauge.WithValues(componentID).Dec()
},
once: &sync.Once{},
}
measure.InspectorsGauge.WithValues(componentID).Inc()

go func() {
<-ctx.Done()
s.logger.
Expand Down
2 changes: 1 addition & 1 deletion pkg/inspector/inspector_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

func BenchmarkInspector_SingleSession_Send(b *testing.B) {
ins := New(log.Nop(), 10)
ins.NewSession(context.Background())
ins.NewSession(context.Background(), "test-id")

for i := 0; i < b.N; i++ {
ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")})
Expand Down
14 changes: 7 additions & 7 deletions pkg/inspector/inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestInspector_Send_NoSessions(*testing.T) {

func TestInspector_Send_SingleSession(t *testing.T) {
underTest := New(log.Nop(), 10)
s := underTest.NewSession(context.Background())
s := underTest.NewSession(context.Background(), "test-id")

r := record.Record{
Position: record.Position("test-pos"),
Expand All @@ -47,8 +47,8 @@ func TestInspector_Send_MultipleSessions(t *testing.T) {
is := is.New(t)

underTest := New(log.Nop(), 10)
s1 := underTest.NewSession(context.Background())
s2 := underTest.NewSession(context.Background())
s1 := underTest.NewSession(context.Background(), "test-id")
s2 := underTest.NewSession(context.Background(), "test-id")

r := record.Record{
Position: record.Position("test-pos"),
Expand All @@ -62,7 +62,7 @@ func TestInspector_Send_SessionClosed(t *testing.T) {
is := is.New(t)

underTest := New(log.Nop(), 10)
s := underTest.NewSession(context.Background())
s := underTest.NewSession(context.Background(), "test-id")

r := record.Record{
Position: record.Position("test-pos"),
Expand All @@ -83,7 +83,7 @@ func TestInspector_Close(t *testing.T) {
is := is.New(t)

underTest := New(log.Nop(), 10)
s := underTest.NewSession(context.Background())
s := underTest.NewSession(context.Background(), "test-id")

underTest.Close()
_, ok := <-s.C
Expand All @@ -95,7 +95,7 @@ func TestInspector_Send_SessionCtxCanceled(t *testing.T) {

underTest := New(log.Nop(), 10)
ctx, cancel := context.WithCancel(context.Background())
s := underTest.NewSession(ctx)
s := underTest.NewSession(ctx, "test-id")

r := record.Record{
Position: record.Position("test-pos"),
Expand All @@ -120,7 +120,7 @@ func TestInspector_Send_SlowConsumer(t *testing.T) {

bufferSize := 10
underTest := New(log.Nop(), bufferSize)
s := underTest.NewSession(context.Background())
s := underTest.NewSession(context.Background(), "test-id")

for i := 0; i < bufferSize+1; i++ {
s.send(
Expand Down
4 changes: 2 additions & 2 deletions pkg/orchestrator/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (p *ProcessorOrchestrator) InspectIn(
return nil, err
}

return proc.Processor.InspectIn(ctx), nil
return proc.Processor.InspectIn(ctx, proc.ID), nil
}

func (p *ProcessorOrchestrator) InspectOut(
Expand All @@ -121,7 +121,7 @@ func (p *ProcessorOrchestrator) InspectOut(
return nil, err
}

return proc.Processor.InspectOut(ctx), nil
return proc.Processor.InspectOut(ctx, proc.ID), nil
}

func (p *ProcessorOrchestrator) Get(ctx context.Context, id string) (*processor.Instance, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/processor/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ type Interface interface {
Process(ctx context.Context, record record.Record) (record.Record, error)

// InspectIn starts an inspection session for input records for this processor.
InspectIn(ctx context.Context) *inspector.Session
InspectIn(ctx context.Context, id string) *inspector.Session
// InspectOut starts an inspection session for output records for this processor.
InspectOut(ctx context.Context) *inspector.Session
InspectOut(ctx context.Context, id string) *inspector.Session

// Close closes this processor and releases any resources
// which may have been used by it.
Expand Down
16 changes: 8 additions & 8 deletions pkg/processor/mock/processor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/processor/procbuiltin/func_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ func (f FuncWrapper) Process(ctx context.Context, inRec record.Record) (record.R
return outRec, nil
}

func (f FuncWrapper) InspectIn(ctx context.Context) *inspector.Session {
return f.inInsp.NewSession(ctx)
func (f FuncWrapper) InspectIn(ctx context.Context, id string) *inspector.Session {
return f.inInsp.NewSession(ctx, id)
}

func (f FuncWrapper) InspectOut(ctx context.Context) *inspector.Session {
return f.outInsp.NewSession(ctx)
func (f FuncWrapper) InspectOut(ctx context.Context, id string) *inspector.Session {
return f.outInsp.NewSession(ctx, id)
}

func (f FuncWrapper) Close() {
Expand Down
10 changes: 5 additions & 5 deletions pkg/processor/procbuiltin/func_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestFuncWrapper_InspectIn(t *testing.T) {
return record.Record{}, tc.err
})

session := underTest.InspectIn(ctx)
session := underTest.InspectIn(ctx, "test-id")
_, _ = underTest.Process(ctx, wantIn)

gotIn, got, err := cchan.ChanOut[record.Record](session.C).RecvTimeout(ctx, 100*time.Millisecond)
Expand All @@ -82,7 +82,7 @@ func TestFuncWrapper_InspectOut_Ok(t *testing.T) {
return wantOut, nil
})

session := underTest.InspectOut(ctx)
session := underTest.InspectOut(ctx, "test-id")
_, _ = underTest.Process(ctx, record.Record{})

gotOut, got, err := cchan.ChanOut[record.Record](session.C).RecvTimeout(ctx, 100*time.Millisecond)
Expand All @@ -106,7 +106,7 @@ func TestFuncWrapper_InspectOut_ProcessingFailed(t *testing.T) {
return wantOut, cerrors.New("shouldn't happen")
})

session := underTest.InspectOut(ctx)
session := underTest.InspectOut(ctx, "test-id")
_, _ = underTest.Process(ctx, record.Record{})

_, _, err := cchan.ChanOut[record.Record](session.C).RecvTimeout(ctx, 100*time.Millisecond)
Expand All @@ -122,8 +122,8 @@ func TestFuncWrapper_Close(t *testing.T) {
return record.Record{}, nil
})

in := underTest.InspectIn(ctx)
out := underTest.InspectOut(ctx)
in := underTest.InspectIn(ctx, "test-id")
out := underTest.InspectOut(ctx, "test-id")
underTest.Close()

// incoming records session should be closed
Expand Down
8 changes: 4 additions & 4 deletions pkg/processor/procjs/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,12 @@ func (p *Processor) Process(ctx context.Context, in record.Record) (record.Recor
return out, nil
}

func (p *Processor) InspectIn(ctx context.Context) *inspector.Session {
return p.inInsp.NewSession(ctx)
func (p *Processor) InspectIn(ctx context.Context, id string) *inspector.Session {
return p.inInsp.NewSession(ctx, id)
}

func (p *Processor) InspectOut(ctx context.Context) *inspector.Session {
return p.outInsp.NewSession(ctx)
func (p *Processor) InspectOut(ctx context.Context, id string) *inspector.Session {
return p.outInsp.NewSession(ctx, id)
}

func (p *Processor) Close() {
Expand Down
8 changes: 4 additions & 4 deletions pkg/processor/procjs/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ func TestJSProcessor_Inspect(t *testing.T) {
underTest, err := New(src, zerolog.Nop())
is.NoErr(err) // expected no error when creating the JS processor

in := underTest.InspectIn(ctx)
out := underTest.InspectOut(ctx)
in := underTest.InspectIn(ctx, "test-id")
out := underTest.InspectOut(ctx, "test-id")

recIn := record.Record{
Position: record.Position("test-pos"),
Expand Down Expand Up @@ -443,8 +443,8 @@ func TestJSProcessor_Close(t *testing.T) {
underTest, err := New(src, zerolog.Nop())
is.NoErr(err) // expected no error when creating the JS processor

in := underTest.InspectIn(ctx)
out := underTest.InspectOut(ctx)
in := underTest.InspectIn(ctx, "test-id")
out := underTest.InspectOut(ctx, "test-id")
underTest.Close()

// incoming records session should be closed
Expand Down
4 changes: 2 additions & 2 deletions pkg/web/api/connector_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestConnectorAPIv1_InspectConnector_SendRecord(t *testing.T) {
assert.Ok(t, err)

ins := inspector.New(log.Nop(), 10)
session := ins.NewSession(ctx)
session := ins.NewSession(ctx, "test-id")

csMock.EXPECT().
Inspect(ctx, id).
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestConnectorAPIv1_InspectConnector_SendErr(t *testing.T) {
id := uuid.NewString()

ins := inspector.New(log.Nop(), 10)
session := ins.NewSession(ctx)
session := ins.NewSession(ctx, "test-id")

csMock.EXPECT().
Inspect(ctx, id).
Expand Down
4 changes: 2 additions & 2 deletions pkg/web/api/processor_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func TestProcessorAPIv1_InspectIn_SendRecord(t *testing.T) {
assert.Ok(t, err)

ins := inspector.New(log.Nop(), 10)
session := ins.NewSession(ctx)
session := ins.NewSession(ctx, "test-id")

orchestrator.EXPECT().
InspectIn(ctx, id).
Expand Down Expand Up @@ -463,7 +463,7 @@ func TestProcessorAPIv1_InspectIn_SendErr(t *testing.T) {
id := uuid.NewString()

ins := inspector.New(log.Nop(), 10)
session := ins.NewSession(ctx)
session := ins.NewSession(ctx, "test-id")

orchestrator.EXPECT().
InspectIn(ctx, id).
Expand Down

0 comments on commit 0f2ead2

Please sign in to comment.