docs: Stream inspector design #692

Merged · 27 commits · Nov 8, 2022
229 changes: 229 additions & 0 deletions docs/design-documents/20221024-stream-inspector.md
@@ -0,0 +1,229 @@
# Stream Inspector

## Goals

Make troubleshooting pipelines easier by making it possible to inspect the data flowing through a pipeline.

## Requirements

1. All pipeline components (accessible "from the outside", i.e. sources, processors and destinations) should be
inspectable.
* For v1, it's enough that destinations are inspectable.
2. A source inspector should show the records coming out of the respective source.
3. A destination inspector should show the records coming into the respective destination.
4. A processor has two inspectors: one for the incoming records and one for the transformed records.
5. The inspector should provide the relevant records about the component being inspected, from the
time the inspection started. Historical data is not required.
6. The inspector should have a minimal footprint on resource usage (CPU, memory, etc.).
7. The inspector should have an insignificant impact on a pipeline's performance (ideally, no impact at all).
8. The inspector data should be available through:
* the HTTP API,
* the gRPC API and
* the UI.
9. Multiple users should be able to inspect the same pipeline component at the same time. Every user's session is
independent.
10. The inspector should stop publishing data to a client if the client appears to be idle or crashed.

**Important** :information_source:
If a stream inspection is slower than the pipeline being inspected, it's possible that some records will be dropped
from the inspector. This is discussed further in the section "Blocking vs. non-blocking" below.

## Implementation
Here we discuss two aspects of the implementation: internals (i.e. how to actually get the records from the inspectable
pipeline components) and the API (i.e. how to deliver the inspected records to a client while providing a good user
experience).

### Blocking vs. non-blocking
It's possible that a stream inspector won't be able to keep up with a pipeline, e.g. in the case of high-velocity sources.

To handle this, we have two options:
#### Option 1: Make the stream inspector blocking
In this option, the stream inspector would slow down the pipeline until it catches up. The goal is to make sure all
records are inspected.

**Advantages**
1. Having complete data when troubleshooting.
2. This would make it possible to inject messages into a pipeline in the future.

**Disadvantages**
1. Pipeline performance is affected.
2. The implementation becomes more complex.

#### Option 2: Make the stream inspector non-blocking
In this option, the stream inspector would not slow down the pipeline. As a consequence, some records from the pipeline
won't be shown in the stream inspector at all.

**Advantages**
1. Pipeline performance is not affected.
2. Simpler implementation.

**Disadvantages**
1. Not having complete data when troubleshooting.

#### Chosen option
The chosen option is a non-blocking stream inspector, for the following reasons:

A blocking stream inspector would fall apart when inspecting a stream that's processing tens of thousands of messages a
second.

Also, while the concept of "stream surgery" (inserting messages into a pipeline or modifying existing ones) may feel
intuitively valuable, in practice it might not be, due to the volume of messages.

The pattern we're working towards is one where we enable the user to easily discover the cause of a breakage (e.g.
bad data or a bad transform) and then easily write a processor that corrects the issue and reintroduces the data
into the stream.
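
To illustrate what non-blocking means in practice, below is a minimal Go sketch of how records could be offered to an
inspector without ever blocking the pipeline. All names here are hypothetical and not part of Conduit's actual code:
```go
// Record stands in for Conduit's record type.
type Record struct {
	Key     []byte
	Payload []byte
}

// session represents a single client's inspection session.
type session struct {
	c chan Record // buffered channel consumed by the API layer
}

// offer hands a record to the session without blocking the pipeline:
// if the session's buffer is full, the record is silently dropped.
func (s *session) offer(r Record) {
	select {
	case s.c <- r: // the client is keeping up, deliver the record
	default: // the client is too slow, drop the record
	}
}
```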

### Push based vs. pull based
Implementations will generally use one of two approaches: pull based and push based.

* In a **pull based** implementation, a client (HTTP client, gRPC client, UI) would be periodically checking for
inspection data.
* A client would need to store the "position" of the previous inspection to get newer data.
* This would require inspection state management in Conduit for each session.
* Inspection data would come in with a delay.
* In a **push based** implementation, Conduit would be pushing the inspection data to a client (HTTP client, gRPC
client, UI).
* A client would not need to store the "position" of the previous inspection to get newer data.
* Inspection state management in Conduit would be minimal.
* Inspection data would come in almost real-time.
* Inspecting the data may require additional tools (depending on the actual implementation).

From what we know so far, **the push based approach has more advantages and is easier to work with, so that's the
approach chosen here**. Concrete implementation options are discussed below.

For context: gRPC is the main API for Conduit. The HTTP API is generated using [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway).

### API
Inspecting a pipeline component is triggered with a gRPC/HTTP API request. If the pipeline component cannot be inspected
for any reason (e.g. inspection not supported, component not found), an error is returned.

For any given pipeline component, only one gRPC/HTTP API method is required: one which starts an inspection (i.e. the
sending of data to a client).

Three methods are required in total:
1. One to inspect a connector (there's a single endpoint for source and destination connectors)
2. One to inspect records coming into a processor
3. One to inspect records coming out of a processor
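
As a sketch of what one of these methods could look like on the server side, here's a hypothetical gRPC
server-streaming handler for the connector inspection endpoint. The service, message and field names are assumptions,
not Conduit's actual API:
```go
// InspectConnector starts an inspection and streams records to the
// client until the client disconnects or the inspection ends.
func (s *ConnectorService) InspectConnector(
	req *InspectConnectorRequest,
	stream ConnectorService_InspectConnectorServer,
) error {
	// Fails if the connector doesn't exist or can't be inspected.
	records, err := s.inspector.Inspect(stream.Context(), req.ConnectorId)
	if err != nil {
		return err
	}
	for r := range records {
		if err := stream.Send(&InspectConnectorResponse{Record: r}); err != nil {
			// The client appears to be gone; stop publishing (requirement 10).
			return err
		}
	}
	return nil
}
```
Since the HTTP API is generated from the gRPC definitions via grpc-gateway, a single streaming method would cover both
APIs.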

### Delivery options
Here we'll discuss options for delivering the stream of inspection data from the gRPC/HTTP API to a client.

#### Option 1: WebSockets
Conduit provides a streaming endpoint. The stream is exposed using the WebSockets API.

grpc-gateway doesn't support WebSockets (see [this](https://github.com/grpc-ecosystem/grpc-gateway/issues/168)). There's
an open-source proxy for it though, available [here](https://github.com/tmc/grpc-websocket-proxy/).

#### Option 2: Pure gRPC
Conduit provides a streaming endpoint. The stream is consumed as such, i.e. a gRPC endpoint. A JavaScript implementation
of gRPC for browser clients exists (called [grpc-web](https://github.com/grpc/grpc-web)). While that would mean no
changes in Conduit itself, it would require a lot of changes in the UI.

#### Option 3: Server-sent events
[Server-sent events](https://html.spec.whatwg.org/#server-sent-events) enable servers to push events to clients. Unlike
WebSockets, the communication is unidirectional.

While server-sent events generally match our requirements, the implementation would not be straightforward because
grpc-gateway doesn't support it, nor do they plan to support it (see [this](https://hackmd.io/@prysmaticlabs/eventstream-api)
and [this](https://github.com/grpc-ecosystem/grpc-gateway/issues/26)).

Also, we want the inspector to be available through the UI, and using WebSockets is a much easier option than server-sent
events.

#### Chosen delivery option
[grpc-websocket-proxy](https://github.com/tmc/grpc-websocket-proxy/), mentioned in option 1, is relatively popular and
open-source, so using it carries little risk. The other options are much costlier.
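
Below is a minimal sketch of how the proxy could be wired in front of the existing grpc-gateway mux. The import paths
are the real modules, but the wiring itself is illustrative and assumes the mux is already set up:
```go
package main

import (
	"net/http"

	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
	"github.com/tmc/grpc-websocket-proxy/wsproxy"
)

// newHTTPHandler wraps the grpc-gateway mux so that WebSocket requests
// are upgraded and bridged to the underlying streaming HTTP endpoints;
// regular requests pass through unchanged.
func newHTTPHandler(mux *runtime.ServeMux) http.Handler {
	return wsproxy.WebsocketProxy(mux)
}
```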

### Internals: Exposing the inspection data
The following options exist for exposing the inspection data for a node. How the data will be used by the API is
discussed in the sections below.

#### Option 1: Connectors and processors expose a method
In this option, inspection would be performed at the connector or processor level.

Inspectable pipeline components themselves would expose an `Inspect` method, for example:
```go
// Example for a source, can also be a processor or a destination
func (s Source) Inspect(direction string) chan Record
```
(As a return value, we may use a special `struct` instead of `chan Record` to more easily propagate events, such as
inspection done.)
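
For illustration, such a struct could look roughly like this (field names are hypothetical):
```go
// InspectorSession is a hypothetical return value for Inspect, carrying
// records as well as lifecycle events.
type InspectorSession struct {
	// Records delivers the inspected records.
	Records chan Record
	// Done is closed when the inspection ends (e.g. the component is
	// stopped), letting the API layer close the client's stream.
	Done chan struct{}
}
```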

**Advantages**:
1. The implementation would be relatively straightforward.

**Disadvantages**:
1. Minor changes in the sources, processors and destinations are needed.

#### Option 2: Dedicated inspector nodes
In this option, we'd have a node type dedicated to inspecting data coming into and out of a source, processor
or destination node. To inspect a node, we would dynamically add an inspector node before or after the node being inspected.

**Advantages**:
1. All the code related to inspecting nodes would be "concentrated" in one or two node types.
2. Existing nodes don't need to be changed.
3. This makes it possible to inspect any node.
4. Solving this problem would put us into a good position to solve https://github.com/ConduitIO/conduit/issues/201.

**Disadvantages**
1. Currently, it's not possible to dynamically add a node, which means we would need to restart a pipeline to do this,
and that's not a good user experience.
2. On the other hand, changing the code so that a pipeline's topology can be dynamically changed is a relatively large
amount of work.

#### Option 3: Add the inspection code to `PubNode`s and `SubNode`s

**Advantages**
1. Solves the problem for all nodes. While we're not necessarily interested in all nodes, solving the problem at
the `PubNode` level covers sources and processor output records, while solving it at the `SubNode` level covers
destinations and processor input records.

**Disadvantages**
1. Adding inspection to pub and sub nodes is complex. This complexity is reflected in the following:
* Once we add a method to the `PubNode` and `SubNode` interfaces, we'll need to implement it in all current
implementations, even if that means only calling a method from an embedded type.
* `PubNode` and `SubNode` rely on Go channels to publish/subscribe to messages. Automatically sending messages from
those channels to registered inspectors is non-trivial.

#### Chosen implementation
Option 1, i.e. connectors and processors exposing methods to inspect themselves, is the preferred option, given that
options 2 and 3 are relatively complex and we would risk not delivering this feature in scope of the 0.4 release.
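
To make the chosen option more concrete, here is a rough sketch of how a connector or processor could support multiple
independent inspection sessions (requirement 9) and clean up a session once the client goes away (requirement 10). All
names and the buffer size are assumptions:
```go
import (
	"context"
	"sync"

	"github.com/google/uuid"
)

// inspectable could be embedded in a connector or processor to give it
// an Inspect method.
type inspectable struct {
	mu       sync.Mutex
	sessions map[string]chan Record
}

// Inspect registers a new, independent session and returns its channel.
// The session is removed and its channel closed when ctx is canceled,
// e.g. when the client disconnects.
func (i *inspectable) Inspect(ctx context.Context) chan Record {
	c := make(chan Record, 1000) // the buffer size is an assumption
	id := uuid.NewString()

	i.mu.Lock()
	i.sessions[id] = c
	i.mu.Unlock()

	go func() {
		<-ctx.Done()
		i.mu.Lock()
		delete(i.sessions, id)
		i.mu.Unlock()
		close(c)
	}()
	return c
}

// observe is called for every record flowing through the component and
// offers it to all sessions using the non-blocking send discussed in
// "Blocking vs. non-blocking".
func (i *inspectable) observe(r Record) {
	i.mu.Lock()
	defer i.mu.Unlock()
	for _, c := range i.sessions {
		select {
		case c <- r:
		default: // session too slow, drop the record
		}
	}
}
```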

## Questions

* Should we rename it to pipeline inspector instead? Pipeline is the term we use in the API; streams are used
internally.
* **Answer**: Since the inspector is about inspecting data, and the term stream stands for data whereas pipeline
stands for the setup/topology, keeping the term "stream inspector" makes sense.
* Is metadata needed (such as time the records were captured)?
* Examples:
* The time at which a record entered/left a node.
* ~~The source connector from which a record originated.~~ Can be found in OpenCDC metadata.
* Should there be a limit on how long a stream inspector can run?
* Should there be a limit on how many records a stream inspector can receive?
* Pros:
* Users could mistakenly leave the inspector open "forever".
* This would handle the case where an inspector crashes.
* Cons:
* Some users may want to run an inspection over a longer period of time.
* The inspector will be non-blocking and won't accumulate any data in memory, so running it over a longer period of
time won't consume significant resources.
* **Answer**: For now, we'll have no limits.
* Are we interested in more than the records? Is there some other data we'd like to see (now or in future)?
* **Answer**: We didn't find any data not related to records themselves which would be useful in the inspector.
* Should it be possible to specify which data is shown?
* **Answer**: No, out of scope. It's a matter of data representation on the client side.
* Should the inspector be blocking (if a client is consuming the inspected records slower than the pipeline rate,
then the pipeline would be throttled) or non-blocking?
* Arguments for blocking:
* the inspected data will be complete no matter what, making troubleshooting easier
* if we make it possible to insert records during inspection, it will make implementation easier
* Arguments against:
* "inspection" is meant to be "view-only" and not a debugger
* for high-volume sources this will always result in pipeline being throttled
* **Answer**: Based on the above, the stream inspector should be non-blocking.

## Future work

* Inspector REPL: https://github.com/ConduitIO/conduit/issues/697
18 changes: 9 additions & 9 deletions docs/metrics.md
@@ -11,15 +11,15 @@ Metrics are exposed at `/metrics`. For example, if you're running Conduit locall
* **Conduit metrics**: We currently have a number of high level pipeline, processor and connector metrics, all of which are
defined in [measure.go](https://github.com/ConduitIO/conduit/blob/main/pkg/foundation/metrics/measure/measure.go). Those are:

| Metric name                                     | Type      | Description                                                                                                      |
|------------------------------------------------|-----------|----------------------------------------------------------------------------------------------------------------|
| `conduit_pipelines` | Gauge | Number of pipelines by status. |
| `conduit_connectors` | Gauge | Number of connectors by type (source, destination). |
| `conduit_processors` | Gauge | Number of processors by name and type. |
| `conduit_connector_bytes` | Histogram | Number of bytes a connector processed by pipeline name, plugin and type (source, destination). |
| `conduit_pipeline_execution_duration_seconds` | Histogram | Amount of time records spent in a pipeline. |
| `conduit_connector_execution_duration_seconds` | Histogram | Amount of time spent reading or writing records per pipeline, plugin and connector type (source, destination). |
| `conduit_processor_execution_duration_seconds` | Histogram | Amount of time spent on processing records per pipeline and processor. |

* **Go runtime metrics**: The default metrics exposed by Prometheus' official Go package, [client_golang](https://pkg.go.dev/github.com/prometheus/client_golang).
* **gRPC metrics**: The gRPC instrumentation package we use is [promgrpc](https://github.com/piotrkowalczuk/promgrpc).