docs: Stream inspector design #692

Merged
merged 27 commits into from
Nov 8, 2022
90 changes: 90 additions & 0 deletions docs/design-documents/20221024-stream-inspector.md
@@ -0,0 +1,90 @@
# Stream Inspector

## Goals

Make troubleshooting pipelines easier by making it possible to inspect the data flowing through a pipeline.

## Requirements

1. All pipeline components (accessible "from the outside", i.e. sources, processors and destinations) should be
inspectable.
* For v1, it's enough that destinations are inspectable.
2. A source inspector should show the records coming out of the respective source.
3. A destination inspector should show the records coming into the respective destination.
4. A processor has two inspectors: one for the incoming data and one for the resulting data (i.e. the transformed
records).
5. The inspector should provide data about the component being inspected from the time the inspection started.
Historical data is not required.
6. The inspector should have a minimal footprint on resource usage (CPU, memory, etc.).
7. It should be possible to specify which data is shown.
8. The inspector data should be available through:
* the HTTP API,
* the gRPC API and
* the UI.
9. Multiple users should be able to inspect the same pipeline component at the same time. Every user's session is
independent.
10. The inspector should stop publishing data to a client if the client appears to be idle or crashed.

## Implementation

Implementations generally use one of two approaches: pull-based and push-based.

* In a **pull-based** implementation, a client (HTTP client, gRPC client, UI) would periodically check for
  inspection data.
  * A client would need to store the "position" of the previous inspection to get newer data.
  * This would require inspection state management in Conduit for each session.
  * Inspection data would arrive with a delay.
* In a **push-based** implementation, Conduit would push the inspection data to a client (HTTP client, gRPC
  client, UI).
  * A client would not need to store the "position" of the previous inspection to get newer data.
  * Inspection state management in Conduit would be minimal.
  * Inspection data would arrive in near real time.
  * Inspecting the data may require additional tools (depending on the actual implementation).

From what we know so far, the push-based approach has more advantages and is easier to work with, so that's the
approach chosen here. Concrete implementation options are discussed below.
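
To make the push-based approach more concrete, below is a minimal, illustrative sketch of a fan-out: every inspection
session gets its own buffered channel, records are pushed to all active sessions, and a slow or disappeared consumer
never blocks the pipeline. All names used here are hypothetical and not taken from the Conduit code base; the
requirement numbers in the comments refer to the list above.

```go
// A minimal sketch of push-based inspection fan-out. All names here
// (Inspector, Session, Record) are hypothetical and not taken from the
// Conduit code base.
package inspector

import "sync"

// Record stands in for a pipeline record.
type Record struct {
	Payload []byte
}

// Session is one client's inspection stream. Records are pushed into C;
// the API layer (gRPC stream, WebSocket, ...) drains the channel and
// forwards the data to the client.
type Session struct {
	C chan Record
}

// Inspector fans records out to all active sessions of one pipeline component.
type Inspector struct {
	mu       sync.Mutex
	sessions map[int]*Session
	nextID   int
}

func NewInspector() *Inspector {
	return &Inspector{sessions: map[int]*Session{}}
}

// Start registers a new, independent session (requirement 9).
func (i *Inspector) Start() (int, *Session) {
	i.mu.Lock()
	defer i.mu.Unlock()
	s := &Session{C: make(chan Record, 100)} // small buffer keeps the footprint minimal (requirement 6)
	id := i.nextID
	i.nextID++
	i.sessions[id] = s
	return id, s
}

// Stop removes a session, e.g. when the client disconnects or appears
// idle/crashed (requirement 10).
func (i *Inspector) Stop(id int) {
	i.mu.Lock()
	defer i.mu.Unlock()
	if s, ok := i.sessions[id]; ok {
		close(s.C)
		delete(i.sessions, id)
	}
}

// Send pushes a record to every session without blocking the pipeline:
// if a session's buffer is full, the record is dropped for that session.
func (i *Inspector) Send(r Record) {
	i.mu.Lock()
	defer i.mu.Unlock()
	for _, s := range i.sessions {
		select {
		case s.C <- r:
		default: // slow consumer; drop rather than slow down the pipeline
		}
	}
}
```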

For context: gRPC is the main API for Conduit. The HTTP API is generated using [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway).

### API
For any given pipeline component, only one method is required: one which starts an inspection (i.e. starts sending
data to the client).

Three methods are required in total:
1. One to inspect a connector (there's a single endpoint for source and destination connectors)
2. One to inspect records coming into a processor
3. One to inspect records coming out of a processor
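
As an illustration of what one such streaming method could look like on the server side, here is a hedged sketch in
Go. `Record` and `RecordStream` are hypothetical stand-ins for the generated protobuf message and gRPC server-stream
types; the actual generated interfaces in Conduit will differ.

```go
// A sketch of a single inspection method on the server side. Record and
// RecordStream are hypothetical stand-ins for the generated protobuf message
// and gRPC server-stream types.
package inspectorapi

import "context"

type Record struct {
	Payload []byte
}

// RecordStream mirrors the shape of a gRPC server-streaming interface:
// the handler pushes messages with Send until the client goes away.
type RecordStream interface {
	Send(*Record) error
	Context() context.Context
}

// InspectConnector starts an inspection and streams records to the client.
// The records are read from an internal channel fed by the inspected component.
func InspectConnector(in <-chan Record, stream RecordStream) error {
	for {
		select {
		case <-stream.Context().Done():
			// Client disconnected or appears idle — stop publishing (requirement 10).
			return stream.Context().Err()
		case r, ok := <-in:
			if !ok {
				// Inspection was stopped on the server side.
				return nil
			}
			if err := stream.Send(&r); err != nil {
				return err
			}
		}
	}
}
```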

### Implementation option 1: WebSockets
Conduit provides a streaming endpoint. The stream is exposed using the WebSockets API.

grpc-gateway doesn't support WebSockets (see [this issue](https://github.com/grpc-ecosystem/grpc-gateway/issues/168)).
There is, however, an open-source proxy that adds WebSocket support, available [here](https://github.com/tmc/grpc-websocket-proxy/).
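
A rough idea of how the proxy could be wired in front of the grpc-gateway mux (a sketch only, not the actual Conduit
setup; it assumes grpc-gateway v2 and omits service registration):

```go
package main

import (
	"log"
	"net/http"

	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
	"github.com/tmc/grpc-websocket-proxy/wsproxy"
)

func main() {
	// grpc-gateway mux that translates HTTP/JSON requests into gRPC calls.
	// Registering the actual Conduit services is omitted in this sketch.
	gwMux := runtime.NewServeMux()

	// Wrap the mux with the WebSocket proxy: WebSocket connections are
	// upgraded and bridged to the underlying streaming endpoints, while
	// plain HTTP requests pass through unchanged.
	handler := wsproxy.WebsocketProxy(gwMux)

	log.Fatal(http.ListenAndServe(":8080", handler))
}
```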

### Implementation option 2: Pure gRPC
Conduit provides a streaming endpoint. The stream is consumed directly as a gRPC endpoint. A JavaScript implementation
of gRPC for browser clients exists (called [grpc-web](https://github.com/grpc/grpc-web)). While that would mean no
changes in Conduit itself, it would require a lot of changes in the UI.

### Implementation option 3: Server-sent events
TBD

### Chosen implementation
The [grpc-websocket-proxy](https://github.com/tmc/grpc-websocket-proxy/) mentioned in option 1 is relatively popular
and open-source, so using it carries little risk. The other option is much costlier.
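
For completeness, a client could then consume the inspection stream over a plain WebSocket connection. The sketch
below uses `github.com/gorilla/websocket`; the endpoint path is purely hypothetical, and the assumption that each
WebSocket message carries one JSON-encoded streaming response follows from how grpc-gateway frames streamed
responses, not from a confirmed Conduit API.

```go
package main

import (
	"fmt"
	"log"

	"github.com/gorilla/websocket"
)

func main() {
	// Hypothetical inspection endpoint; the real path depends on the
	// Conduit API definition.
	url := "ws://localhost:8080/v1/connectors/my-destination/inspect"

	conn, _, err := websocket.DefaultDialer.Dial(url, nil)
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	// Each message is assumed to be one JSON-encoded streaming response
	// containing an inspected record.
	for {
		_, msg, err := conn.ReadMessage()
		if err != nil {
			log.Fatalf("read: %v", err) // connection closed or inspection stopped
		}
		fmt.Println(string(msg))
	}
}
```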



## Questions

* Should we rename it to pipeline inspector instead? Pipeline is the term we use in the API; streams are used
  internally.
* Cons: We cannot call it CSI (Conduit's Stream Inspector) anymore.
* Is metadata needed (such as time the data was captured)?
* Should there be a limit on how long a stream inspector can run?
* Should there be a limit on how many records a stream inspector can receive?
* Are we interested in more than the records? Is there some other data we'd like to see (now or in the future)?

## Future work

* Inspector REPL
18 changes: 9 additions & 9 deletions docs/metrics.md
@@ -11,15 +11,15 @@ Metrics are exposed at `/metrics`. For example, if you're running Conduit locally
* **Conduit metrics**: We currently have a number of high level pipeline, processor and connector metrics, all of which are
defined in [measure.go](https://github.com/ConduitIO/conduit/blob/main/pkg/foundation/metrics/measure/measure.go). Those are:

|Pipeline name|Type|Description|
|---|---|---|
|`conduit_pipelines`|Gauge|Number of pipelines by status.|
|`conduit_connectors`|Gauge|Number of connectors by type (source, destination).|
|`conduit_processors`|Gauge|Number of processors by name and type.|
|`conduit_connector_bytes`|Histogram| Number of bytes a connector processed by pipeline name, plugin and type (source, destination).|
|`conduit_pipeline_execution_duration_seconds`|Histogram| Amount of time records spent in a pipeline.|
|`conduit_connector_execution_duration_seconds`|Histogram| Amount of time spent reading or writing records per pipeline, plugin and connector type (source, destination).|
|`conduit_processor_execution_duration_seconds`|Histogram| Amount of time spent on processing records per pipeline and processor.|
| Pipeline name | Type | Description |
|------------------------------------------------|-----------|----------------------------------------------------------------------------------------------------------------|
| `conduit_pipelines` | Gauge | Number of pipelines by status. |
| `conduit_connectors` | Gauge | Number of connectors by type (source, destination). |
| `conduit_processors` | Gauge | Number of processors by name and type. |
| `conduit_connector_bytes` | Histogram | Number of bytes a connector processed by pipeline name, plugin and type (source, destination). |
| `conduit_pipeline_execution_duration_seconds` | Histogram | Amount of time records spent in a pipeline. |
| `conduit_connector_execution_duration_seconds` | Histogram | Amount of time spent reading or writing records per pipeline, plugin and connector type (source, destination). |
| `conduit_processor_execution_duration_seconds` | Histogram | Amount of time spent on processing records per pipeline and processor. |

* **Go runtime metrics**: The default metrics exposed by Prometheus' official Go package, [client_golang](https://pkg.go.dev/github.com/prometheus/client_golang).
* **gRPC metrics**: The gRPC instrumentation package we use is [promgrpc](https://github.com/piotrkowalczuk/promgrpc).