diff --git a/destination/destination.go b/destination/destination.go index 309376e..37534f9 100644 --- a/destination/destination.go +++ b/destination/destination.go @@ -51,12 +51,34 @@ type config struct { } func NewDestination() sdk.Destination { - return &Destination{ + d := &Destination{ stmtBuilder: sq.StatementBuilder.PlaceholderFormat(sq.Dollar), } + return sdk.DestinationWithMiddleware(d, sdk.DefaultDestinationMiddleware()...) +} + +func (d *Destination) Parameters() map[string]sdk.Parameter { + return map[string]sdk.Parameter{ + ConfigURL: { + Default: "", + Required: true, + Description: "Connection string for the Postgres database.", + }, + ConfigTable: { + Default: "", + Required: false, + Description: "The name of the table in Postgres that the connector should write to.", + }, + ConfigKey: { + Default: "", + Required: false, + Description: "Column name used to detect if the target table already contains the record.", + }, + } } func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error { + // TODO validate required fields d.config = config{ url: cfg[ConfigURL], tableName: cfg[ConfigTable], @@ -76,17 +98,20 @@ func (d *Destination) Open(ctx context.Context) error { // Write routes incoming records to their appropriate handler based on the // operation. -func (d *Destination) Write(ctx context.Context, r sdk.Record) error { - return sdk.Util.Destination.Route(ctx, r, - d.handleInsert, - d.handleUpdate, - d.handleDelete, - d.handleInsert, - ) -} - -func (d *Destination) Flush(context.Context) error { - return nil +func (d *Destination) Write(ctx context.Context, recs []sdk.Record) (int, error) { + for i, r := range recs { + // TODO send all queries to postgres in one round-trip + err := sdk.Util.Destination.Route(ctx, r, + d.handleInsert, + d.handleUpdate, + d.handleDelete, + d.handleInsert, + ) + if err != nil { + return i, err + } + } + return len(recs), nil } func (d *Destination) Teardown(ctx context.Context) error { diff --git a/destination/destination_test.go b/destination/destination_test.go index ab93fa0..ac532a1 100644 --- a/destination/destination_test.go +++ b/destination/destination_test.go @@ -119,8 +119,9 @@ func TestDestination_Write(t *testing.T) { is = is.New(t) id := tt.record.Key.(sdk.StructuredData)["id"] - err = d.Write(ctx, tt.record) + i, err := d.Write(ctx, []sdk.Record{tt.record}) is.NoErr(err) + is.Equal(i, 1) got, err := queryTestTable(ctx, conn, tableName, id) switch tt.record.Operation { diff --git a/go.mod b/go.mod index c2d826c..3dba053 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/Masterminds/squirrel v1.5.3 - github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220803121801-c861a0fb207c + github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220823112121-47067cda967a github.com/jackc/pgconn v1.13.0 github.com/jackc/pglogrepl v0.0.0-20220305000529-420b8467887a github.com/jackc/pgproto3/v2 v2.3.1 @@ -43,6 +43,7 @@ require ( golang.org/x/net v0.0.0-20220403103023-749bd193bc2b // indirect golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64 // indirect golang.org/x/text v0.3.7 // indirect + golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/genproto v0.0.0-20220401170504-314d38edb7de // indirect google.golang.org/grpc v1.45.0 // indirect diff --git a/go.sum b/go.sum index 4386e0a..d0f05ee 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,8 @@ github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220802135043-4b89a6c94401 h1:YXw/DQ8j1RjyqLxoWE5MV1s6V6soWolxnzUpIDg4bEY= github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220802135043-4b89a6c94401/go.mod h1:jynMd6Kuc7xUABrvYTUrOBuTYAtoQsZ7T6tAB9xAWOo= -github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220803121801-c861a0fb207c h1:hINPPA0bEJ/MkfQcM/HdVLcD+lqD8bpq3mLPeGsi/1s= -github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220803121801-c861a0fb207c/go.mod h1:+68a3+2KGaPTvTL+4UFcE2H0Gk29fg9LJb3fVcjdoRU= +github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220823112121-47067cda967a h1:VqjfPhBIBb4cTT5ylyeZ2D8YdyOiG2aNToD2ZH/hYIo= +github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220823112121-47067cda967a/go.mod h1:9+76jHp4YiO+1iu5Iaudyh830K73icqGOE6w7bqZaAs= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -311,6 +311,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 h1:ftMN5LMiBFjbzleLqtoBZk7KdJwhuybIU+FckUHgoyQ= +golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/source/source.go b/source/source.go index a5151e8..8f664bc 100644 --- a/source/source.go +++ b/source/source.go @@ -39,7 +39,52 @@ type Source struct { } func NewSource() sdk.Source { - return &Source{} + return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...) +} + +func (s *Source) Parameters() map[string]sdk.Parameter { + return map[string]sdk.Parameter{ + ConfigKeyURL: { + Default: "", + Required: true, + Description: "Connection string for the Postgres database.", + }, + ConfigKeyTable: { + Default: "", + Required: true, + Description: "The name of the table in Postgres that the connector should read.", + }, + ConfigKeyColumns: { + Default: "", + Required: false, + Description: "Comma separated list of column names that should be included in each Record's payload.", + }, + ConfigKeyKey: { + Default: "", + Required: false, + Description: "Column name that records should use for their `Key` fields.", + }, + ConfigKeySnapshotMode: { + Default: "initial", + Required: false, + Description: "Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`).", + }, + ConfigKeyCDCMode: { + Default: "auto", + Required: false, + Description: "Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`).", + }, + ConfigKeyLogreplPublicationName: { + Default: "conduitpub", + Required: false, + Description: "Name of the publication to listen for WAL events.", + }, + ConfigKeyLogreplSlotName: { + Default: "conduitslot", + Required: false, + Description: "Name of the slot opened for replication events.", + }, + } } func (s *Source) Configure(ctx context.Context, cfgRaw map[string]string) error { diff --git a/spec.go b/spec.go index e61c03a..51d53ee 100644 --- a/spec.go +++ b/spec.go @@ -24,64 +24,5 @@ func Specification() sdk.Specification { Summary: "A PostgreSQL source and destination plugin for Conduit.", Version: "v0.1.0", Author: "Meroxa, Inc.", - DestinationParams: map[string]sdk.Parameter{ - "url": { - Default: "", - Required: true, - Description: "Connection string for the Postgres database.", - }, - "table": { - Default: "", - Required: false, - Description: "The name of the table in Postgres that the connector should write to.", - }, - "key": { - Default: "", - Required: false, - Description: "Column name used to detect if the target table already contains the record.", - }, - }, - SourceParams: map[string]sdk.Parameter{ - "url": { - Default: "", - Required: true, - Description: "Connection string for the Postgres database.", - }, - "table": { - Default: "", - Required: true, - Description: "The name of the table in Postgres that the connector should read.", - }, - "columns": { - Default: "", - Required: false, - Description: "Comma separated list of column names that should be included in each Record's payload.", - }, - "key": { - Default: "", - Required: false, - Description: "Column name that records should use for their `Key` fields.", - }, - "snapshotMode": { - Default: "initial", - Required: false, - Description: "Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`).", - }, - "cdcMode": { - Default: "auto", - Required: false, - Description: "Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`).", - }, - "logrepl.publicationName": { - Default: "conduitpub", - Required: false, - Description: "Name of the publication to listen for WAL events.", - }, - "logrepl.slotName": { - Default: "conduitslot", - Required: false, - Description: "Name of the slot opened for replication events.", - }, - }, } }