Skip to content

Commit

Permalink
OpenCDC integration (#21)
Browse files Browse the repository at this point in the history
* integrate OpenCDC record

* update SDK utilities

* fix graceful shutdown of generator

* update connector sdk version

* update golangci-lint version
  • Loading branch information
lovromazgon authored Aug 5, 2022
1 parent a5b270f commit 38ed365
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 20 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: 1.18
- name: golangci-lint
uses: golangci/golangci-lint-action@v3.2.0
with:
version: v1.46.2
version: v1.47.3
6 changes: 1 addition & 5 deletions cmd/generator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,5 @@ import (
)

func main() {
sdk.Serve(
generator.Specification,
generator.NewSource,
nil,
)
sdk.Serve(generator.Connector)
}
23 changes: 23 additions & 0 deletions connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright © 2022 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package generator

import sdk "github.com/conduitio/conduit-connector-sdk"

var Connector = sdk.Connector{
NewSpecification: Specification,
NewSource: NewSource,
NewDestination: nil,
}
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@ module github.com/conduitio/conduit-connector-generator
go 1.18

require (
github.com/conduitio/conduit-connector-sdk v0.2.0
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220803121801-c861a0fb207c
github.com/google/uuid v1.3.0
github.com/matryer/is v1.4.0
)

require (
github.com/conduitio/conduit-connector-protocol v0.2.0 // indirect
github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220802135043-4b89a6c94401 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/go-hclog v1.2.0 // indirect
github.com/hashicorp/go-plugin v1.4.3 // indirect
github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
Expand All @@ -24,8 +23,10 @@ require (
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
github.com/oklog/run v1.1.0 // indirect
github.com/rs/zerolog v1.26.1 // indirect
github.com/rs/zerolog v1.27.0 // indirect
go.buf.build/grpc/go/conduitio/conduit-connector-protocol v1.3.3 // indirect
go.buf.build/library/go-grpc/conduitio/conduit-connector-protocol v1.4.1 // indirect
go.uber.org/goleak v1.1.12 // indirect
golang.org/x/net v0.0.0-20220403103023-749bd193bc2b // indirect
golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64 // indirect
golang.org/x/text v0.3.7 // indirect
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/conduitio/conduit-connector-protocol v0.2.0 h1:gwYXVKEMgTtU67ephQ5WwTGIDbT/eTLA9Mdr9Bnbqxc=
github.com/conduitio/conduit-connector-protocol v0.2.0/go.mod h1:udCU2AkLcYQoLjAO06tHVL2iFJPw+DamK+wllnj50hk=
github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220802135043-4b89a6c94401 h1:YXw/DQ8j1RjyqLxoWE5MV1s6V6soWolxnzUpIDg4bEY=
github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220802135043-4b89a6c94401/go.mod h1:jynMd6Kuc7xUABrvYTUrOBuTYAtoQsZ7T6tAB9xAWOo=
github.com/conduitio/conduit-connector-sdk v0.2.0 h1:yReJT3SOAGqJIlk59WC5FPgpv0Gg+NG4NFj6FJ89XnM=
github.com/conduitio/conduit-connector-sdk v0.2.0/go.mod h1:zZ/YJqhIZyXdVmFJS55zqkukpBmB+ohbX2kDduoj8Z0=
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220803121801-c861a0fb207c h1:hINPPA0bEJ/MkfQcM/HdVLcD+lqD8bpq3mLPeGsi/1s=
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220803121801-c861a0fb207c/go.mod h1:+68a3+2KGaPTvTL+4UFcE2H0Gk29fg9LJb3fVcjdoRU=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -116,6 +121,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc=
github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc=
github.com/rs/zerolog v1.27.0 h1:1T7qCieN22GVc8S4Q2yuexzBb1EqjbgjSH9RohbMjKs=
github.com/rs/zerolog v1.27.0/go.mod h1:7frBqO0oezxmnO7GF86FY++uy8I0Tk/If5ni1G9Qc0U=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand All @@ -127,9 +134,13 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.buf.build/grpc/go/conduitio/conduit-connector-protocol v1.3.3 h1:Nlr9Dm1z7OoG4S1ic1yuwVU8oreR2eFofiJt5v8+zok=
go.buf.build/grpc/go/conduitio/conduit-connector-protocol v1.3.3/go.mod h1:0IIr2GMOsGd1hPPfwQMmXMXnME6jZSER4C2JS/+BrLc=
go.buf.build/library/go-grpc/conduitio/conduit-connector-protocol v1.4.1 h1:EHYFlC8XppCJX8C3TS06BC3xA6ctiowDlySWErdOaXU=
go.buf.build/library/go-grpc/conduitio/conduit-connector-protocol v1.4.1/go.mod h1:iYPhlwHzhRoPYviJbA604qT6wYuQghfrebmXUXLKjk8=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand All @@ -139,6 +150,7 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand Down Expand Up @@ -207,6 +219,7 @@ golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200717024301-6ddee64345a6/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand All @@ -229,6 +242,7 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
Expand Down
13 changes: 6 additions & 7 deletions record_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,12 @@ func (g *recordGenerator) generate() (sdk.Record, error) {
if err != nil {
return sdk.Record{}, err
}
return sdk.Record{
Position: []byte(uuid.New().String()),
Metadata: make(map[string]string),
Key: sdk.RawData(uuid.NewString()),
Payload: p,
CreatedAt: time.Now(),
}, nil
return sdk.Util.Source.NewRecordCreate(
[]byte(uuid.New().String()),
nil,
sdk.RawData(uuid.NewString()),
p,
), nil
}

func (g *recordGenerator) generatePayload(config RecordConfig) (sdk.Data, error) {
Expand Down
5 changes: 5 additions & 0 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func (s *Source) Open(_ context.Context, _ sdk.Position) error {
}

func (s *Source) Read(ctx context.Context) (sdk.Record, error) {
if ctx.Err() != nil {
// stop producing new records if context is canceled
return sdk.Record{}, ctx.Err()
}

if s.created >= s.config.RecordCount && s.config.RecordCount >= 0 {
// nothing more to produce, block until context is done
<-ctx.Done()
Expand Down
6 changes: 3 additions & 3 deletions source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestRead_RawData(t *testing.T) {
rec, err := underTest.Read(context.Background())
is.NoErr(err)

v, ok := rec.Payload.(sdk.RawData)
v, ok := rec.Payload.After.(sdk.RawData)
is.True(ok)

recStruct := struct {
Expand All @@ -66,7 +66,7 @@ func TestRead_PayloadFile(t *testing.T) {
rec, err := underTest.Read(context.Background())
is.NoErr(err)

v, ok := rec.Payload.(sdk.RawData)
v, ok := rec.Payload.After.(sdk.RawData)
is.True(ok)

expected, err := os.ReadFile("./source_test.go")
Expand All @@ -88,7 +88,7 @@ func TestRead_StructuredData(t *testing.T) {
rec, err := underTest.Read(context.Background())
is.NoErr(err)

v, ok := rec.Payload.(sdk.StructuredData)
v, ok := rec.Payload.After.(sdk.StructuredData)
is.True(ok)

recStruct := struct {
Expand Down

0 comments on commit 38ed365

Please sign in to comment.