From cd45646d40c9be7fff9ffc233fc49f051e3d4b5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 24 Feb 2023 18:17:26 +0100 Subject: [PATCH] Housekeeping: Markdownlint (#905) * markdownlint config * run markdownlint-cli2-fix * ignore line length in tables * fix linter issues in docs markdown files * markdown-cli2-fix * fix linter issues in ui readme * add markdown linter CI action * wrap filenames in code block * run markdownlint-cli2-fix (forgot some files) * fix remaining documents * add some files to ignore list in markdownlint ci action * fix formatting * fix indentations * fix markdownlint globs * remove rule that requires uppercase Go * make target markdown-lint * update pre-push git hook to lint markdown files --- .github/workflows/markdown-lint.yml | 21 +++ .markdownlint.yml | 45 +++++ CONTRIBUTING.md | 36 ++-- Makefile | 5 +- README.md | 43 +++-- .../20220121-conduit-plugin-architecture.md | 24 +-- docs/architecture.md | 85 ++++----- docs/code_guidelines.md | 54 +++--- docs/connector_discovery.md | 30 +-- docs/connectors.md | 31 +++- docs/design-documents/20220309-opencdc.md | 172 +++++++++++------- .../20221024-stream-inspector.md | 170 ++++++++++------- .../20221027-builtin-config-validation.md | 71 ++++---- docs/health_check.md | 6 +- docs/metrics.md | 24 +-- docs/pipeline_configuration_files.md | 47 +++-- docs/pipeline_semantics.md | 10 +- docs/processors.md | 63 ++++--- docs/releases.md | 45 +++-- githooks/pre-push | 1 + proto/README.md | 6 +- ui/README.md | 55 +++--- 22 files changed, 621 insertions(+), 423 deletions(-) create mode 100644 .github/workflows/markdown-lint.yml create mode 100644 .markdownlint.yml diff --git a/.github/workflows/markdown-lint.yml b/.github/workflows/markdown-lint.yml new file mode 100644 index 000000000..abc5bf546 --- /dev/null +++ b/.github/workflows/markdown-lint.yml @@ -0,0 +1,21 @@ +name: markdown-lint + +on: + pull_request: + paths: + - '**.md' + +jobs: + markdownlint-cli2: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: DavidAnson/markdownlint-cli2-action@v9 + with: + globs: | + **/*.md + !ui/node_modules + !LICENSE.md + !pkg/web/openapi/** + !.github/*.md + diff --git a/.markdownlint.yml b/.markdownlint.yml new file mode 100644 index 000000000..3c92443d4 --- /dev/null +++ b/.markdownlint.yml @@ -0,0 +1,45 @@ +code-block-style: + style: fenced + +code-fence-style: + style: backtick + +emphasis-style: + style: underscore + +strong-style: + style: asterisk + +fenced-code-language: + language_only: true + +heading-style: + style: atx + +hr-style: + style: "---" + +line-length: + line_length: 120 + code_blocks: false + tables: false + +no-duplicate-heading: + siblings_only: true + +ol-prefix: + style: ordered + +ul-style: + style: dash + +no-hard-tabs: + # allow hard tabs in Go code blocks + ignore_code_languages: [go] + spaces_per_tab: 2 + +proper-names: + code_blocks: false + names: + - JavaScript + - Conduit diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index be2728cd2..1d238b1ff 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -4,6 +4,7 @@ Thank you so much for contributing to Conduit. We appreciate your time and help! As a contributor, here are the guidelines we would like you to follow. ## Asking questions + If you have a question or you are not sure how to do something, please [open a discussion](https://github.com/ConduitIO/conduit/discussions) or hit us up on [Discord](https://discord.meroxa.com)! @@ -14,17 +15,19 @@ on [Discord](https://discord.meroxa.com)! 
[issues](https://github.com/ConduitIO/conduit/issues) to see if a similar one was already opened. If there is one already opened, feel free to comment on it. -1. Otherwise, please [open an issue](https://github.com/ConduitIO/conduit/issues/new) +2. Otherwise, please [open an issue](https://github.com/ConduitIO/conduit/issues/new) and let us know, and make sure to include the following: - * If it's a bug, please include: - * Steps to reproduce - * Copy of the logs. - * Your Conduit version. - * If it's a feature request, let us know the motivation behind that feature, + - If it's a bug, please include: + - Steps to reproduce + - Copy of the logs. + - Your Conduit version. + - If it's a feature request, let us know the motivation behind that feature, and the expected behavior of it. ## Submitting changes + We also value contributions in form of pull requests. When opening a PR please ensure: + - You have followed the [Code Guidelines](https://github.com/ConduitIO/conduit/blob/main/docs/code_guidelines.md). - There is no other [pull request](https://github.com/ConduitIO/conduit/pulls) for the same update/change. - You have written unit tests. @@ -32,11 +35,12 @@ We also value contributions in form of pull requests. When opening a PR please e Also, if you are submitting code, please ensure you have adequate tests for the feature, and that all the tests still run successfully. - * Unit tests can be run via `make test`. - * Integration tests can be run via `make test-integration`, they require - [Docker](https://www.docker.com/) to be installed and running. The tests will - spin up required docker containers, run the integration tests and stop the - containers afterwards. + +- Unit tests can be run via `make test`. +- Integration tests can be run via `make test-integration`, they require + [Docker](https://www.docker.com/) to be installed and running. The tests will + spin up required docker containers, run the integration tests and stop the + containers afterwards. We would like to ask you to use the provided Git hooks (by running `git config core.hooksPath githooks`), which automatically run the tests and the linter when pushing code. @@ -44,11 +48,11 @@ which automatically run the tests and the linter when pushing code. ### Quick steps to contribute 1. Fork the project -2. Download your fork to your machine +2. Download your fork to your machine 3. Create your feature branch (`git checkout -b my-new-feature`) -4. Make changes and run tests -5. Commit your changes -6. Push to the branch +4. Make changes and run tests +5. Commit your changes +6. Push to the branch 7. Create new pull request ## License @@ -60,4 +64,4 @@ Apache 2.0, see [LICENSE](LICENSE.md). Conduit has adopted [Contributor Covenant](https://www.contributor-covenant.org/) as its [Code of Conduct](https://github.com/ConduitIO/.github/blob/main/CODE_OF_CONDUCT.md). We highly encourage contributors to familiarize themselves with the standards we want our -community to follow and help us enforce them. \ No newline at end of file +community to follow and help us enforce them. 
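The contribution checklist above asks for unit tests alongside code changes (run via `make test`). As a rough, hypothetical sketch — the function and the test below are illustrative only and are not part of the Conduit codebase — a typical table-driven Go unit test looks like this:

```go
package example

import "testing"

// clamp is a hypothetical function under test, used only to illustrate the
// table-driven style commonly used for Go unit tests.
func clamp(v, low, high int) int {
	if v < low {
		return low
	}
	if v > high {
		return high
	}
	return v
}

// TestClamp would live in a _test.go file next to the code it tests and runs
// as part of `make test`.
func TestClamp(t *testing.T) {
	testCases := []struct {
		name string
		in   int
		want int
	}{
		{name: "below range", in: -1, want: 0},
		{name: "inside range", in: 5, want: 5},
		{name: "above range", in: 99, want: 10},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			if got := clamp(tc.in, 0, 10); got != tc.want {
				t.Fatalf("clamp(%d, 0, 10) = %d, want %d", tc.in, got, tc.want)
			}
		})
	}
}
```

Integration tests follow the same shape but are guarded by the `//go:build integration` tag and expect their external dependencies (e.g. Docker containers) to be running.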
diff --git a/Makefile b/Makefile index 3034421c9..94fa7f90f 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: test test-integration build run proto-update proto-lint clean download install-tools generate check-go-version +.PHONY: test test-integration build run proto-update proto-lint clean download install-tools generate check-go-version markdown-lint # Version will extract the current version of Conduit based on # the latest git tag and commit. If the repository contains any @@ -72,3 +72,6 @@ check-go-version: echo "${GO_VERSION_CHECK}";\ exit 1;\ fi + +markdown-lint: + markdownlint-cli2 "**/*.md" "#ui/node_modules" "#LICENSE.md" "#pkg/web/openapi/**" "#.github/*.md" diff --git a/README.md b/README.md index 4e380fd4f..6b3f7968f 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,6 @@ Conduit was created and open-sourced by [Meroxa](https://meroxa.io). - [API](#api) - [UI](#ui) - [Documentation](#documentation) -- [Known limitations](#known-limitations) - [Contributing](#contributing) ## Quick start @@ -50,23 +49,29 @@ Conduit was created and open-sourced by [Meroxa](https://meroxa.io). the [example pipeline](/examples/pipelines/file-to-file.yml) and put it in the directory named `pipelines` in the same directory as the Conduit binary. -3. Run conduit (`./conduit`). The example pipeline will start automatically. +3. Run Conduit (`./conduit`). The example pipeline will start automatically. 4. Write something to file `example.in` in the same directory as the Conduit binary. + + ```sh + echo "hello conduit" >> example.in` ``` - $ echo "hello conduit" >> example.in` - ``` + 5. Read the contents of `example.out` and notice an OpenCDC record: - ``` + + ```sh $ cat example.out {"position":"MTQ=","operation":"create","metadata":{"file.path":"./example.in","opencdc.readAt":"1663858188836816000","opencdc.version":"v1"},"key":"MQ==","payload":{"before":null,"after":"aGVsbG8gY29uZHVpdA=="}} ``` + 6. The string `hello conduit` is a base64 encoded string stored in the field `payload.after`, let's decode it: - ``` + + ```sh $ cat example.out | jq ".payload.after | @base64d" "hello conduit" ``` + 7. Explore the UI by opening `http://localhost:8080` and build your own pipeline! @@ -78,7 +83,7 @@ Download a pre-built binary from the [latest release](https://github.com/conduitio/conduit/releases/latest) and simply run it! -``` +```sh ./conduit ``` @@ -95,11 +100,11 @@ of available options, run `./conduit --help`. Requirements: -* [Go](https://golang.org/) (1.20 or later) -* [Node.js](https://nodejs.org/) (16.x) -* [Yarn](https://yarnpkg.com/) (latest 1.x) -* [Ember CLI](https://ember-cli.com/) -* [Make](https://www.gnu.org/software/make/) +- [Go](https://golang.org/) (1.20 or later) +- [Node.js](https://nodejs.org/) (16.x) +- [Yarn](https://yarnpkg.com/) (latest 1.x) +- [Ember CLI](https://ember-cli.com/) +- [Make](https://www.gnu.org/software/make/) ```shell git clone git@github.com:ConduitIO/conduit.git @@ -118,7 +123,7 @@ running Conduit as a simple backend service. Our Docker images are hosted on GitHub's Container Registry. To run the latest Conduit version, you should run the following command: -``` +```sh docker run -p 8080:8080 ghcr.io/conduitio/conduit:latest ``` @@ -211,20 +216,20 @@ For more information about the UI refer to the [Readme](ui/README.md) in `/ui`. ## Documentation To learn more about how to use Conduit -visit [docs.conduit.io](https://docs.conduit.io). +visit [docs.Conduit.io](https://docs.conduit.io). 
If you are interested in internals of Conduit we have prepared some technical documentation: -* [Pipeline Semantics](docs/pipeline_semantics.md) explains the internals of how +- [Pipeline Semantics](docs/pipeline_semantics.md) explains the internals of how a Conduit pipeline works. -* [Pipeline Configuration Files](docs/pipeline_configuration_files.md) +- [Pipeline Configuration Files](docs/pipeline_configuration_files.md) explains how you can define pipelines using YAML files. -* [Processors](docs/processors.md) contains examples and more information about +- [Processors](docs/processors.md) contains examples and more information about Conduit processors. -* [Conduit Architecture](docs/architecture.md) +- [Conduit Architecture](docs/architecture.md) will give you a high-level overview of Conduit. -* [Conduit Metrics](docs/metrics.md) +- [Conduit Metrics](docs/metrics.md) provides more information about how Conduit exposes metrics. ## Contributing diff --git a/docs/architecture-decision-records/20220121-conduit-plugin-architecture.md b/docs/architecture-decision-records/20220121-conduit-plugin-architecture.md index d20ba9416..92879cbda 100644 --- a/docs/architecture-decision-records/20220121-conduit-plugin-architecture.md +++ b/docs/architecture-decision-records/20220121-conduit-plugin-architecture.md @@ -36,7 +36,7 @@ The decision can be broken up into 4 parts, these are explained in detail later ### Conduit plugin interface (gRPC) The Conduit plugin interface is defined in gRPC, is standalone (does not depend on Conduit or the SDK) and lives -in https://github.com/conduitio/connector-plugin. This repository is the only common thing between Conduit and a plugin, +in . This repository is the only common thing between Conduit and a plugin, meaning that Conduit uses the interface to interact with plugins and plugins implement the interface. The proto files define the messages and gRPC interface that needs to be implemented by the connector plugin. The @@ -68,10 +68,10 @@ Functions will be called in the order in which they are defined. - `Configure` - the plugin needs to validate the configuration it receives and either store the configuration and return no error or discard it and return an error explaining why the configuration is invalid. This function serves two purposes: - - Config validation - Conduit calls `Configure` when the connector is first created or when the configuration is - updated to validate the configuration. In this case the next call is `Teardown` and the plugin is stopped. - - Configuring the plugin - Conduit calls `Configure` when the pipeline is started to provide the plugin with its - config. The next call after a successful response is `Start`. + - Config validation - Conduit calls `Configure` when the connector is first created or when the configuration is + updated to validate the configuration. In this case the next call is `Teardown` and the plugin is stopped. + - Configuring the plugin - Conduit calls `Configure` when the pipeline is started to provide the plugin with its + config. The next call after a successful response is `Start`. - `Start` - with a call to this function Conduit signals to the plugin that it wants it to start running. The request will contain the position at which the plugin should start running (the position might be empty in case the pipeline is started for the first time). The plugin is expected to open any connections needed to fetch records. 
In case of a @@ -113,7 +113,7 @@ Functions will be called in the order in which they are defined. independent and are able to transmit data concurrently. The plugin is expected to send an acknowledgment back to Conduit for every record it received, even if the record was not processed successfully (in that case the acknowledgment should contain the error). The stream should stay open until either an error occurs or the `Stop` - function is called *and* all remaining acknowledgments are sent (this is handled by the SDK). + function is called _and_ all remaining acknowledgments are sent (this is handled by the SDK). - `Stop` - Conduit signals to the plugin that there be no more records written to the request stream in `Run`. The plugin needs to flush any records that are cached in memory, send back the acknowledgments and stop the `Run` function. @@ -166,11 +166,11 @@ things to point out - The bidirectional stream method `Run` is broken down into two separate methods, one for receiving messages from the stream and one to send messages. Those methods can only be called after a call to `Start` since that is the method in which the stream actually gets opened. - - In `SourcePlugin` we can read records from the stream by calling `Read`. This method will block until either an - error occurs or a new record is produced by the plugin. Successfully processed records can be acknowledged by - calling `Ack` with the corresponding record position. - - In `DestinationPlugin` we can write records to the stream by calling `Write`. To receive acknowledgments we can - call `Ack` which will block until either an error occurs or an acknowledgment is produced by the plugin. + - In `SourcePlugin` we can read records from the stream by calling `Read`. This method will block until either an + error occurs or a new record is produced by the plugin. Successfully processed records can be acknowledged by + calling `Ack` with the corresponding record position. + - In `DestinationPlugin` we can write records to the stream by calling `Write`. To receive acknowledgments we can + call `Ack` which will block until either an error occurs or an acknowledgment is produced by the plugin. ### Plugin registries @@ -207,7 +207,7 @@ The plugin registry is the last missing piece that contains information about th ```protobuf type registry interface { New(logger log.CtxLogger, name string) (Dispenser, error) - } +} ``` Behind this interface, there are again two registries that contain either built-in or standalone dispensers. To diff --git a/docs/architecture.md b/docs/architecture.md index 1d3e452d4..7a4e3a2de 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -4,63 +4,64 @@ This document describes the Conduit architecture. ## Vocabulary -* **Pipeline** - a pipeline receives records from one or multiple source connectors, pushes them through zero or +- **Pipeline** - a pipeline receives records from one or multiple source connectors, pushes them through zero or multiple processors until they reach one or multiple destination connectors. -* **Connector** - a connector is the internal entity that communicates with a connector plugin and either pushes records +- **Connector** - a connector is the internal entity that communicates with a connector plugin and either pushes records from the plugin into the pipeline (source connector) or the other way around (destination connector). 
-* **Connector plugin** - sometimes also referred to as "plugin", is an external process which communicates with Conduit +- **Connector plugin** - sometimes also referred to as "plugin", is an external process which communicates with Conduit and knows how to read/write records from/to a data source/destination (e.g. a database). -* **Processor** - a component that executes an operation on a single record that flows through the pipeline. It can +- **Processor** - a component that executes an operation on a single record that flows through the pipeline. It can either change the record or filter it out based on some criteria. -* **Record** - a record represents a single piece of data that flows through a pipeline (e.g. one database row). +- **Record** - a record represents a single piece of data that flows through a pipeline (e.g. one database row). ## High level overview ![Component diagram](data/component_diagram_full.svg) Conduit is split in the following layers: -* **API layer** - exposes the public APIs used to communicate with Conduit. It exposes 2 types of APIs: - * **gRPC** - this is the main API provided by Conduit. The gRPC API definition can be found in + +- **API layer** - exposes the public APIs used to communicate with Conduit. It exposes 2 types of APIs: + - **gRPC** - this is the main API provided by Conduit. The gRPC API definition can be found in [api.proto](../proto/api/v1/api.proto), it can be used to generate code for the client. - * **HTTP** - the HTTP API is generated using [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway) and - forwards the requests to the gRPC API. Conduit exposes an + - **HTTP** - the HTTP API is generated using [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway) and + forwards the requests to the gRPC API. Conduit exposes an [openapi](../pkg/web/openapi/swagger-ui/api/v1/api.swagger.json) definition that describes the HTTP API, which is also exposed through Swagger UI on `http://localhost:8080/openapi/`. -* **Orchestration layer** - the orchestration layer is responsible for coordinating the flow of operations between the +- **Orchestration layer** - the orchestration layer is responsible for coordinating the flow of operations between the core services. It also takes care of transactions, making sure that changes made to specific entities are not visible to the outside until the whole operation succeeded. There are 3 orchestrators, each responsible for actions related to one of the 3 main entities - pipelines, connectors and processors. -* **Core** - we regard the core to be the combination of the entity management layer and the pipeline engine. It +- **Core** - we regard the core to be the combination of the entity management layer and the pipeline engine. It provides functionality to the orchestrator layer and does not concern itself with where requests come from and how single operations are combined into more complex flows. - * **Entity management** - this layer is concerned with the creation, editing, deletion and storage of the main + - **Entity management** - this layer is concerned with the creation, editing, deletion and storage of the main entities. You can think about this as a simple CRUD layer. It can be split up further using the main entities: - * **Pipeline** - this is the central entity managed by the Pipeline Service that ties together all other components. + - **Pipeline** - this is the central entity managed by the Pipeline Service that ties together all other components. 
A pipeline contains the configuration that defines how pipeline nodes should be connected together in a running pipeline. It has references to at least one source and one destination connector and zero or multiple processors, a pipeline that does not meet the criteria is regarded as incomplete and can't be started. A pipeline can be either running, stopped or degraded (stopped because of an error). The pipeline can only be edited if it's not in a running state. - * **Connector** - a connector takes care of receiving or forwarding records to connector plugins, depending on its + - **Connector** - a connector takes care of receiving or forwarding records to connector plugins, depending on its type (source or destination). It is also responsible for tracking the connector state as records flow through it. The Connector Service manages the creation of connectors and permanently stores them in the Connector Store. A connector can be configured to reference a number of processors, which will be executed only on records that are received from or forwarded to that specific connector. - * **Processor** - processors are stateless components that operate on a single record and can execute arbitrary + - **Processor** - processors are stateless components that operate on a single record and can execute arbitrary actions before forwarding the record to the next node in the pipeline. A processor can also choose to drop a record without forwarding it. They can be attached either to a connector or to a pipeline, based on that they are either processing only records that flow from/to a connector or all records that flow through a pipeline. - * **Pipeline Engine** - the pipeline engine consists of nodes that can be connected together with Go channels to form + - **Pipeline Engine** - the pipeline engine consists of nodes that can be connected together with Go channels to form a data pipeline. - * **Node** - a node is a lightweight component that runs in its own goroutine and runs as long as the incoming channel + - **Node** - a node is a lightweight component that runs in its own goroutine and runs as long as the incoming channel is open. As soon as the previous node stops forwarding records and closes its out channel, the current node also stops running and closes its out channel. This continues down the pipeline until all records are drained and the pipeline gracefully stops. In case a node experiences an error all other nodes will be notified and stop running as soon as possible without draining the pipeline. -* **Persistence** - this layer is used directly by the orchestration layer and indirectly by the core layer (through +- **Persistence** - this layer is used directly by the orchestration layer and indirectly by the core layer (through stores) to persist data. It provides the functionality of creating transactions and storing, retrieving and deleting arbitrary data like configurations or state. -* **Plugins** - while this is not a layer in the same sense as the other layers, it is a component separate from +- **Plugins** - while this is not a layer in the same sense as the other layers, it is a component separate from everything else. It interfaces with the connector on one side and with Conduit plugins on the other and facilitates the communication between them. 
A Conduit plugin is a separate process that implements the interface defined in [plugins.proto](https://github.com/ConduitIO/conduit/blob/main/pkg/plugins/proto/plugins.proto) and provides the @@ -68,39 +69,39 @@ Conduit is split in the following layers: ## Package structure -* `cmd` - Contains main applications. The directory name for each application should match the name of the executable +- `cmd` - Contains main applications. The directory name for each application should match the name of the executable (e.g. `cmd/conduit` produces an executable called `conduit`). It is the responsibility of main applications to do 3 things, it should not include anything else: 1. Read the configuration (from a file, the environment or arguments). 2. Instantiate, wire up and run internal services. 3. Listen for signals (i.e. SIGTERM, SIGINT) and forward them to internal services to ensure a graceful shutdown (e.g. via a closed context). - * `conduit` - The entrypoint for the main conduit executable. -* `pkg` - The internal libraries and services that Conduit runs. - * `conduit` - Defines the main runtime that ties all Conduit layers together. - * `connector` - Code regarding connectors, including connector store, connector service, connector configurations + - `conduit` - The entrypoint for the main Conduit executable. +- `pkg` - The internal libraries and services that Conduit runs. + - `conduit` - Defines the main runtime that ties all Conduit layers together. + - `connector` - Code regarding connectors, including connector store, connector service, connector configurations and running instances. - * `foundation` - Foundation contains reusable code. Should not contain any business logic. A few honorable mentions: - * `assert` - Exposes common assertions for testing. - * `cerrors` - Exposes error creation and wrapping functionality. This is the only package for errors used in Conduit. - * `database` - Exposes functionality for storing values. - * `log` - Exposes a logger. This is the logger used throughout Conduit. - * `metrics` - Exposes functionality for gathering and exposing metrics. - * `orchestrator` - Code regarding the orchestration layer. - * `pipeline` - Code regarding pipelines, including pipeline store, pipeline service, running pipeline instances. - * `plugin` - Currently contains all logic related to plugins as well as the plugins themselves. In the future a lot of + - `foundation` - Foundation contains reusable code. Should not contain any business logic. A few honorable mentions: + - `assert` - Exposes common assertions for testing. + - `cerrors` - Exposes error creation and wrapping functionality. This is the only package for errors used in Conduit. + - `database` - Exposes functionality for storing values. + - `log` - Exposes a logger. This is the logger used throughout Conduit. + - `metrics` - Exposes functionality for gathering and exposing metrics. + - `orchestrator` - Code regarding the orchestration layer. + - `pipeline` - Code regarding pipelines, including pipeline store, pipeline service, running pipeline instances. + - `plugin` - Currently contains all logic related to plugins as well as the plugins themselves. In the future a lot of this code will be extracted into separate repositories, what will be left is a plugin service that manages built-in and external plugins. - * `processor` - Provides the types for processing a `Record`. A common abbreviation for transforms is `txf`. - * `transform/txfbuiltin` - Contains built-in transforms. 
- * `transform/txfjs` - Provides the functionality for implementing a transform in JavaScript. - * `record` - Everything regarding a `Record`, that is the central entity that is pushed through a Conduit pipeline. + - `processor` - Provides the types for processing a `Record`. A common abbreviation for transforms is `txf`. + - `transform/txfbuiltin` - Contains built-in transforms. + - `transform/txfjs` - Provides the functionality for implementing a transform in JavaScript. + - `record` - Everything regarding a `Record`, that is the central entity that is pushed through a Conduit pipeline. This includes a record `Schema`. - * `web` - Everything related to Conduit APIs or hosted pages like the UI or Swagger. + - `web` - Everything related to Conduit APIs or hosted pages like the UI or Swagger. Other folders that don't contain Go code: -* `docs` - Documentation regarding Conduit. -* `proto` - Protobuf files (e.g. gRPC API definition). -* `test` - Contains configurations needed for integration tests. -* `ui` - A subproject containing the web UI for Conduit. +- `docs` - Documentation regarding Conduit. +- `proto` - Protobuf files (e.g. gRPC API definition). +- `test` - Contains configurations needed for integration tests. +- `ui` - A subproject containing the web UI for Conduit. diff --git a/docs/code_guidelines.md b/docs/code_guidelines.md index e79a65563..ced651cff 100644 --- a/docs/code_guidelines.md +++ b/docs/code_guidelines.md @@ -7,38 +7,38 @@ Unless specified otherwise, we should follow the guidelines outlined in Conduit is using [golangci-lint](https://golangci-lint.run/) to ensure the code conforms to our code guidelines. Part of the guidelines are outlined below. -### General +## General General pointers around writing code for Conduit: -* Functions should generally **return a specific type instead of an interface**. The caller should have the ability to +- Functions should generally **return a specific type instead of an interface**. The caller should have the ability to know the exact type of the returned object and not only the interface it fulfills. -* **Interfaces should be defined locally** in the package that is using the interface and not in the package that +- **Interfaces should be defined locally** in the package that is using the interface and not in the package that defines structs which implement it. Interfaces should also be defined as minimally as possible. This way mocks can be generated and used independently in each package. -* Try to cleanly **separate concerns** and do not let implementation details spill to the caller code. -* When naming types, always keep in mind that the type will be used with the package name. We should **write code that +- Try to cleanly **separate concerns** and do not let implementation details spill to the caller code. +- When naming types, always keep in mind that the type will be used with the package name. We should **write code that does not stutter** (e.g. use `connector.Create` instead of `connector.CreateConnector`). -* **Pointer vs value semantics** - when defining a type, we should decide what semantic will be used for that type and +- **Pointer vs value semantics** - when defining a type, we should decide what semantic will be used for that type and stick with it throughout the codebase. -* **Avoid global state**, rather pass things explicitly between structs and functions. +- **Avoid global state**, rather pass things explicitly between structs and functions. 
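To make the guidelines above concrete, here is a minimal, hypothetical sketch (the package and type names are illustrative and not taken from the Conduit codebase) that combines several of them: the interface is defined locally in the consuming package, the constructor returns the concrete type, and the names avoid stuttering.

```go
// Package connector is used here only as an illustration.
package connector

import "context"

// Store is defined locally, in the package that uses it, and kept as small as
// possible so a mock can be generated and used independently in this package.
type Store interface {
	Set(ctx context.Context, id string, value []byte) error
}

// Service is a concrete type; callers refer to it as connector.Service, so the
// type name does not repeat the package name.
type Service struct {
	store Store
}

// NewService returns the concrete *Service instead of an interface, so the
// caller knows the exact type it is working with.
func NewService(store Store) *Service {
	return &Service{store: store}
}

// Create reads as connector.Create at the call site, not
// connector.CreateConnector.
func (s *Service) Create(ctx context.Context, id string) error {
	return s.store.Set(ctx, id, nil)
}
```
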
-### Packages +## Packages We generally follow the [Style guideline for Go packages](https://rakyll.org/style-packages/). Here is a short summary of those guidelines: -* Organize code into packages by their functional responsibility. -* Package names should be lowercase only (don't use snake_case or camelCase). -* Package names should be short, but should be unique and representative. Avoid overly broad package names like `common` +- Organize code into packages by their functional responsibility. +- Package names should be lowercase only (don't use snake_case or camelCase). +- Package names should be short, but should be unique and representative. Avoid overly broad package names like `common` and `util`. -* Use singular package names (e.g. `transform` instead of `transforms`). -* Use `doc.go` to document a package. +- Use singular package names (e.g. `transform` instead of `transforms`). +- Use `doc.go` to document a package. Additionally, we encourage the usage of the `internal` package to hide complex internal implementation details of a package and enforce a better separation of concerns between packages. -### Logging +## Logging We want to keep our logs as minimal as possible and reserve them for actionable messages like warnings and errors when something does not go as expected. Info logs are fine when booting up the app, all other successful operations should be @@ -48,37 +48,37 @@ that. Logs should contain contextual information (e.g. what triggered the action that printed a log). Our internal logger takes care of enriching the log message using the supplied `context.Context`. There are 3 use cases: -* If the operation was triggered by a request, the log will contain the request ID. -* If the operation was triggered by a new record flowing through the pipeline, the log will contain the record position. -* If the operation was triggered by a background job, the log will contain the name / identifier of that job. +- If the operation was triggered by a request, the log will contain the request ID. +- If the operation was triggered by a new record flowing through the pipeline, the log will contain the record position. +- If the operation was triggered by a background job, the log will contain the name / identifier of that job. Connector plugins are free to use any logger, as long as the output is routed to stdout. Conduit will capture those logs and display them alongside internal logs. -### Error Handling +## Error Handling -* All errors need to be wrapped before they cross package boundaries. Wrapping is done using the `cerrors` Conduit library, -i.e. `cerrors.Errorf("could not do X: %w", err)`. We are using the same library to unwrap and compare errors, +- All errors need to be wrapped before they cross package boundaries. Wrapping is done using the `cerrors` Conduit library, +i.e. `cerrors.Errorf("could not do X: %w", err)`. We are using the same library to unwrap and compare errors, i.e. `cerrors.Is(err, MyErrorType)`. -* Any error needs to be handled by either logging the error and recovering from it, or +- Any error needs to be handled by either logging the error and recovering from it, or wrapping and returning it to the caller. We should never both log and return the error. -* It's preferred to have a single file called `errors.go` per package which contains all the +- It's preferred to have a single file called `errors.go` per package which contains all the error variables from that package. 
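As a minimal sketch of these conventions (the `pipeline` package, the type names, the import path and the `cerrors.New` constructor are assumptions for illustration — only `cerrors.Errorf` and `cerrors.Is` are quoted from the guidelines above), a package would collect its error variables in `errors.go` and wrap them before they cross the package boundary:

```go
// Package pipeline is a hypothetical example; the import path below is an
// assumption based on the package layout described in docs/architecture.md.
package pipeline

import "github.com/conduitio/conduit/pkg/foundation/cerrors"

// Error variables are kept together, typically in errors.go.
// cerrors.New is assumed to mirror the standard errors.New constructor.
var ErrInstanceNotFound = cerrors.New("pipeline instance not found")

// Instance is an illustrative type.
type Instance struct{ ID string }

// Service is an illustrative type.
type Service struct {
	instances map[string]*Instance
}

// Get wraps the error with context before it crosses the package boundary; it
// returns the error to the caller instead of also logging it.
func (s *Service) Get(id string) (*Instance, error) {
	instance, ok := s.instances[id]
	if !ok {
		return nil, cerrors.Errorf("could not get pipeline %q: %w", id, ErrInstanceNotFound)
	}
	return instance, nil
}
```

A caller can then recover from the specific failure with `cerrors.Is(err, pipeline.ErrInstanceNotFound)` rather than inspecting error strings.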
-### Testing +## Testing We have 3 test suites: -* Unit tests are normal Go tests that don't need any external services to run and mock internal dependencies. They are +- Unit tests are normal Go tests that don't need any external services to run and mock internal dependencies. They are located in files named `${FILE}_test.go`, where `${FILE}.go` contains the code that's being tested. -* Integration tests are also written in Go, but can expect external dependencies (e.g. a running database instance). +- Integration tests are also written in Go, but can expect external dependencies (e.g. a running database instance). These tests should mostly be contained to code that directly communicates with those external dependencies. Integration tests are located in files named `${FILE}_integration_test.go`, where `${FILE}.go` contains the code that's being tested. Files that contain integration tests must contain the build tag `//go:build integration`. -* End-to-end tests are tests that spin up an instance of Conduit and test its operation as a black-box through the +- End-to-end tests are tests that spin up an instance of Conduit and test its operation as a black-box through the exposed APIs. These tests are located in the `e2e` folder. -### Documentation +## Documentation We should write in-line documentation that can be read by `godoc`. This means that exported types, functions and variables need to have a preceding comment that starts with the name of the expression and end with a dot. diff --git a/docs/connector_discovery.md b/docs/connector_discovery.md index 952b624f6..fc98fd232 100644 --- a/docs/connector_discovery.md +++ b/docs/connector_discovery.md @@ -1,9 +1,11 @@ # Connector discovery ## Connectors' location + Conduit loads standalone connectors at startup. The connector binaries need to be placed in the `connectors` directory relative to the Conduit binary so Conduit can find them. Alternatively, the path to the standalone connectors can be adjusted using the CLI flag `-connectors.path`, for example: + ```shell ./conduit -connectors.path=/path/to/connectors/ ``` @@ -13,30 +15,30 @@ connectors themselves (using their gRPC API). ## Referencing connectors -The name used to reference a connector in API requests (e.g. to createa new connector) comes in the following format: +The name used to reference a connector in API requests (e.g. to createa new connector) comes in the following format: `[PLUGIN-TYPE:]PLUGIN-NAME[@VERSION]` - `PLUGIN-TYPE` (`builtin`, `standalone` or `any`) - - Defines if the specified plugin should be builtin or standalone. - - If `any`, Conduit will use a standalone plugin if it exists and fall back to a builtin plugin. - - Default is `any`. + - Defines if the specified plugin should be builtin or standalone. + - If `any`, Conduit will use a standalone plugin if it exists and fall back to a builtin plugin. + - Default is `any`. - `PLUGIN-NAME` - - Defines the name of the plugin as specified in the plugin specifications, it has to be an exact match. + - Defines the name of the plugin as specified in the plugin specifications, it has to be an exact match. - `VERSION` - - Defines the plugin version as specified in the plugin specifications, it has to be an exact match. - - If `latest`, Conduit will use the latest semantic version. - - Default is `latest`. + - Defines the plugin version as specified in the plugin specifications, it has to be an exact match. + - If `latest`, Conduit will use the latest semantic version. + - Default is `latest`. 
Examples: - `postgres` - - will use the **latest** **standalone** **postgres** plugin - - will fallback to the **latest** **builtin** **postgres** plugin if standalone wasn't found + - will use the **latest** **standalone** **postgres** plugin + - will fallback to the **latest** **builtin** **postgres** plugin if standalone wasn't found - `postgres@v0.2.0` - - will use the **standalone** **postgres** plugin with version **v0.2.0** - - will fallback to a **builtin** **postgres** plugin with version **v0.2.0** if standalone wasn't found + - will use the **standalone** **postgres** plugin with version **v0.2.0** + - will fallback to a **builtin** **postgres** plugin with version **v0.2.0** if standalone wasn't found - `builtin:postgres` - - will use the **latest** **builtin** **postgres** plugin + - will use the **latest** **builtin** **postgres** plugin - `standalone:postgres@v0.3.0` - - will use the **standalone** **postgres** plugin with version **v0.3.0** (no fallback to builtin) + - will use the **standalone** **postgres** plugin with version **v0.3.0** (no fallback to builtin) diff --git a/docs/connectors.md b/docs/connectors.md index 7efd8d75c..6bdadd824 100644 --- a/docs/connectors.md +++ b/docs/connectors.md @@ -1,27 +1,40 @@ # Conduit Connectors -Connectors are an integral part of Conduit. Conduit ships with a couple of connectors that are built into the service to help developers bootstrap pipelines much more quickly. The built-in connectors include Postgres, File, Random Data Generator, Kafka and Amazon S3 connectors. - -The difference between Conduit connectors and those you might find from other services is that Conduit connectors are Change Data Capture-first (CDC). CDC allows your pipeline to only get the changes that have happened over time instead of pulling down an entire upstream data store and then tracking diffs between some period of time. This is critical for building real-time event-driven pipelines and applications. But, we'll note where connectors don't have CDC capabilities. +Connectors are an integral part of Conduit. Conduit ships with a couple of connectors that are built into the service +to help developers bootstrap pipelines much more quickly. The built-in connectors include Postgres, File, Random Data +Generator, Kafka and Amazon S3 connectors. +The difference between Conduit connectors and those you might find from other services is that Conduit connectors are +Change Data Capture-first (CDC). CDC allows your pipeline to only get the changes that have happened over time instead +of pulling down an entire upstream data store and then tracking diffs between some period of time. This is critical for +building real-time event-driven pipelines and applications. But, we'll note where connectors don't have CDC capabilities. ## Roadmap & Feedback -If you need support for a particular data store that doesn't exist on the connector list, check out the list of requested [source connectors](https://github.com/ConduitIO/conduit/issues?q=is%3Aissue+label%3Aconnector%3Asource+) and the list of requested [destination connectors](https://github.com/ConduitIO/conduit/issues?q=is%3Aissue+label%3Aconnector%3Adestination). Give the issue a `+1` if you really need that connector. The upvote will help the team understand demand for any particular connector. If you find that an issue hasn't been created for your data store, please create a new issue in the Conduit repo. 
+If you need support for a particular data store that doesn't exist on the connector list, check out the list of +requested [source connectors](https://github.com/ConduitIO/conduit/issues?q=is%3Aissue+label%3Aconnector%3Asource+) and +the list of requested [destination connectors](https://github.com/ConduitIO/conduit/issues?q=is%3Aissue+label%3Aconnector%3Adestination). +Give the issue a `+1` if you really need that connector. The upvote will help the team understand demand for any +particular connector. If you find that an issue hasn't been created for your data store, please create a new issue in +the Conduit repo. ## Connectors ### Support Types -* `Conduit` - These are connectors that are built by the Conduit. Any issues or problems filed on those repos will be respond to by the Conduit team. -* `Community` - A community connector is one where a developer created a connector and they're the ones supporting it not the Conduit team. -* `Legacy` - Some connectors are built using non-preferred methods. For example, Kafka Connect connectors can be used on Conduit. This is considered a stop gap measure until `conduit` or `community` connectors are built. +- `Conduit` - These are connectors that are built by the Conduit. Any issues or problems filed on those repos will be + respond to by the Conduit team. +- `Community` - A community connector is one where a developer created a connector and they're the ones supporting it + not the Conduit team. +- `Legacy` - Some connectors are built using non-preferred methods. For example, Kafka Connect connectors can be used + on Conduit. This is considered a stop gap measure until `conduit` or `community` connectors are built. At this time, Conduit does not have any commercially supported connectors. ### Source & Destination -Source means the connector has the ability to get data from an upstream data store. Destination means the connector can to write to a downstream data store. +Source means the connector has the ability to get data from an upstream data store. Destination means the connector can +to write to a downstream data store. ### The List @@ -53,4 +66,4 @@ Source means the connector has the ability to get data from an upstream data sto | [Snowflake](https://github.com/conduitio-labs/conduit-connector-snowflake) |✅ | | Community |v0.3.0| | [Stripe](https://github.com/conduitio-labs/conduit-connector-stripe) |✅ | | Community |v0.3.0| | [Vitess](https://github.com/conduitio-labs/conduit-connector-vitess) |WIP|WIP| Community |WIP| -| [Zendesk](https://github.com/conduitio-labs/conduit-connector-zendesk) |✅ |✅| Community |v0.3.0| \ No newline at end of file +| [Zendesk](https://github.com/conduitio-labs/conduit-connector-zendesk) |✅ |✅| Community |v0.3.0| diff --git a/docs/design-documents/20220309-opencdc.md b/docs/design-documents/20220309-opencdc.md index ded65e870..bb1174205 100644 --- a/docs/design-documents/20220309-opencdc.md +++ b/docs/design-documents/20220309-opencdc.md @@ -7,6 +7,7 @@ The main goal is to ensure compatibility between any combination of source/destination connectors. 
Define a format that fulfills the following criteria: + - A record can be associated to a schema that describes the payload or key - The payload/key schema can be included in the record or pointing to a schema stored in a schema registry - Be able to represent an created, updated and deleted record @@ -15,6 +16,7 @@ Define a format that fulfills the following criteria: - Define standard metadata fields Besides the format itself, we want to roughly define how Conduit will work with the record, specifically: + - How transforms change the associated schema? - How can a schema be attached to a raw record? - How can a schema be extracted to a schema registry? @@ -23,22 +25,33 @@ Besides the format itself, we want to roughly define how Conduit will work with - **Should we have a separate field announcing the record was created as part of a snapshot?** - No, this information will be stored in the action (create, update, delete, snapshot). -- **Should the record support sending a schema only (e.g. we transmit a CREATE TABLE statement)? This might be useful for setting up the target structure even if the source is still empty.** - - No, this kind of data would only make sense in straight replication between two systems without any transforms. Connectors should rather detect differences in schemas lazily based on each record (more info: https://github.com/ConduitIO/conduit/pull/326#discussion_r835596003). -- **Should the specification of a plugin include the info about what data a plugin can work with? More specifically, the plugin could announce if it can handle raw data or not (e.g. postgres right now needs structured data). If we had this info we could let the user know in advance that a pipeline config is invalid or that a schema needs to be added to records so that they can be converted to structured data.** +- **Should the record support sending a schema only (e.g. we transmit a CREATE TABLE statement)? This might be useful + for setting up the target structure even if the source is still empty.** + - No, this kind of data would only make sense in straight replication between two systems without any transforms. + Connectors should rather detect differences in schemas lazily based on each record + (more info: ). +- **Should the specification of a plugin include the info about what data a plugin can work with? More specifically, the + plugin could announce if it can handle raw data or not (e.g. postgres right now needs structured data). If we had this + info we could let the user know in advance that a pipeline config is invalid or that a schema needs to be added to + records so that they can be converted to structured data.** - No, this is out of scope for now. -- **Should the record include a diff in case of an update (old values vs. new values)? We need at least the old key values in case the key value was updated to correctly identify the record.** +- **Should the record include a diff in case of an update (old values vs. new values)? We need at least the old key + values in case the key value was updated to correctly identify the record.** - Yes, before / after values should be supported. ### Background -Here are a couple of scenarios that we want to support. Records in these examples are formatted in the format defined in implementation option 1. +Here are a couple of scenarios that we want to support. Records in these examples are formatted in the format defined in +implementation option 1. 
#### Scenario 1: Connector ships raw data without schema -In this scenario we have a pipeline that reads from a schemaless source (for example Kafka). The key and payload that is read is raw, the connector isn't aware of what the structure is so it creates an OpenCDC record with the raw key, raw payload and some metadata and sends it to Conduit for further processing. +In this scenario we have a pipeline that reads from a schemaless source (for example Kafka). The key and payload that is +read is raw, the connector isn't aware of what the structure is so it creates an OpenCDC record with the raw key, raw +payload and some metadata and sends it to Conduit for further processing. -The record that Conduit receives could look something like this (note that the record is an internal Go struct, the JSON representation is strictly for illustration purposes): +The record that Conduit receives could look something like this (note that the record is an internal Go struct, the JSON +representation is strictly for illustration purposes): ```json { @@ -68,7 +81,10 @@ The record that Conduit receives could look something like this (note that the r #### Scenario 2: Connector ships structured data without schema -A connector could read from a source that supplies structured data, but has no idea about the schema of that structured data (for example a connector exposing a webhook accepting any JSON request). The connector supplies an OpenCDC record with a structured payload and/or key and sends it to Conduit for further processing. Note that the connector could infer the schema from the structured payload, but it does not need to do that since Conduit could do the same thing. +A connector could read from a source that supplies structured data, but has no idea about the schema of that structured +data (for example a connector exposing a webhook accepting any JSON request). The connector supplies an OpenCDC record +with a structured payload and/or key and sends it to Conduit for further processing. Note that the connector could infer +the schema from the structured payload, but it does not need to do that since Conduit could do the same thing. Example record that Conduit receives in this case: @@ -100,8 +116,9 @@ Example record that Conduit receives in this case: #### Scenario 3: Connector ships raw data with schema -This scenario assumes the connector provides raw data but also provides a schema for parsing the raw data and using it as structured data (for example a connector dealing with protobuf data). The reason behind this might be that the raw data can be compressed better and would improve the performance of the pipeline. - +This scenario assumes the connector provides raw data but also provides a schema for parsing the raw data and using it as +structured data (for example a connector dealing with protobuf data). The reason behind this might be that the raw data +can be compressed better and would improve the performance of the pipeline. Example record that Conduit receives in this case: @@ -129,7 +146,9 @@ Example record that Conduit receives in this case: #### Scenario 4: Connector ships structured data with schema -In this scenario we have a pipeline that reads from a source that contains structured data (for example Postgres). The connector is aware of the structure so it extracts the schema and structures the data into an OpenCDC record that gets sent to Conduit for further processing. 
+In this scenario we have a pipeline that reads from a source that contains structured data (for example Postgres). The +connector is aware of the structure so it extracts the schema and structures the data into an OpenCDC record that gets +sent to Conduit for further processing. Example record that Conduit receives in this case: @@ -188,24 +207,32 @@ Example record that Conduit receives in this case: #### Parsing raw payload to OpenCDC record -In scenario 1 the connector ships raw key and payload data to Conduit without knowing the format of the message. It might be the case that the payload itself is an OpenCDC record. For that case Conduit should supply a "parser transform" that can parse the raw payload as an OpenCDC record and do one of two options: +In scenario 1 the connector ships raw key and payload data to Conduit without knowing the format of the message. It might +be the case that the payload itself is an OpenCDC record. For that case Conduit should supply a "parser transform" that +can parse the raw payload as an OpenCDC record and do one of two options: -- Use specific data from the parsed record to enrich the current OpenCDC record (e.g. extract payload and replace the current payload). The enriched record would be sent further down the pipeline. -- Replace the current OpenCDC record entirely by sending the parsed record further down the pipeline and discarding the original record. +- Use specific data from the parsed record to enrich the current OpenCDC record (e.g. extract payload and replace the + current payload). The enriched record would be sent further down the pipeline. +- Replace the current OpenCDC record entirely by sending the parsed record further down the pipeline and discarding the + original record. -Note that this transform could only operate on the key or payload data at a time. If both the key and payload contain an OpenCDC record, then two chained transforms would need to be used to parse both. +Note that this transform could only operate on the key or payload data at a time. If both the key and payload contain an +OpenCDC record, then two chained transforms would need to be used to parse both. Note also that we could provide such transforms not only for OpenCDC but also for other formats (e.g. Debezium records). ## Implementation options -The format that we choose and define here wouldn't be constrained by a specific representation like JSON or protobuf. We would need a protobuf definition of the record to be able to move it from the plugin to Conduit through gRPC, but the record could just as well be formatted as JSON or another format. +The format that we choose and define here wouldn't be constrained by a specific representation like JSON or protobuf. We +would need a protobuf definition of the record to be able to move it from the plugin to Conduit through gRPC, but the +record could just as well be formatted as JSON or another format. ### Option 1 We consider our needs, define OpenCDC from scratch and change our record to match OpenCDC. -Note that since this is the recommended option we go into more details about how the record should change if we go ahead with this option. +Note that since this is the recommended option we go into more details about how the record should change if we go ahead +with this option. #### Pros @@ -215,7 +242,7 @@ Note that since this is the recommended option we go into more details about how #### Cons -- We define a new format that nobody is using but us. 
https://xkcd.com/927/ +- We define a new format that nobody is using but us. - Breaking change for connectors, the record will change. - Debezium compatibility has to be achieved through transforms. @@ -225,58 +252,58 @@ We propose the following changes to the `Record` type: ```diff type Record struct { - // Position uniquely represents the record. - Position Position - // Metadata contains additional information regarding the record. - Metadata map[string]string -+ // Operation defines if this record contains data related to a create, -+ // update, delete or snapshot. -+ Operation Operation -- // CreatedAt represents the time when the change occurred in the source -- // system. If that's impossible to find out, then it should be the time the -- // change was detected by the connector. -- CreatedAt time.Time -- // Key represents a value that should be the same for records originating -- // from the same source entry (e.g. same DB row). In a destination Key will -- // never be null. -- Key Data -- // Payload holds the actual information that the record is transmitting. In -- // a destination Payload will never be null. -- Payload Data -+ // Before holds the key and payload data that was valid before the change. -+ // This field is only populated if the operation is update or delete. -+ Before struct { -+ // Key contains the data and schema of the key before the change. -+ Key DataWithSchema -+ // Payload contains the data and schema of the payload before the change. -+ Payload DataWithSchema -+ } -+ // After holds the key and payload data that is valid after the change. -+ // This field is only populated if the operation is create, update or snapshot. -+ After struct { -+ // Key contains the data and schema of the key after the change. -+ Key DataWithSchema -+ // Payload contains the data and schema of the payload after the change. -+ Payload DataWithSchema -+ } + // Position uniquely represents the record. + Position Position + // Metadata contains additional information regarding the record. + Metadata map[string]string ++ // Operation defines if this record contains data related to a create, ++ // update, delete or snapshot. ++ Operation Operation +- // CreatedAt represents the time when the change occurred in the source +- // system. If that's impossible to find out, then it should be the time the +- // change was detected by the connector. +- CreatedAt time.Time +- // Key represents a value that should be the same for records originating +- // from the same source entry (e.g. same DB row). In a destination Key will +- // never be null. +- Key Data +- // Payload holds the actual information that the record is transmitting. In +- // a destination Payload will never be null. +- Payload Data ++ // Before holds the key and payload data that was valid before the change. ++ // This field is only populated if the operation is update or delete. ++ Before struct { ++ // Key contains the data and schema of the key before the change. ++ Key DataWithSchema ++ // Payload contains the data and schema of the payload before the change. ++ Payload DataWithSchema ++ } ++ // After holds the key and payload data that is valid after the change. ++ // This field is only populated if the operation is create, update or snapshot. ++ After struct { ++ // Key contains the data and schema of the key after the change. ++ Key DataWithSchema ++ // Payload contains the data and schema of the payload after the change. 
++ Payload DataWithSchema ++ } } + // Operation defines what operation triggered the creation of the record. + type Operation string + + const ( -+ OperationCreate Operation = "create" -+ OperationUpdate Operation = "update" -+ OperationDelete Operation = "delete" -+ OperationSnapshot Operation = "snapshot" ++ OperationCreate Operation = "create" ++ OperationUpdate Operation = "update" ++ OperationDelete Operation = "delete" ++ OperationSnapshot Operation = "snapshot" + ) + // DataWithSchema holds some data and a schema describing the data. + type DataWithSchema interface { -+ // Schema returns the schema describing Data or nil if none is set. -+ Schema() Schema -+ // Data returns raw or structured data or nil if none is set. -+ Data() Data ++ // Schema returns the schema describing Data or nil if none is set. ++ Schema() Schema ++ // Data returns raw or structured data or nil if none is set. ++ Data() Data + } ``` @@ -291,9 +318,12 @@ Notable changes: We propose to define a list of standard metadata fields, like: -- `opencdc.createdAt` - This was previously part of the record but would be moved to metadata as it is an optional field not critical to the operation of Conduit. It represents the time when the change occurred in the source system. If that's impossible to find out, then it should be empty. +- `opencdc.createdAt` - This was previously part of the record but would be moved to metadata as it is an optional field + not critical to the operation of Conduit. It represents the time when the change occurred in the source system. If + that's impossible to find out, then it should be empty. - `opencdc.readAt` - It represents the time the change was detected by the connector. -- `opencdc.chunk` - Signifies that this record is only a chunk of a bigger record (the behavior and actual content of this field is yet to be designed, that's a whole separate design document). +- `opencdc.chunk` - Signifies that this record is only a chunk of a bigger record (the behavior and actual content of + this field is yet to be designed, that's a whole separate design document). Conduit could define its own standard metadata fields that it populates automatically, like: @@ -301,7 +331,8 @@ Conduit could define its own standard metadata fields that it populates automati - `conduit.plugin.name` - Name of the Conduit plugin that produced this record. - `conduit.plugin.version` - Version of the Conduit plugin that produced this record. -More standard fields can be added. The SDK would provide an easy way of retrieving standard OpenCDC and Conduit metadata fields. +More standard fields can be added. The SDK would provide an easy way of retrieving standard OpenCDC and Conduit metadata +fields. #### Schema format @@ -311,7 +342,8 @@ It should be possible to attach a schema to a record key/payload for at least th - Protobuf - Avro -Additionally we might want to support an OpenCDC schema format that describes structured data (See [scenario 4](#scenario-4-connector-ships-structured-data-with-schema)). +Additionally we might want to support an OpenCDC schema format that describes structured data (See +[scenario 4](#scenario-4-connector-ships-structured-data-with-schema)). ### Option 2 @@ -327,12 +359,15 @@ OpenCDC gets defined the same as the Debezium record, we change our record to ma - Breaking change for connectors, the record will change. - We are tied to the Debezium format (less flexibility). -- The Debezium format contains a schema describing the record itself (i.e. 
fields that are part of the format itself), it is redundant. -- The Debezium format is tailored to databases, we could be dealing with any source that is not a database (schemaless raw data, schemaless structured data etc.). +- The Debezium format contains a schema describing the record itself (i.e. fields that are part of the format itself), it + is redundant. +- The Debezium format is tailored to databases, we could be dealing with any source that is not a database (schemaless + raw data, schemaless structured data etc.). ### Option 3 -We define OpenCDC by starting with our current record structure and extending it with additional fields to keep it backwards compatible. +We define OpenCDC by starting with our current record structure and extending it with additional fields to keep it +backwards compatible. #### Pros @@ -347,4 +382,5 @@ We define OpenCDC by starting with our current record structure and extending it ### Recommendation -Option 1 seems like the way to go and since Conduit is still a relatively new product now is the time to do such a change. Changing the format at a later stage when we have more connectors and users would be much more painful. +Option 1 seems like the way to go and since Conduit is still a relatively new product now is the time to do such a +change. Changing the format at a later stage when we have more connectors and users would be much more painful. diff --git a/docs/design-documents/20221024-stream-inspector.md b/docs/design-documents/20221024-stream-inspector.md index 1da58b18f..efe235dab 100644 --- a/docs/design-documents/20221024-stream-inspector.md +++ b/docs/design-documents/20221024-stream-inspector.md @@ -8,18 +8,18 @@ Make troubleshooting pipelines easier by making it possible to inspect the data 1. All pipeline components (accessible "from the outside", i.e. sources, processors and destinations) should be inspectable. - * For v1, it's enough that destinations are inspectable. + - For v1, it's enough that destinations are inspectable. 2. A source inspector should show the records coming out of the respective source. 3. A destination inspector should show the records coming into the respective destination. 4. A processor has two inspectors: one for the incoming records and one for the transformed records). -5. The inspector should provide the relevant records about the component being inspected, from the +5. The inspector should provide the relevant records about the component being inspected, from the time the inspection started. Historical data is not required. 6. The inspector should have a minimal footprint on the resource usage (CPU, memory etc.) 7. The inspector should have an insignificant impact on a pipeline's performance (ideally, no impact at all). 8. The inspector data should be available through: - * the HTTP API, - * the gRPC API and - * the UI. + - the HTTP API, + - the gRPC API and + - the UI. 9. Multiple users should be able to inspect the same pipeline component at the same time. Every user's session is independent. 10. The inspector should stop publishing the data to a client, if the client appears to be idle/crashed. @@ -29,38 +29,48 @@ In case a stream inspection is slower than the pipeline being inspected, it's po from the inspector. This is discussed more in the section "Blocking vs. non-blocking" below. ## Implementation + Here we discuss two aspects of the implementation: internals (i.e. how to actually get the records from the inspectable -pipeline components) and the API (i.e. 
how to deliver the inspected records to a client while providing a good user +pipeline components) and the API (i.e. how to deliver the inspected records to a client while providing a good user experience). ### Blocking vs. non-blocking + It's possible that a stream inspector won't be able to catch up with a pipeline, e.g. in the case of high velocity sources. To handle this, we have two options: + #### Option 1: Make the stream inspector blocking + In this option, the stream inspector would slow down the pipeline until it catches up. The goal is make sure all records are inspected. -**Advantages** +Advantages: + 1. Having complete data when troubleshooting. 2. This would make it possible to inject messages into a pipeline in the future. -**Disadvantages** +Disadvantages: + 1. Pipeline performance is affected. 2. The implementation becomes more complex. #### Option 2: Make the stream inspector non-blocking + In this option, the stream inspector would not slow the pipeline. Some records from the pipeline won't be shown in the stream inspector at all due to this. -**Advantages** +Advantages: + 1. Pipeline performance is not affected. 2. Simpler implementation. -**Disadvantages** +Disadvantages: + 1. Not having complete data when troubleshooting. #### Chosen option + The chosen option is a non-blocking stream inspector for following reasons: A blocking stream inspector would fall apart if we inspect a stream that's processing 10s of thousands of messages a @@ -74,156 +84,176 @@ bad data or bad transform) and allow them to easily write a processor that corre data back into the stream. ### Push based vs. pull based + Implementations will generally use one of two approaches: pull based and push based. -* In a **pull based** implementation, a client (HTTP client, gRPC client, UI) would be periodically checking for +- In a **pull based** implementation, a client (HTTP client, gRPC client, UI) would be periodically checking for inspection data. - * A client would need to store the "position" of the previous inspection, to get newer data. - * This would require inspection state management in Conduit for each session. - * Inspection data would come in with a delay. -* In a **push based** implementation, Conduit would be pushing the inspection data to a client (HTTP client, gRPC + - A client would need to store the "position" of the previous inspection, to get newer data. + - This would require inspection state management in Conduit for each session. + - Inspection data would come in with a delay. +- In a **push based** implementation, Conduit would be pushing the inspection data to a client (HTTP client, gRPC client, UI). - * A client would not need to store the "position" of the previous inspection, to get newer data. - * Inspection state management in Conduit would be minimal. - * Inspection data would come in almost real-time. - * Inspecting the data may require additional tools (depends on the actual implementation) + - A client would not need to store the "position" of the previous inspection, to get newer data. + - Inspection state management in Conduit would be minimal. + - Inspection data would come in almost real-time. + - Inspecting the data may require additional tools (depends on the actual implementation) -From what we know so far, **the push based approach has more advantages and is easier to work with, so that's the +From what we know so far, **the push based approach has more advantages and is easier to work with, so that's the approach chosen here**. 
Concrete implementation options are discussed below. For context: gRPC is the main API for Conduit. The HTTP API is generated using [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway). ### API + Inspecting a pipeline component is triggered with a gRPC/HTTP API request. If the pipeline component cannot be inspected for any reason (e.g. inspection not supported, component not found), an error is returned. -For any given pipeline component, only one gRPC/HTTP API method is required, one which starts an inspection (i.e. +For any given pipeline component, only one gRPC/HTTP API method is required, one which starts an inspection (i.e. sending of data to a client). Three methods are required in total: + 1. One to inspect a connector (there's a single endpoint for source and destination connectors) 2. One to inspect records coming into a processor 3. One to inspect records coming out of a processor ### Delivery options + Here we'll discuss options for delivering the stream of inspection data from the gRPC/HTTP API to a client. #### Option 1: WebSockets + Conduit provides a streaming endpoint. The stream is exposed using the WebSockets API. -grpc-gateway doesn't support WebSockets (see [this](https://github.com/grpc-ecosystem/grpc-gateway/issues/168)). There's -an open-source proxy for it though, available [here](https://github.com/tmc/grpc-websocket-proxy/). +grpc-gateway doesn't support WebSockets (see [this](https://github.com/grpc-ecosystem/grpc-gateway/issues/168)). There's +an open-source proxy for it though, available [here](https://github.com/tmc/grpc-websocket-proxy/). #### Option 2: Pure gRPC + Conduit provides a streaming endpoint. The stream is consumed as such, i.e. a gRPC endpoint. A JavaScript implementation -of gRPC for browser clients exists (called [grpc-web](https://github.com/grpc/grpc-web)). While that would mean no +of gRPC for browser clients exists (called [grpc-web](https://github.com/grpc/grpc-web)). While that would mean no changes in Conduit itself, it would require a lot of changes in the UI. -#### Option 3: Server-sent events +#### Option 3: Server-sent events + [Server-sent events](https://html.spec.whatwg.org/#server-sent-events) enable servers to push events to clients. Unlike WebSockets, the communication is unidirectional. -While server-sent events generally match our requirements, the implementation would not be straightforward because -grpc-gateway doesn't support it, nor do they plan to support it (see [this](https://hackmd.io/@prysmaticlabs/eventstream-api) +While server-sent events generally match our requirements, the implementation would not be straightforward because +grpc-gateway doesn't support it, nor do they plan to support it (see [this](https://hackmd.io/@prysmaticlabs/eventstream-api) and [this](https://github.com/grpc-ecosystem/grpc-gateway/issues/26)). -Also, we want the inspector to be available through the UI, and using WebSockets is a much easier option than server-sent +Also, we want the inspector to be available through the UI, and using WebSockets is a much easier option than server-sent events. #### Chosen delivery option + [grpc-websocket-proxy](https://github.com/tmc/grpc-websocket-proxy/) mention in option 1 is relatively popular and is open-source, so using it is no risk. The other option is much costlier. ### Internals: Exposing the inspection data + Following options exist to expose the inspection data for node. 
How the data will be used by the API is discussed in the sections below.

#### Option 1: Connectors and processors expose a method
+
In this option, inspection would be performed at the connector or processor level. Inspectable pipeline components
themselves would expose an `Inspect` method, for example:
+
```go
// Example for a source, can also be a processor or a destination
func (s Source) Inspect(direction string) chan Record
```
+
(As a return value, we may use a special `struct` instead of `chan Record` to more easily propagate events, such as
inspection done.)

-**Advantages**:
+Advantages:
+
1. The implementation would be relatively straightforward and not complex.

-**Disadvantages**:
+Disadvantages:
+
1. Minor changes in the sources, processors and destinations are needed.

#### Option 2: Dedicated inspector nodes
+
In this option, we'd have a node which would be dedicated for inspecting data coming in and out of a source, processor
or destination node. To inspect a node we would dynamically add a node before or after a node being inspected.

-**Advantages**:
+Advantages:
+
1. All the code related to inspecting nodes would be "concentrated" in one or two node types.
2. Existing nodes don't need to be changed.
3. This makes it possible to inspect any node.
-4. Solving this problem would put us into a good position to solve https://github.com/ConduitIO/conduit/issues/201.
+4. Solving this problem would put us into a good position to solve <https://github.com/ConduitIO/conduit/issues/201>.
+
+Disadvantages:

-**Disadvantages**
1. Currently, it's not possible to dynamically add a node, which would mean that we need to restart a pipeline to do
this, and that's not a good user experience.
2. On the other hand, changing the code so that a pipeline's topology can be dynamically changed is a relatively large
amount of work.

-#### Option 3: Add the inspection code to `PubNode`s and `SubNode`s.
+#### Option 3: Add the inspection code to `PubNode`s and `SubNode`s
+
+Advantages:

-**Advantages**
1. Solves the problem for all nodes. While we're not necessarily interested in all the nodes, solving the problem at
the `PubNode` level solves the problem for sources and output records for processors at the same time, and solving the
problem at the `SubNode` level solves the problem for destinations and input records for processors at the same time.

-**Disadvantages**
+Disadvantages:
+
1. Adding inspection to pub and sub nodes is complex. This complexity is reflected in following:
-* Once we add a method to the `PubNode` and `SubNode` interfaces, we'll need to implement it in all current
- implementations, even if that means only calling a method from an embedded type.
-* `PubNode` and `SubNode` rely on Go channels to publish/subscribe to messages. Automatically sending messages from
- those channels to registered inspectors is non-trivial.
+ - Once we add a method to the `PubNode` and `SubNode` interfaces, we'll need to implement it in all current
+ implementations, even if that means only calling a method from an embedded type.
+ - `PubNode` and `SubNode` rely on Go channels to publish/subscribe to messages. Automatically sending messages from
+ those channels to registered inspectors is non-trivial.

#### Chosen implementation
-Option 1, i.e. connectors and processor exposing methods to inspect themselves, is the preferred option given that
+
+Option 1, i.e.
connectors and processor exposing methods to inspect themselves, is the preferred option given that options 2 and 3 are relatively complex, and we would risk delivering this feature in scope of the 0.4 release. ## Questions -* Should we rename it to pipeline inspector instead? Pipeline is the term we use in the API, streams are used +- Should we rename it to pipeline inspector instead? Pipeline is the term we use in the API, streams are used internally. - * **Answer**: Since the inspector is about inspecting data, and the term stream stands for data whereas pipeline - stands for the setup/topology, keeping the term "stream inspector" makes sense. -* Is metadata needed (such as time the records were captured)? - * Examples: - * The time at which a record entered/left a node. - * ~~The source connector from which a record originated.~~ Can be found in OpenCDC metadata. -* Should there be a limit on how long a stream inspector can run? -* Should there be a limit on how many records a stream inspector can receive? - * Pros: - * Users could mistakenly leave inspector open "forever". - * This would handle the case where an inspector crashes. - * Cons: - * Some users may want to run an inspection over a longer period of time. - * The inspector will be non-blocking, not accumulating any data in memory, so it running over a longer period of time + - **Answer**: Since the inspector is about inspecting data, and the term stream stands for data whereas pipeline + stands for the setup/topology, keeping the term "stream inspector" makes sense. +- Is metadata needed (such as time the records were captured)? + - Examples: + - The time at which a record entered/left a node. + - ~~The source connector from which a record originated.~~ Can be found in OpenCDC metadata. +- Should there be a limit on how long a stream inspector can run? +- Should there be a limit on how many records a stream inspector can receive? + - Pros: + - Users could mistakenly leave inspector open "forever". + - This would handle the case where an inspector crashes. + - Cons: + - Some users may want to run an inspection over a longer period of time. + - The inspector will be non-blocking, not accumulating any data in memory, so it running over a longer period of time won't consume significant resources. - * **Answer**: For now, we'll have no limits. -* Are we interested in more than the records? Is there some other data we'd like to see (now or in future)? - * **Answer**: We didn't find any data not related to records themselves which would be useful in the inspector. -* Should it be possible to specify which data is shown? - * **Answer**: No, out of scope. It's a matter of data representation on the client side. -* Should the inspector be blocking (if a client is consuming the inspected records slower that then pipeline rate, + - **Answer**: For now, we'll have no limits. +- Are we interested in more than the records? Is there some other data we'd like to see (now or in future)? + - **Answer**: We didn't find any data not related to records themselves which would be useful in the inspector. +- Should it be possible to specify which data is shown? + - **Answer**: No, out of scope. It's a matter of data representation on the client side. +- Should the inspector be blocking (if a client is consuming the inspected records slower that then pipeline rate, then the pipeline would be throttled) or non-blocking? 
- * Arguments for blocking:
-   * the inspected data will be complete no matter what, making troubleshooting easier
-   * if we make it possible to insert records during inspection, it will make implementation easier
- * Arguments against:
-   * "inspection" is meant to be "view-only" and not a debugger
-   * for high-volume sources this will always result in pipeline being throttled
- * **Answer**: Based on above, the stream inspector should be non-blocking.
+  - Arguments for blocking:
+    - the inspected data will be complete no matter what, making troubleshooting easier
+    - if we make it possible to insert records during inspection, it will make implementation easier
+  - Arguments against:
+    - "inspection" is meant to be "view-only" and not a debugger
+    - for high-volume sources this will always result in pipeline being throttled
+  - **Answer**: Based on the above, the stream inspector should be non-blocking.

## Future work
-* Inspector REPL: https://github.com/ConduitIO/conduit/issues/697
\ No newline at end of file
+- Inspector REPL: <https://github.com/ConduitIO/conduit/issues/697>
diff --git a/docs/design-documents/20221027-builtin-config-validation.md b/docs/design-documents/20221027-builtin-config-validation.md
index 192bebaaa..bd1023037 100644
--- a/docs/design-documents/20221027-builtin-config-validation.md
+++ b/docs/design-documents/20221027-builtin-config-validation.md
@@ -88,21 +88,23 @@ are not exposed by the SDK.
 Implementing the validation options provided by the proto file. So, giving the developer the option to specify
 validations for each parameter and Conduit will make sure to run the validations.
 
-Validating the type of the parameter, we have 6 types supported in the proto design {string, int, float, bool, file, and duration}
+Validating the type of the parameter, we have 6 types supported in the proto design {string, int, float, bool, file, and
+duration}.
 
-Validate that the config doesn't contain a parameter that is not defined in the specifications, which will help detect
+Validate that the config doesn't contain a parameter that is not defined in the specifications, which will help detect
 a typo in a pipeline configuration file, or an extra configuration that does not exist.
 
-Providing a utility function to generate the config map for the `Parameters` function from a config struct. This is not
-mandatory for the scope of this feature, but it would be a nice to have and would make the developing experience for connectors easier.
+Providing a utility function to generate the config map for the `Parameters` function from a config struct. This is not
+mandatory for the scope of this feature, but it would be nice to have and would make the development experience for
+connectors easier.
 
-## Questions:
+## Questions
 
 **Q**: Should we implement the validations from the SDK or Conduit side?
 
 **A**: If we implement it from Conduit side, this means we can add more validations in the future without changing the SDK.
-However, implementing it from the SDK side means that all the validations will happen on the connector side, using the
-Config function, and the developer will still have the ability to add custom validations for some specific cases that
+However, implementing it from the SDK side means that all the validations will happen on the connector side, using the
+Config function, and the developer will still have the ability to add custom validations for some specific cases that
 the SDK wouldn't cover. So we decided on implementing validating from the SDK side.
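To make the split concrete, here is a rough sketch of how declared (built-in) validations and connector-specific checks
could be layered when a connector is configured. The `Parameter` type, field names, and the snapshot/table rule below are
illustrative placeholders only, not the actual connector SDK API.

```go
package example

import (
	"fmt"
	"strconv"
)

// Parameter is an illustrative stand-in for a declared parameter
// specification (not the actual SDK type).
type Parameter struct {
	Required bool
	LessThan *int // optional upper bound for integer parameters
}

// runDeclaredValidations applies the validations declared in the parameter
// specifications to the raw configuration map.
func runDeclaredValidations(params map[string]Parameter, cfg map[string]string) error {
	for name, p := range params {
		raw, ok := cfg[name]
		if p.Required && (!ok || raw == "") {
			return fmt.Errorf("parameter %q is required", name)
		}
		if ok && p.LessThan != nil {
			v, err := strconv.Atoi(raw)
			if err != nil {
				return fmt.Errorf("parameter %q must be an int: %w", name, err)
			}
			if v >= *p.LessThan {
				return fmt.Errorf("parameter %q must be less than %d", name, *p.LessThan)
			}
		}
	}
	// Reject parameters that were not declared at all; this is what catches
	// typos in pipeline configuration files.
	for name := range cfg {
		if _, ok := params[name]; !ok {
			return fmt.Errorf("unknown parameter %q", name)
		}
	}
	return nil
}

// Configure shows how a connector could run the declared validations first
// and then add custom checks that the generic validations cannot express.
func Configure(params map[string]Parameter, cfg map[string]string) error {
	if err := runDeclaredValidations(params, cfg); err != nil {
		return err
	}
	// Connector-specific rule (hypothetical): "table" is mandatory in snapshot mode.
	if cfg["mode"] == "snapshot" && cfg["table"] == "" {
		return fmt.Errorf(`"table" must be set when "mode" is "snapshot"`)
	}
	return nil
}
```

The declared validations cover the generic rules (required parameters, types, ranges, unknown keys), while anything they
cannot express stays in the connector's own code.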
**Q**: Should generating the Configurations from a Go struct be in the scope of this feature? @@ -112,7 +114,7 @@ would be super helpful for developers and easier to use, and will continue addin **Q**: How should the UI execute the validations? -**A**: The UI will execute the builtlin validations for each configuration while the user is typing (validations +**A**: The UI will execute the builtlin validations for each configuration while the user is typing (validations are in the same struct that has the list of the fields). When the user submits the form, the UI will try and create the connector, which will execute both the builtin and the custom validation. Finally, the UI will show an error for the user if an error occurs while creating the connector. @@ -120,35 +122,36 @@ error for the user if an error occurs while creating the connector. ## Implementation Implementing this feature consists of four main steps: -1. Adjust the connector protocol by adding validations for parameters, this change should be done in a backwards + +1. Adjust the connector protocol by adding validations for parameters, this change should be done in a backwards compatible way, so the old `required` field needs to be parsed into a validation. - 2. Adjust the connector SDK to give developers the ability to specify validations needed for each parameter. (manually) - -Params should look something like: - -```go -SourceParams: []sdk.Parameter{ - { - Name: "param", - Type: sdk.ParameterTypeInt, - Validations: []sdk.Validation{ - sdk.ValidationRequired{}, - sdk.ValidationLessThan{Value:8}, - } - } - } -``` + + Params should look something like: + + ```go + sourceParams: []sdk.Parameter{ + { + Name: "param", + Type: sdk.ParameterTypeInt, + Validations: []sdk.Validation{ + sdk.ValidationRequired{}, + sdk.ValidationLessThan{Value:8}, + }, + } + } + ``` 3. Provide a function that takes the parameters' validations and validates them in the `Configure` function on the SDK. -4. Generate connector configurations from a Go struct, which will give the ability to generate the connector's - configurations from a Go struct, the struct would have field tags that specify validations, default value, and if +4. Generate connector configurations from a Go struct, which will give the ability to generate the connector's + configurations from a Go struct, the struct would have field tags that specify validations, default value, and if a parameter is required. - example: -```go -type Config struct { - param1 string `validate:"greater-than:0" required:"true"` - param2 string `validate:"less-than:100" default:"10"` -} -``` \ No newline at end of file + Example: + + ```go + type Config struct { + param1 string `validate:"greater-than:0" required:"true"` + param2 string `validate:"less-than:100" default:"10"` + } + ``` diff --git a/docs/health_check.md b/docs/health_check.md index 873f60904..1b7a3e628 100644 --- a/docs/health_check.md +++ b/docs/health_check.md @@ -1,7 +1,7 @@ # Health check -Conduit’s health check can be used to determine if Conduit is running correctly. What it does is to check if Conduit -can successfully connect to the database which it was configured with (which can be BadgerDB, PostgreSQL or the +Conduit’s health check can be used to determine if Conduit is running correctly. What it does is to check if Conduit +can successfully connect to the database which it was configured with (which can be BadgerDB, PostgreSQL or the in-memory one). The health check is available at the `/healthz` path. 
Here’s an example: ```bash @@ -16,5 +16,5 @@ $ curl "http://localhost:8080/healthz?service=PipelineService" {"status":"SERVING"} ``` -The services which can be checked for health are: `PipelineService`, `ConnectorService`, `ProcessorService`, and +The services which can be checked for health are: `PipelineService`, `ConnectorService`, `ProcessorService`, and `PluginService`. diff --git a/docs/metrics.md b/docs/metrics.md index 4c16cbec6..a428219f1 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -14,9 +14,9 @@ locally, you can get metrics if you run `curl localhost:8080/metrics`. ## Available metrics -* **Conduit metrics**: We currently have a number of high level pipeline, +- **Conduit metrics**: We currently have a number of high level pipeline, processor and connector metrics, all of which are defined - in [measure.go](https://github.com/ConduitIO/conduit/blob/main/pkg/foundation/metrics/measure/measure.go). + in [`measure.go`](https://github.com/ConduitIO/conduit/blob/main/pkg/foundation/metrics/measure/measure.go). Those are: | Pipeline name | Type | Description | @@ -34,19 +34,19 @@ locally, you can get metrics if you run `curl localhost:8080/metrics`. \*We calculate bytes based on the JSON representation of the record payload and key. -* **Go runtime metrics**: The default metrics exposed by Prometheus' official Go +- **Go runtime metrics**: The default metrics exposed by Prometheus' official Go package, [client_golang](https://pkg.go.dev/github.com/prometheus/client_golang). -* **gRPC metrics**: The gRPC instrumentation package we use +- **gRPC metrics**: The gRPC instrumentation package we use is [promgrpc](https://github.com/piotrkowalczuk/promgrpc). The metrics exposed are listed [here](https://github.com/piotrkowalczuk/promgrpc#metrics). -* **HTTP API metrics**: We +- **HTTP API metrics**: We use [promhttp](https://pkg.go.dev/github.com/prometheus/client_golang/prometheus/promhttp), Prometheus' official package for instrumentation of HTTP servers. ## Adding new metrics Currently, we have a number of metric types already defined -in [metrics.go](https://github.com/ConduitIO/conduit/blob/main/pkg/pipeline/stream/metrics.go). +in [`metrics.go`](https://github.com/ConduitIO/conduit/blob/main/pkg/pipeline/stream/metrics.go). Those are: counter, gauge, timer and histogram and their "labeled" versions too. A labeled metric is one where labels must be set before usage. In many cases, the already present metric types should be sufficient. @@ -58,7 +58,7 @@ counter and increase the counter in source nodes, each time a message is read. ### Create a new labeled counter To do so, add the following code -to [measure.go](https://github.com/ConduitIO/conduit/blob/main/pkg/foundation/metrics/measure/measure.go). +to [`measure.go`](https://github.com/ConduitIO/conduit/blob/main/pkg/foundation/metrics/measure/measure.go). ```go PipelineMsgMetrics = metrics.NewLabeledCounter( @@ -70,9 +70,9 @@ PipelineMsgMetrics = metrics.NewLabeledCounter( The labeled counter created here: -* has the name `conduit_pipeline_msg_counter`, -* has the description `Number of messages per pipeline.`, -* accepts a `pipeline_name` label. +- has the name `conduit_pipeline_msg_counter`, +- has the description `Number of messages per pipeline.`, +- accepts a `pipeline_name` label. ### Instantiate a counter with a label @@ -83,7 +83,7 @@ in our case). 
In other words, for each pipeline we will have a separate counter (for which the `pipeline_name` label is set to the pipeline name). To do so, when building a source node -in [lifecycle.go](https://github.com/ConduitIO/conduit/blob/main/pkg/pipeline/lifecycle.go), +in [`lifecycle.go`](https://github.com/ConduitIO/conduit/blob/main/pkg/pipeline/lifecycle.go), we can add the following: ```go @@ -111,7 +111,7 @@ Assuming you have a pipeline running locally, you can execute `curl -Ss localhost:8080/metrics | grep conduit_pipeline_msg_counter` to check your newly created metrics. You will see something along the lines of: -``` +```text # HELP conduit_pipeline_msg_counter Number of messages per pipeline. # TYPE conduit_pipeline_msg_counter counter conduit_pipeline_msg_counter{pipeline_name="my-new-pipeline"} 84 diff --git a/docs/pipeline_configuration_files.md b/docs/pipeline_configuration_files.md index d58d41554..9565d736f 100644 --- a/docs/pipeline_configuration_files.md +++ b/docs/pipeline_configuration_files.md @@ -1,24 +1,31 @@ -## Pipeline Configuration Files +# Pipeline Configuration Files + Pipeline configuration files give you the ability to define pipelines that are provisioned by Conduit at startup. It's as simple as creating a YAML file that defines pipelines, connectors, processors, and their corresponding configurations. ## Getting started -Create a folder called `pipelines` at the same level as your Conduit binary file, add all your YAML files -there, then run Conduit using the command: -``` + +Create a folder called `pipelines` at the same level as your Conduit binary file, add all your YAML files +there, then run Conduit using the command: + +```sh ./conduit ``` -Conduit will only search for files with `.yml` or `.yaml` extensions, recursively in all sub-folders. -If you have your YAML files in a different directory, or want to provision only one file, then simply run Conduit with +Conduit will only search for files with `.yml` or `.yaml` extensions, recursively in all sub-folders. + +If you have your YAML files in a different directory, or want to provision only one file, then simply run Conduit with the CLI flag `pipelines.path` and point to your file or directory: -``` + +```sh ./conduit -pipeline.path ../my-directory ``` + If your directory does not exist, Conduit will fail with an error: `"pipelines.path" config value is invalid` ### YAML Schema -The file in general has two root keys, the `version`, and the `pipelines` map. The map consists of other elements like + +The file in general has two root keys, the `version`, and the `pipelines` map. The map consists of other elements like `status` and `name`, which are configurations for the pipeline itself. To create connectors in that pipeline, simply add another map under the pipeline map, and call it `connectors`. @@ -26,7 +33,7 @@ To create connectors in that pipeline, simply add another map under the pipeline To create processors, either add a `processors` map under a pipeline ID, or under a connector ID, depending on its parent. Check this YAML file example with explanation for each field: -``` yaml +```yaml version: 1.0 # parser version, the only supported version for now is 1.0 [mandatory] pipelines: # a map of pipelines IDs and their configurations. 
@@ -62,32 +69,32 @@ pipelines: # a map of pipelines IDs and their configuration If the file is invalid (missed a mandatory field, or has an invalid configuration value), then the pipeline that has the invalid value will be skipped, with an error message logged. -If two pipelines in one file have the same ID, or the `version` field was not specified, then the file would be +If two pipelines in one file have the same ID, or the `version` field was not specified, then the file would be non-parsable and will be skipped with an error message logged. If two pipelines from different files have the same ID, the second pipeline will be skipped, with an error message specifying which pipeline was not provisioned. -**_Note_**: Connector IDs and processor IDs will get their parent ID prefixed, so if you specify a connector ID as `con1` -and its parent is `pipeline1`, then the provisioned connector will have the ID `pipeline1:con1`. Same goes for processors, -if the processor has a pipeline parent, then the processor ID will be `connectorID:processorID`, and if a processor -has a connector parent, then the processor ID will be `pipelineID:connectorID:processorID`. +**_Note_**: Connector IDs and processor IDs will get their parent ID prefixed, so if you specify a connector ID as `con1` +and its parent is `pipeline1`, then the provisioned connector will have the ID `pipeline1:con1`. Same goes for processors, +if the processor has a pipeline parent, then the processor ID will be `connectorID:processorID`, and if a processor +has a connector parent, then the processor ID will be `pipelineID:connectorID:processorID`. ## Pipelines Immutability -Pipelines provisioned by configuration files are **immutable**, any updates needed on a provisioned pipeline have to be -done through the configuration file it was provisioned from. You can only control stopping and starting a pipeline + +Pipelines provisioned by configuration files are **immutable**, any updates needed on a provisioned pipeline have to be +done through the configuration file it was provisioned from. You can only control stopping and starting a pipeline through the UI or API. ### Updates and Deletes + Updates and deletes for a pipeline provisioned by configuration files can only be done through the configuration files. Changes should be made to the files, then Conduit has to be restarted to reload the changes. Any updates or deletes done through the API or UI will be prohibited. -* To delete a pipeline: simply delete it from the `pipelines` map from the configuration file, then run conduit again. -* To update a pipeline: change any field value from the configuration file, and run conduit again to address these updates. +- To delete a pipeline: simply delete it from the `pipelines` map from the configuration file, then run Conduit again. +- To update a pipeline: change any field value from the configuration file, and run Conduit again to address these updates. Updates will preserve the status of the pipeline, and will continue working from where it stopped. However, the pipeline will start from the beginning of the source and will not continue from where it stopped, if one of these values were updated: {`pipeline ID`, `connector ID`, `connector plugin`, `connector type`}. - - diff --git a/docs/pipeline_semantics.md b/docs/pipeline_semantics.md index 04b1e3503..de14c1114 100644 --- a/docs/pipeline_semantics.md +++ b/docs/pipeline_semantics.md @@ -28,12 +28,12 @@ In the diagram above we see 7 sections: sends them into one output channel. 
The order of messages coming from different connectors is nondeterministic. A fan-in node is automatically created for all pipelines. - **Pipeline processors** - these processors receive all messages that flow through the pipeline, regardless of the - source or destination. Pipeline processors are created by specifying the pipeline as the parent entity. Pipeline processors + source or destination. Pipeline processors are created by specifying the pipeline as the parent entity. Pipeline processors are not required for starting a pipeline. - **Fan-out node** - this node is the counterpart to the fan-in node and acts as a demultiplexer that sends messages coming from a single input to multiple downstream nodes (one for each destination). The fan-out node does not buffer messages, instead, it waits for a message to be sent to all downstream nodes before processing the next message (see - [backpressure](#Backpressure)). A fan-out node is automatically created for all pipelines. + [backpressure](#backpressure)). A fan-out node is automatically created for all pipelines. - **Destination processors** - these processors receive only messages that are meant to be sent to a specific destination connector. Destination processors are created by specifying the corresponding destination connector as the parent entity. Destination processors are not required for starting a pipeline. @@ -54,7 +54,7 @@ are created in source nodes when they receive records from the source connector, between nodes until they are acked or nacked. Nodes are only allowed to hold a reference to a single message at a time, meaning that they need to pass the message to the next node before taking another message¹. This also means there is no explicit buffer in Conduit, a pipeline can only hold only as many messages as there are nodes in the pipeline (see -[backpressure](#Backpressure) for more information). +[backpressure](#backpressure) for more information). ¹This might change in the future if we decide to add support for multi-message transforms. @@ -72,7 +72,7 @@ A message can be in one of 3 states: - **Nacked** - the processing of the message failed and resulted in an error, so the message was negatively acknowledged. This can be done either by a processor (e.g. a transform failed) or by a destination. If a pipeline contains multiple destinations, the message needs to be negatively acknowledged by at least one destination before it - is marked as nacked. When a message is nacked, the message is passed to the [DLQ](#Dead-letter-queue) handler, which + is marked as nacked. When a message is nacked, the message is passed to the [DLQ](#dead-letter-queue) handler, which essentially controls what happens after a message is nacked (stop pipeline, drop message and continue running or store message in DLQ and continue running). @@ -119,7 +119,7 @@ multiple destinations that stopped because of a negatively acknowledged record, negatively acknowledged a record and processed more messages after that. For this reason, we strongly recommend implementing the write operation of a destination connector in an idempotent way (if possible). -The delivery guarantee can be changed to "at most once" by adding a [dead letter queue](#Dead-letter-queue) handler that +The delivery guarantee can be changed to "at most once" by adding a [dead letter queue](#dead-letter-queue) handler that drops unsuccessfully processed messages. 
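As a hedged illustration of the idempotent-write recommendation above, the sketch below upserts by record key, so a
record that is redelivered after a restart overwrites the same entry instead of creating a duplicate. The `Record` and
`Store` types are simplified stand-ins, not the actual connector SDK interfaces.

```go
package example

import "context"

// Record is a simplified stand-in for a pipeline record.
type Record struct {
	Key     string
	Payload []byte
}

// Store is a hypothetical destination system that supports upserts.
type Store interface {
	Upsert(ctx context.Context, key string, value []byte) error
}

// Write is idempotent: it upserts by record key, so redelivered records
// overwrite the same entry instead of creating duplicates.
func Write(ctx context.Context, store Store, records []Record) error {
	for _, r := range records {
		if err := store.Upsert(ctx, r.Key, r.Payload); err != nil {
			// Unwritten records will be redelivered and upserted again.
			return err
		}
	}
	return nil
}
```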
### Acks are delivered in order diff --git a/docs/processors.md b/docs/processors.md index a30a416c3..b4f442697 100644 --- a/docs/processors.md +++ b/docs/processors.md @@ -1,15 +1,16 @@ -## Processors +# Processors A processor is a component that operates on a single record that flows through a pipeline. It can either change the record -(i.e. **transform** it) or **filter** it out based on some criteria. Since they are part of pipelines, making yourself +(i.e. **transform** it) or **filter** it out based on some criteria. Since they are part of pipelines, making yourself familiar with [pipeline semantics](/docs/pipeline_semantics.md) is highly recommended. ![Pipeline](data/pipeline_example.svg) -Processors are **optional** components in a pipeline, i.e. a pipeline can be started without them. They are always attached -to a single parent, which can be either a connector or a pipeline. With that, we can say that we have the following types +Processors are **optional** components in a pipeline, i.e. a pipeline can be started without them. They are always attached +to a single parent, which can be either a connector or a pipeline. With that, we can say that we have the following types of processors: -1. **Source processors**: these processors only receive messages originating at a specific source connector. Source + +1. **Source processors**: these processors only receive messages originating at a specific source connector. Source processors are created by specifying the corresponding source connector as the parent entity. 2. **Pipeline processors**: these processors receive all messages that flow through the pipeline, regardless of the source or destination. Pipeline processors are created by specifying the pipeline as the parent entity. @@ -17,18 +18,19 @@ of processors: destination connector. Destination processors are created by specifying the corresponding destination connector as the parent entity. -Given that every processor can have one (and only one) parent, processors cannot be shared. In case the same processing +Given that every processor can have one (and only one) parent, processors cannot be shared. In case the same processing needs to happen for different sources or destinations, you have two options: + 1. If records from all sources (or all destinations) need to be processed in the same way, then you can create a pipeline processor -2. If records from some, but not all, sources (or destinations) need to be processed in the same way, then you need to +2. If records from some, but not all, sources (or destinations) need to be processed in the same way, then you need to create multiple processors (one for each source or destination) and configure them in the same way. ## Adding and configuring a processor Processors are created through the `/processors` endpoint. Here's an example: -```json lines +```json POST /v1/processors { // name of the processor in Conduit @@ -51,27 +53,28 @@ POST /v1/processors } } ``` + The request to create a processor is described in [api.swagger.json](/pkg/web/openapi/swagger-ui/api/v1/api.swagger.json). ## Supported processors -Conduit provides a number of built-in processors, such as filtering fields, replacing them, posting payloads to HTTP endpoints etc. -Conduit also provides the ability to write custom processors in JavaScript. +Conduit provides a number of built-in processors, such as filtering fields, replacing them, posting payloads to HTTP +endpoints etc. Conduit also provides the ability to write custom processors in JavaScript. 
### Built-in processors An up-to-date list of all built-in processors and detailed descriptions can be found [here](https://pkg.go.dev/github.com/conduitio/conduit/pkg/processor/procbuiltin). -Note that all built-in processors that operate on the payload actually operate on `Record.Payload.After`. If you need to manipulate -the field `Record.Payload.Before` you can use a [JavaScript processor](#javascript-processors). +Note that all built-in processors that operate on the payload actually operate on `Record.Payload.After`. If you need to +manipulate the field `Record.Payload.Before` you can use a [JavaScript processor](#javascript-processors). An example is available in [extract-field-transform.sh](/examples/processors/extract-field-transform.sh). The script will set up a pipeline with the built-in extract-field processors. ### JavaScript processors -JavaScript processors make it possible to write custom processors in JavaScript. The API name for JavaScript processors -(used in the request to create a processor) is `js`. There's only one configuration parameter, `script`, which is the -script itself. To find out what's possible with the JS processors, also refer to the documentation for [goja](https://github.com/dop251/goja), +JavaScript processors make it possible to write custom processors in JavaScript. The API name for JavaScript processors +(used in the request to create a processor) is `js`. There's only one configuration parameter, `script`, which is the +script itself. To find out what's possible with the JS processors, also refer to the documentation for [goja](https://github.com/dop251/goja), which is the JavaScript engine we use. Here's an example of a request payload to create a JavaScript processor: @@ -90,8 +93,10 @@ Here's an example of a request payload to create a JavaScript processor: } } ``` -The above will create a JavaScript processor (`"name": "js"`), attached to a connector (for the parent, we have + +The above will create a JavaScript processor (`"name": "js"`), attached to a connector (for the parent, we have `"type": "TYPE_CONNECTOR"`). The script used is: + ```javascript function process(record) { record.Metadata["foo-key"] = "foo-value"; @@ -100,11 +105,13 @@ function process(record) { ``` The script needs to define a function called `process`, which accepts an `sdk.Record`, and returns: -* an `sdk.Record`, in case you want to transform the record, -* `null`, in case you want to drop the record from the pipeline. -The above example request transforms a record, by "enriching" its metadata (it adds a metadata key). Following is an +- an `sdk.Record`, in case you want to transform the record, +- `null`, in case you want to drop the record from the pipeline. + +The above example request transforms a record, by "enriching" its metadata (it adds a metadata key). Following is an example where we also filter records: + ```javascript function process(r) { // if the record metadata has a "keepme" key set @@ -118,6 +125,7 @@ function process(r) { ``` The script is not constrained to having only this function, i.e. you can have something like this: + ```javascript function doSomething(record) { // do something with the record @@ -131,14 +139,16 @@ function process(record) { ``` Conduit also provides a number of helper objects and methods which can be used in the JS code. Those are, currently: -1. `logger` - a `zerolog.Logger` which writes to the Conduit server logs. You can use it in the same way you would use + +1. 
`logger` - a `zerolog.Logger` which writes to the Conduit server logs. You can use it in the same way you would use it in Go code, i.e. you can write this for example: `logger.Info().Msgf("hello, %v!", "world")` 2. `Record()` - constructs a `record.Record`. 3. `RawData()` - constructs `record.RawData`. 4. `StructuredData()` - constructs `record.StructuredData`. -Following is an example of a JavaScript processor, where we transform a record and utilize a number of tools mentioned +Following is an example of a JavaScript processor, where we transform a record and utilize a number of tools mentioned above: + ```javascript // Parses the record payload as JSON function parseAsJSON(record) { @@ -165,13 +175,14 @@ function process(record) { ``` ## Inspecting a processor -Records entering a processor and the resulting records can be inspected using -[Conduit's stream inspector](/docs/design-documents/20221024-stream-inspector.md). This makes it possible to "debug" a + +Records entering a processor and the resulting records can be inspected using +[Conduit's stream inspector](/docs/design-documents/20221024-stream-inspector.md). This makes it possible to "debug" a processor. -The records are made available via a WebSocket connection. To inspect input records, the following endpoint needs to be -used: `ws://host:port/v1/processors/{processor-id}/inspect-in`. Similarly, to inspect output records, the following +The records are made available via a WebSocket connection. To inspect input records, the following endpoint needs to be +used: `ws://host:port/v1/processors/{processor-id}/inspect-in`. Similarly, to inspect output records, the following endpoint needs to be used: `ws://host:port/v1/processors/{processor-id}/inspect-out`. -For example, if you're running Conduit locally with the default settings, and have a processor called `format-lines`, +For example, if you're running Conduit locally with the default settings, and have a processor called `format-lines`, then you would use the following endpoint: `ws://localhost:8080/v1/processors/format-lines/inspect-out`. diff --git a/docs/releases.md b/docs/releases.md index 9f86e0389..8b6ab8105 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -1,34 +1,43 @@ -### General information +# Releases + +## General information + A Conduit release has the following parts: -* a GitHub release, which further includes - * packages for different operating systems and architectures - * a file with checksums for the packages - * a changelog - * the source code -* a GitHub package, which is the official Docker image for Conduit. It's available on GitHub's Container Registry. The + +- a GitHub release, which further includes + - packages for different operating systems and architectures + - a file with checksums for the packages + - a changelog + - the source code +- a GitHub package, which is the official Docker image for Conduit. It's available on GitHub's Container Registry. The latest Docker image which is not a nightly is tagged with `latest`. -### How to release a new version -A release is triggered by pushing a new tag which starts with `v` (for example `v1.2.3`). Everything else is then handled by -GoReleaser and GitHub actions. To push a new tag, please use the script [scripts/tag.sh](https://github.com/ConduitIO/conduit/blob/main/scripts/tag.sh), +## How to release a new version + +A release is triggered by pushing a new tag which starts with `v` (for example `v1.2.3`). Everything else is then +handled by GoReleaser and GitHub actions. 
To push a new tag, please use the script [scripts/tag.sh](https://github.com/ConduitIO/conduit/blob/main/scripts/tag.sh), which also checks if the version conforms to SemVer. Example: -``` + +```sh scripts/tag.sh 1.2.3 ``` -### Nightly builds +## Nightly builds + We provide nightly builds (binaries and Docker images) and keep them for 7 days. The latest nightly Docker image is tagged with `latest-nightly`. -### Implementation -The GitHub release is created with [GoReleaser](https://github.com/goreleaser/goreleaser/). GoReleaser _can_ build Docker images, -but we're building those "ourselves" (using Docker's official GitHub actions), since GoReleaser doesn't work with multi-stage -Docker builds. +## Implementation + +The GitHub release is created with [GoReleaser](https://github.com/goreleaser/goreleaser/). GoReleaser _can_ build +Docker images, but we're building those "ourselves" (using Docker's official GitHub actions), since GoReleaser doesn't +work with multi-stage Docker builds. Nightly builds are created in the same way, it's only the triggering which is different. Namely, we have a GitHub action (defined in [trigger-nightly.yml](/.github/workflows/trigger-nightly.yml)) which is creating nightly tags once in 24 hours. -A new nightly tag then triggers a new release. The mentioned GitHub action also cleans up older tags, releases and Docker images. +A new nightly tag then triggers a new release. The mentioned GitHub action also cleans up older tags, releases and +Docker images. -The "Trigger nightly build" GH action requires a personal access token, and _not_ a GitHub token provided by Actions. The +The "Trigger nightly build" GH action requires a personal access token, and _not_ a GitHub token provided by Actions. The reason is that a workflow which produces an event using a GitHub token cannot trigger another workflow through that event. For more information, please check [Triggering a workflow from a workflow](https://docs.github.com/en/actions/using-workflows/triggering-a-workflow#triggering-a-workflow-from-a-workflow). diff --git a/githooks/pre-push b/githooks/pre-push index 66334c989..7f679f315 100755 --- a/githooks/pre-push +++ b/githooks/pre-push @@ -2,4 +2,5 @@ EXIT_STATUS=0 make test || EXIT_STATUS=$? golangci-lint run || EXIT_STATUS=$? +make markdown-lint || EXIT_STATUS=$? exit $EXIT_STATUS diff --git a/proto/README.md b/proto/README.md index d6b8e70cf..2dcfcbfcb 100644 --- a/proto/README.md +++ b/proto/README.md @@ -6,16 +6,18 @@ the [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway). ## Client code -The client code for Conduit's API is available remotely generated via -[Buf's Remote Generation](https://docs.buf.build/bsr/remote-generation/overview). Remote code generation is triggered +The client code for Conduit's API is available remotely generated via +[Buf's Remote Generation](https://docs.buf.build/bsr/remote-generation/overview). Remote code generation is triggered via a GitHub workflow defined [here](/.github/workflows/buf.yml). 
To use the client code, firstly run: + ```shell go get go.buf.build/conduitio/conduit/conduitio/conduit ``` Here's an example usage of Conduit's client code: + ```go package main diff --git a/ui/README.md b/ui/README.md index 22db3e40e..1c31ef92c 100644 --- a/ui/README.md +++ b/ui/README.md @@ -4,61 +4,66 @@ Conduit UI is the web front-end for Conduit built with [Ember](https://emberjs.c ## Architecture -The UI application is a standard Ember application that is simply rooted in the `ui` directory of the Conduit project. The UI can then be built and embedded into Conduit's binary during a Conduit build. +The UI application is a standard Ember application that is simply rooted in the `ui` directory of the Conduit project. +The UI can then be built and embedded into Conduit's binary during a Conduit build. ## Prerequisites You will need the following things properly installed on your computer. -* [Git](https://git-scm.com/) -* [Node.js](https://nodejs.org/) -* [Yarn](https://yarnpkg.com/) -* [Ember CLI](https://ember-cli.com/) +- [Git](https://git-scm.com/) +- [Node.js](https://nodejs.org/) +- [Yarn](https://yarnpkg.com/) +- [Ember CLI](https://ember-cli.com/) ## Installation -* `git clone git@github.com:ConduitIO/conduit.git` the conduit repository -* `cd conduit` -* `make ui-dependencies` +- `git clone git@github.com:ConduitIO/conduit.git` the Conduit repository +- `cd conduit` +- `make ui-dependencies` -_Note:_ Commands in this readme are run from the Conduit project root. Alternatively, you can change into the `ui/` directory to directly use Yarn, [Ember CLI](https://ember-cli.com/), or non-prefixed [UI Makefile](Makefile) commands +_Note:_ Commands in this readme are run from the Conduit project root. Alternatively, you can change into the `ui/` +directory to directly use Yarn, [Ember CLI](https://ember-cli.com/), or non-prefixed [UI Makefile](Makefile) commands. ## Running / Development Before running Conduit UI, you must make sure the Conduit server is running -* `make run` +- `make run` Alternatively, if you'd like to develop the UI against the Conduit server binary -* `make build` -* `./conduit` to run the built binary server +- `make build` +- `./conduit` to run the built binary server After confirming that Conduit server is running locally, you can now run the UI -* `make ui-server ` -* Visit your app at [http://localhost:4200](http://localhost:4200). +- `make ui-server` +- Visit your app at [http://localhost:4200](http://localhost:4200). ### Running Tests -* `make ui-test` +- `make ui-test` ### Linting -* `make ui-lint` -* `make ui-lint-fix` + +- `make ui-lint` +- `make ui-lint-fix` ### Building Conduit UI -Conduit UI is built and embedded into the server's binary using [go embed directives](https://pkg.go.dev/embed). To build the binary with the embedded UI +Conduit UI is built and embedded into the server's binary using [Go embed directives](https://pkg.go.dev/embed). To +build the binary with the embedded UI -* `make build-with-ui` +- `make build-with-ui` -This will build the production UI asset bundle, output it to `pkg/web/ui/dist`, and build the server binary embedded with the bundle. +This will build the production UI asset bundle, output it to `pkg/web/ui/dist`, and build the server binary embedded +with the bundle. 
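For illustration, here is a minimal sketch of the embedding mechanism. The package name and embed path are placeholders
and do not mirror the actual Conduit source; it only assumes the built asset bundle sits in a local `dist` directory at
compile time.

```go
package ui

import (
	"embed"
	"io/fs"
	"net/http"
)

// The embed path is illustrative; it assumes the production bundle was
// built into a local "dist" directory before compiling the binary.
//
//go:embed dist
var assets embed.FS

// Handler serves the embedded UI bundle over HTTP.
func Handler() (http.Handler, error) {
	sub, err := fs.Sub(assets, "dist")
	if err != nil {
		return nil, err
	}
	return http.FileServer(http.FS(sub)), nil
}
```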
## Further Reading / Useful Links -* [ember.js](https://emberjs.com/) -* [ember-cli](https://ember-cli.com/) -* Development Browser Extensions - * [ember inspector for chrome](https://chrome.google.com/webstore/detail/ember-inspector/bmdblncegkenkacieihfhpjfppoconhi) - * [ember inspector for firefox](https://addons.mozilla.org/en-US/firefox/addon/ember-inspector/) +- [ember.js](https://emberjs.com/) +- [ember-cli](https://ember-cli.com/) +- Development Browser Extensions + - [ember inspector for chrome](https://chrome.google.com/webstore/detail/ember-inspector/bmdblncegkenkacieihfhpjfppoconhi) + - [ember inspector for firefox](https://addons.mozilla.org/en-US/firefox/addon/ember-inspector/)