Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to latest SDK #54

Merged
merged 2 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions destination/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,34 @@ type config struct {
}

func NewDestination() sdk.Destination {
return &Destination{
d := &Destination{
stmtBuilder: sq.StatementBuilder.PlaceholderFormat(sq.Dollar),
}
return sdk.DestinationWithMiddleware(d, sdk.DefaultDestinationMiddleware()...)
}

func (d *Destination) Parameters() map[string]sdk.Parameter {
return map[string]sdk.Parameter{
ConfigURL: {
Default: "",
Required: true,
Description: "Connection string for the Postgres database.",
},
ConfigTable: {
Default: "",
Required: false,
Description: "The name of the table in Postgres that the connector should write to.",
},
ConfigKey: {
Default: "",
Required: false,
Description: "Column name used to detect if the target table already contains the record.",
},
}
}

func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error {
// TODO validate required fields
d.config = config{
url: cfg[ConfigURL],
tableName: cfg[ConfigTable],
Expand All @@ -76,17 +98,20 @@ func (d *Destination) Open(ctx context.Context) error {

// Write routes incoming records to their appropriate handler based on the
// operation.
func (d *Destination) Write(ctx context.Context, r sdk.Record) error {
return sdk.Util.Destination.Route(ctx, r,
d.handleInsert,
d.handleUpdate,
d.handleDelete,
d.handleInsert,
)
}

func (d *Destination) Flush(context.Context) error {
return nil
func (d *Destination) Write(ctx context.Context, recs []sdk.Record) (int, error) {
for i, r := range recs {
// TODO send all queries to postgres in one round-trip
err := sdk.Util.Destination.Route(ctx, r,
d.handleInsert,
d.handleUpdate,
d.handleDelete,
d.handleInsert,
)
if err != nil {
return i, err
}
}
return len(recs), nil
}

func (d *Destination) Teardown(ctx context.Context) error {
Expand Down
3 changes: 2 additions & 1 deletion destination/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ func TestDestination_Write(t *testing.T) {
is = is.New(t)
id := tt.record.Key.(sdk.StructuredData)["id"]

err = d.Write(ctx, tt.record)
i, err := d.Write(ctx, []sdk.Record{tt.record})
is.NoErr(err)
is.Equal(i, 1)

got, err := queryTestTable(ctx, conn, tableName, id)
switch tt.record.Operation {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.18

require (
github.com/Masterminds/squirrel v1.5.3
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220803121801-c861a0fb207c
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220823112121-47067cda967a
github.com/jackc/pgconn v1.13.0
github.com/jackc/pglogrepl v0.0.0-20220305000529-420b8467887a
github.com/jackc/pgproto3/v2 v2.3.1
Expand Down Expand Up @@ -43,6 +43,7 @@ require (
golang.org/x/net v0.0.0-20220403103023-749bd193bc2b // indirect
golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20220401170504-314d38edb7de // indirect
google.golang.org/grpc v1.45.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220802135043-4b89a6c94401 h1:YXw/DQ8j1RjyqLxoWE5MV1s6V6soWolxnzUpIDg4bEY=
github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220802135043-4b89a6c94401/go.mod h1:jynMd6Kuc7xUABrvYTUrOBuTYAtoQsZ7T6tAB9xAWOo=
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220803121801-c861a0fb207c h1:hINPPA0bEJ/MkfQcM/HdVLcD+lqD8bpq3mLPeGsi/1s=
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220803121801-c861a0fb207c/go.mod h1:+68a3+2KGaPTvTL+4UFcE2H0Gk29fg9LJb3fVcjdoRU=
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220823112121-47067cda967a h1:VqjfPhBIBb4cTT5ylyeZ2D8YdyOiG2aNToD2ZH/hYIo=
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220823112121-47067cda967a/go.mod h1:9+76jHp4YiO+1iu5Iaudyh830K73icqGOE6w7bqZaAs=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand Down Expand Up @@ -311,6 +311,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 h1:ftMN5LMiBFjbzleLqtoBZk7KdJwhuybIU+FckUHgoyQ=
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
47 changes: 46 additions & 1 deletion source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,52 @@ type Source struct {
}

func NewSource() sdk.Source {
return &Source{}
return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...)
}

func (s *Source) Parameters() map[string]sdk.Parameter {
return map[string]sdk.Parameter{
ConfigKeyURL: {
Default: "",
Required: true,
Description: "Connection string for the Postgres database.",
},
ConfigKeyTable: {
Default: "",
Required: true,
Description: "The name of the table in Postgres that the connector should read.",
},
ConfigKeyColumns: {
Default: "",
Required: false,
Description: "Comma separated list of column names that should be included in each Record's payload.",
},
ConfigKeyKey: {
Default: "",
Required: false,
Description: "Column name that records should use for their `Key` fields.",
},
ConfigKeySnapshotMode: {
Default: "initial",
Required: false,
Description: "Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`).",
},
ConfigKeyCDCMode: {
Default: "auto",
Required: false,
Description: "Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`).",
},
ConfigKeyLogreplPublicationName: {
Default: "conduitpub",
Required: false,
Description: "Name of the publication to listen for WAL events.",
},
ConfigKeyLogreplSlotName: {
Default: "conduitslot",
Required: false,
Description: "Name of the slot opened for replication events.",
},
}
}

func (s *Source) Configure(ctx context.Context, cfgRaw map[string]string) error {
Expand Down
59 changes: 0 additions & 59 deletions spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,64 +24,5 @@ func Specification() sdk.Specification {
Summary: "A PostgreSQL source and destination plugin for Conduit.",
Version: "v0.1.0",
Author: "Meroxa, Inc.",
DestinationParams: map[string]sdk.Parameter{
"url": {
Default: "",
Required: true,
Description: "Connection string for the Postgres database.",
},
"table": {
Default: "",
Required: false,
Description: "The name of the table in Postgres that the connector should write to.",
},
"key": {
Default: "",
Required: false,
Description: "Column name used to detect if the target table already contains the record.",
},
},
SourceParams: map[string]sdk.Parameter{
"url": {
Default: "",
Required: true,
Description: "Connection string for the Postgres database.",
},
"table": {
Default: "",
Required: true,
Description: "The name of the table in Postgres that the connector should read.",
},
"columns": {
Default: "",
Required: false,
Description: "Comma separated list of column names that should be included in each Record's payload.",
},
"key": {
Default: "",
Required: false,
Description: "Column name that records should use for their `Key` fields.",
},
"snapshotMode": {
Default: "initial",
Required: false,
Description: "Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`).",
},
"cdcMode": {
Default: "auto",
Required: false,
Description: "Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`).",
},
"logrepl.publicationName": {
Default: "conduitpub",
Required: false,
Description: "Name of the publication to listen for WAL events.",
},
"logrepl.slotName": {
Default: "conduitslot",
Required: false,
Description: "Name of the slot opened for replication events.",
},
},
}
}