diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 0b8aac9..5cb59f7 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -10,7 +10,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 + - uses: actions/setup-go@v3 + with: + go-version: 1.18 - name: golangci-lint uses: golangci/golangci-lint-action@v3.2.0 with: - version: v1.46.2 + version: v1.47.3 diff --git a/cmd/generator/main.go b/cmd/generator/main.go index 391a808..d9e9720 100644 --- a/cmd/generator/main.go +++ b/cmd/generator/main.go @@ -20,9 +20,5 @@ import ( ) func main() { - sdk.Serve( - generator.Specification, - generator.NewSource, - nil, - ) + sdk.Serve(generator.Connector) } diff --git a/connector.go b/connector.go new file mode 100644 index 0000000..04de95e --- /dev/null +++ b/connector.go @@ -0,0 +1,23 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package generator + +import sdk "github.com/conduitio/conduit-connector-sdk" + +var Connector = sdk.Connector{ + NewSpecification: Specification, + NewSource: NewSource, + NewDestination: nil, +} diff --git a/go.mod b/go.mod index d1a304c..05c603d 100644 --- a/go.mod +++ b/go.mod @@ -3,18 +3,17 @@ module github.com/conduitio/conduit-connector-generator go 1.18 require ( - github.com/conduitio/conduit-connector-sdk v0.2.0 + github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220803121801-c861a0fb207c github.com/google/uuid v1.3.0 github.com/matryer/is v1.4.0 ) require ( - github.com/conduitio/conduit-connector-protocol v0.2.0 // indirect + github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220802135043-4b89a6c94401 // indirect github.com/fatih/color v1.13.0 // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.7 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/hashicorp/go-hclog v1.2.0 // indirect github.com/hashicorp/go-plugin v1.4.3 // indirect github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect @@ -24,8 +23,10 @@ require ( github.com/mattn/go-isatty v0.0.14 // indirect github.com/mitchellh/go-testing-interface v1.14.1 // indirect github.com/oklog/run v1.1.0 // indirect - github.com/rs/zerolog v1.26.1 // indirect + github.com/rs/zerolog v1.27.0 // indirect + go.buf.build/grpc/go/conduitio/conduit-connector-protocol v1.3.3 // indirect go.buf.build/library/go-grpc/conduitio/conduit-connector-protocol v1.4.1 // indirect + go.uber.org/goleak v1.1.12 // indirect golang.org/x/net v0.0.0-20220403103023-749bd193bc2b // indirect golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64 // indirect golang.org/x/text v0.3.7 // indirect diff --git a/go.sum b/go.sum index eec18c6..18dbdd8 100644 --- a/go.sum +++ b/go.sum @@ -16,9 +16,14 @@ github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/conduitio/conduit-connector-protocol v0.2.0 h1:gwYXVKEMgTtU67ephQ5WwTGIDbT/eTLA9Mdr9Bnbqxc= github.com/conduitio/conduit-connector-protocol v0.2.0/go.mod h1:udCU2AkLcYQoLjAO06tHVL2iFJPw+DamK+wllnj50hk= +github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220802135043-4b89a6c94401 h1:YXw/DQ8j1RjyqLxoWE5MV1s6V6soWolxnzUpIDg4bEY= +github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220802135043-4b89a6c94401/go.mod h1:jynMd6Kuc7xUABrvYTUrOBuTYAtoQsZ7T6tAB9xAWOo= github.com/conduitio/conduit-connector-sdk v0.2.0 h1:yReJT3SOAGqJIlk59WC5FPgpv0Gg+NG4NFj6FJ89XnM= github.com/conduitio/conduit-connector-sdk v0.2.0/go.mod h1:zZ/YJqhIZyXdVmFJS55zqkukpBmB+ohbX2kDduoj8Z0= +github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220803121801-c861a0fb207c h1:hINPPA0bEJ/MkfQcM/HdVLcD+lqD8bpq3mLPeGsi/1s= +github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220803121801-c861a0fb207c/go.mod h1:+68a3+2KGaPTvTL+4UFcE2H0Gk29fg9LJb3fVcjdoRU= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -116,6 +121,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc= github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc= +github.com/rs/zerolog v1.27.0 h1:1T7qCieN22GVc8S4Q2yuexzBb1EqjbgjSH9RohbMjKs= +github.com/rs/zerolog v1.27.0/go.mod h1:7frBqO0oezxmnO7GF86FY++uy8I0Tk/If5ni1G9Qc0U= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -127,9 +134,13 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.buf.build/grpc/go/conduitio/conduit-connector-protocol v1.3.3 h1:Nlr9Dm1z7OoG4S1ic1yuwVU8oreR2eFofiJt5v8+zok= +go.buf.build/grpc/go/conduitio/conduit-connector-protocol v1.3.3/go.mod h1:0IIr2GMOsGd1hPPfwQMmXMXnME6jZSER4C2JS/+BrLc= go.buf.build/library/go-grpc/conduitio/conduit-connector-protocol v1.4.1 h1:EHYFlC8XppCJX8C3TS06BC3xA6ctiowDlySWErdOaXU= go.buf.build/library/go-grpc/conduitio/conduit-connector-protocol v1.4.1/go.mod h1:iYPhlwHzhRoPYviJbA604qT6wYuQghfrebmXUXLKjk8= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -139,6 +150,7 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -207,6 +219,7 @@ golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200717024301-6ddee64345a6/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -229,6 +242,7 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= diff --git a/record_generator.go b/record_generator.go index eca3eec..39060bb 100644 --- a/record_generator.go +++ b/record_generator.go @@ -57,13 +57,12 @@ func (g *recordGenerator) generate() (sdk.Record, error) { if err != nil { return sdk.Record{}, err } - return sdk.Record{ - Position: []byte(uuid.New().String()), - Metadata: make(map[string]string), - Key: sdk.RawData(uuid.NewString()), - Payload: p, - CreatedAt: time.Now(), - }, nil + return sdk.Util.Source.NewRecordCreate( + []byte(uuid.New().String()), + nil, + sdk.RawData(uuid.NewString()), + p, + ), nil } func (g *recordGenerator) generatePayload(config RecordConfig) (sdk.Data, error) { diff --git a/source.go b/source.go index 7b39556..5e8f0c2 100644 --- a/source.go +++ b/source.go @@ -51,6 +51,11 @@ func (s *Source) Open(_ context.Context, _ sdk.Position) error { } func (s *Source) Read(ctx context.Context) (sdk.Record, error) { + if ctx.Err() != nil { + // stop producing new records if context is canceled + return sdk.Record{}, ctx.Err() + } + if s.created >= s.config.RecordCount && s.config.RecordCount >= 0 { // nothing more to produce, block until context is done <-ctx.Done() diff --git a/source_test.go b/source_test.go index 260aca0..bdec124 100644 --- a/source_test.go +++ b/source_test.go @@ -39,7 +39,7 @@ func TestRead_RawData(t *testing.T) { rec, err := underTest.Read(context.Background()) is.NoErr(err) - v, ok := rec.Payload.(sdk.RawData) + v, ok := rec.Payload.After.(sdk.RawData) is.True(ok) recStruct := struct { @@ -66,7 +66,7 @@ func TestRead_PayloadFile(t *testing.T) { rec, err := underTest.Read(context.Background()) is.NoErr(err) - v, ok := rec.Payload.(sdk.RawData) + v, ok := rec.Payload.After.(sdk.RawData) is.True(ok) expected, err := os.ReadFile("./source_test.go") @@ -88,7 +88,7 @@ func TestRead_StructuredData(t *testing.T) { rec, err := underTest.Read(context.Background()) is.NoErr(err) - v, ok := rec.Payload.(sdk.StructuredData) + v, ok := rec.Payload.After.(sdk.StructuredData) is.True(ok) recStruct := struct {