Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Processor plugin #1308

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Once you have chosen a connector to be built-in, you can:

- Download the new package and its dependencies: `go get "github.com/foo/conduit-connector-new"`
- Import the Go module defining the connector
into the [builtin registry](https://github.com/ConduitIO/conduit/blob/main/pkg/plugin/builtin/registry.go)
into the [builtin registry](https://github.com/ConduitIO/conduit/blob/main/pkg/plugin/connector/builtin/registry.go)
and add a new key to `DefaultDispenserFactories`:

```diff
Expand Down
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
module github.com/conduitio/conduit

go 1.21

toolchain go1.21.1
go 1.21.1

require (
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.31.0-20231027202514-3f42134f4c56.2
github.com/Masterminds/semver/v3 v3.2.1
github.com/NYTimes/gziphandler v1.1.1
github.com/antchfx/jsonquery v1.3.3
github.com/bufbuild/buf v1.28.1
github.com/conduitio/conduit-commons v0.0.0-20231205181721-bef91d55116c
github.com/conduitio/conduit-connector-file v0.6.0
github.com/conduitio/conduit-connector-generator v0.5.0
github.com/conduitio/conduit-connector-kafka v0.7.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,8 @@ github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWH
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/conduitio/conduit-commons v0.0.0-20231205181721-bef91d55116c h1:iggz0i/MMCHo7FINL01hDUgRhDCLrcSMzIqaiaRCNmg=
github.com/conduitio/conduit-commons v0.0.0-20231205181721-bef91d55116c/go.mod h1:d4Q4/ezToUpMs8d4hk5z3XkcIV0BTrEpEHFecdP0kLs=
github.com/conduitio/conduit-connector-file v0.6.0 h1:8tsGeGhKvFwYQZztOOL5/tmOhVShsfo9lQ3b/0fX8kQ=
github.com/conduitio/conduit-connector-file v0.6.0/go.mod h1:ju7PiB4kTJgqng4KVXDt/Gvw/53kFwSzi5Ez9EDXxNI=
github.com/conduitio/conduit-connector-generator v0.5.0 h1:zpXHif89DCJ13nftKLv31uI2AJGicpY5H1V7SwldRNo=
Expand Down
2 changes: 1 addition & 1 deletion pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/database"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/builtin"
"github.com/conduitio/conduit/pkg/plugin/connector/builtin"
"github.com/conduitio/conduit/pkg/processor"
"github.com/rs/zerolog"
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ import (
"github.com/conduitio/conduit/pkg/orchestrator"
"github.com/conduitio/conduit/pkg/pipeline"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/plugin/builtin"
"github.com/conduitio/conduit/pkg/plugin/standalone"
"github.com/conduitio/conduit/pkg/plugin/connector/builtin"
"github.com/conduitio/conduit/pkg/plugin/connector/standalone"
"github.com/conduitio/conduit/pkg/processor"
"github.com/conduitio/conduit/pkg/provisioning"
"github.com/conduitio/conduit/pkg/web/api"
Expand Down
7 changes: 4 additions & 3 deletions pkg/connector/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
"github.com/conduitio/conduit/pkg/record"
)

type Destination struct {
Instance *Instance

dispenser plugin.Dispenser
plugin plugin.DestinationPlugin
dispenser connectorPlugin.Dispenser
plugin connectorPlugin.DestinationPlugin

// errs is used to signal the node that the connector experienced an error
// when it was processing something asynchronously (e.g. persisting state).
Expand All @@ -37,7 +38,7 @@ type Destination struct {
// stopStream is a function that closes the context of the stream
stopStream context.CancelFunc

// wg tracks the number of in flight calls to the plugin.
// wg tracks the number of in flight calls to the connectorPlugin.
wg sync.WaitGroup
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/database/inmemory"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/plugin/mock"
"github.com/conduitio/conduit/pkg/plugin/connector/mock"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/connector/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/inspector"
"github.com/conduitio/conduit/pkg/plugin"
connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
)

const (
Expand Down Expand Up @@ -86,7 +86,7 @@ type Connector interface {

// PluginDispenserFetcher can fetch a plugin dispenser.
type PluginDispenserFetcher interface {
NewDispenser(logger log.CtxLogger, name string) (plugin.Dispenser, error)
NewDispenser(logger log.CtxLogger, name string) (connectorPlugin.Dispenser, error)
}

func (i *Instance) Init(logger log.CtxLogger, persister *Persister) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/connector/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ package connector
import (
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
)

// fakePluginFetcher fulfills the PluginFetcher interface.
type fakePluginFetcher map[string]plugin.Dispenser
type fakePluginFetcher map[string]connectorPlugin.Dispenser

func (fpf fakePluginFetcher) NewDispenser(_ log.CtxLogger, name string) (plugin.Dispenser, error) {
func (fpf fakePluginFetcher) NewDispenser(_ log.CtxLogger, name string) (connectorPlugin.Dispenser, error) {
plug, ok := fpf[name]
if !ok {
return nil, plugin.ErrPluginNotFound
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/database/inmemory"
"github.com/conduitio/conduit/pkg/foundation/database/mock"
"github.com/conduitio/conduit/pkg/foundation/log"
pmock "github.com/conduitio/conduit/pkg/plugin/mock"
pmock "github.com/conduitio/conduit/pkg/plugin/connector/mock"
"github.com/conduitio/conduit/pkg/record"
"github.com/google/uuid"
"github.com/matryer/is"
Expand Down
7 changes: 4 additions & 3 deletions pkg/connector/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
"github.com/conduitio/conduit/pkg/record"
)

type Source struct {
Instance *Instance

dispenser plugin.Dispenser
plugin plugin.SourcePlugin
dispenser connectorPlugin.Dispenser
plugin connectorPlugin.SourcePlugin

// errs is used to signal the node that the connector experienced an error
// when it was processing something asynchronously (e.g. persisting state).
Expand All @@ -37,7 +38,7 @@ type Source struct {
// stopStream is a function that closes the context of the stream
stopStream context.CancelFunc

// wg tracks the number of in flight calls to the plugin.
// wg tracks the number of in flight calls to the connectorPlugin.
wg sync.WaitGroup
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/database/inmemory"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/plugin/mock"
"github.com/conduitio/conduit/pkg/plugin/connector/mock"
"github.com/conduitio/conduit/pkg/record"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
Expand Down
10 changes: 3 additions & 7 deletions pkg/orchestrator/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,12 @@ func (c *ConnectorOrchestrator) Validate(
plugin string,
config connector.Config,
) error {
d, err := c.plugins.NewDispenser(c.logger, plugin)
if err != nil {
return cerrors.Errorf("couldn't get dispenser: %w", err)
}

var err error
switch t {
case connector.TypeSource:
err = c.plugins.ValidateSourceConfig(ctx, d, config.Settings)
err = c.plugins.ValidateSourceConfig(ctx, plugin, config.Settings)
case connector.TypeDestination:
err = c.plugins.ValidateDestinationConfig(ctx, d, config.Settings)
err = c.plugins.ValidateDestinationConfig(ctx, plugin, config.Settings)
default:
return cerrors.New("invalid connector type")
}
Expand Down
41 changes: 5 additions & 36 deletions pkg/orchestrator/connectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/conduitio/conduit/pkg/foundation/database/inmemory"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/pipeline"
pmock "github.com/conduitio/conduit/pkg/plugin/mock"
"github.com/google/uuid"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
Expand All @@ -34,7 +33,6 @@ func TestConnectorOrchestrator_Create_Success(t *testing.T) {
is := is.New(t)
ctx := context.Background()
db := &inmemory.DB{}
ctrl := gomock.NewController(t)
plsMock, consMock, procsMock, pluginMock := newMockServices(t)

pl := &pipeline.Instance{
Expand All @@ -57,18 +55,13 @@ func TestConnectorOrchestrator_Create_Success(t *testing.T) {
UpdatedAt: time.Now().UTC(),
}

pluginDispenser := pmock.NewDispenser(ctrl)

plsMock.EXPECT().
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Return(pl, nil)
pluginMock.EXPECT().
NewDispenser(gomock.Any(), want.Plugin).
Return(pluginDispenser, nil)
pluginMock.EXPECT().
ValidateSourceConfig(
gomock.AssignableToTypeOf(ctxType),
pluginDispenser,
want.Plugin,
want.Config.Settings,
).Return(nil)
consMock.EXPECT().
Expand Down Expand Up @@ -159,11 +152,8 @@ func TestConnectorOrchestrator_Create_CreateConnectorError(t *testing.T) {
is := is.New(t)
ctx := context.Background()
db := &inmemory.DB{}
ctrl := gomock.NewController(t)
plsMock, consMock, procsMock, pluginMock := newMockServices(t)

pluginDispenser := pmock.NewDispenser(ctrl)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
Expand All @@ -173,13 +163,10 @@ func TestConnectorOrchestrator_Create_CreateConnectorError(t *testing.T) {
plsMock.EXPECT().
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Return(pl, nil)
pluginMock.EXPECT().
NewDispenser(gomock.Any(), "test-plugin").
Return(pluginDispenser, nil)
pluginMock.EXPECT().
ValidateSourceConfig(
gomock.AssignableToTypeOf(ctxType),
pluginDispenser,
"test-plugin",
config.Settings,
).Return(nil)
consMock.EXPECT().
Expand All @@ -205,11 +192,8 @@ func TestConnectorOrchestrator_Create_AddConnectorError(t *testing.T) {
is := is.New(t)
ctx := context.Background()
db := &inmemory.DB{}
ctrl := gomock.NewController(t)
plsMock, consMock, procsMock, pluginMock := newMockServices(t)

pluginDispenser := pmock.NewDispenser(ctrl)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
Expand All @@ -234,13 +218,10 @@ func TestConnectorOrchestrator_Create_AddConnectorError(t *testing.T) {
plsMock.EXPECT().
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Return(pl, nil)
pluginMock.EXPECT().
NewDispenser(gomock.Any(), conn.Plugin).
Return(pluginDispenser, nil)
pluginMock.EXPECT().
ValidateSourceConfig(
gomock.AssignableToTypeOf(ctxType),
pluginDispenser,
conn.Plugin,
conn.Config.Settings,
).Return(nil)
consMock.EXPECT().
Expand Down Expand Up @@ -458,11 +439,8 @@ func TestConnectorOrchestrator_Update_Success(t *testing.T) {
is := is.New(t)
ctx := context.Background()
db := &inmemory.DB{}
ctrl := gomock.NewController(t)
plsMock, consMock, procsMock, pluginMock := newMockServices(t)

pluginDispenser := pmock.NewDispenser(ctrl)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
Expand Down Expand Up @@ -495,10 +473,7 @@ func TestConnectorOrchestrator_Update_Success(t *testing.T) {
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Return(pl, nil)
pluginMock.EXPECT().
NewDispenser(gomock.Any(), conn.Plugin).
Return(pluginDispenser, nil)
pluginMock.EXPECT().
ValidateSourceConfig(gomock.Any(), pluginDispenser, newConfig.Settings).
ValidateSourceConfig(gomock.Any(), conn.Plugin, newConfig.Settings).
Return(nil)
consMock.EXPECT().
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, newConfig).
Expand Down Expand Up @@ -562,11 +537,8 @@ func TestConnectorOrchestrator_Update_Fail(t *testing.T) {
is := is.New(t)
ctx := context.Background()
db := &inmemory.DB{}
ctrl := gomock.NewController(t)
plsMock, consMock, procsMock, pluginMock := newMockServices(t)

pluginDispenser := pmock.NewDispenser(ctrl)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
Expand All @@ -585,10 +557,7 @@ func TestConnectorOrchestrator_Update_Fail(t *testing.T) {
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Return(pl, nil)
pluginMock.EXPECT().
NewDispenser(gomock.Any(), conn.Plugin).
Return(pluginDispenser, nil)
pluginMock.EXPECT().
ValidateDestinationConfig(gomock.Any(), pluginDispenser, conn.Config.Settings).
ValidateDestinationConfig(gomock.Any(), conn.Plugin, conn.Config.Settings).
Return(nil)
consMock.EXPECT().
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, connector.Config{}).
Expand Down
24 changes: 12 additions & 12 deletions pkg/orchestrator/mock/orchestrator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading