From f75cf2fb5787e2eb6fe3c08fea3e5a946940f07b Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 12 Sep 2024 10:08:38 +0200 Subject: [PATCH 01/14] payload schema --- source.go | 1 - source/logrepl/cdc_test.go | 20 ++++++++--- source/logrepl/combined.go | 13 +++---- source/logrepl/handler.go | 62 +++++++++++++++++++++++---------- source/schema/avro.go | 6 ++-- source/schema/schema.go | 17 --------- source/snapshot/fetch_worker.go | 17 +++++---- source/snapshot/iterator.go | 55 +++++++++++++++++++---------- test/docker-compose.yml | 1 - 9 files changed, 111 insertions(+), 81 deletions(-) delete mode 100644 source/schema/schema.go diff --git a/source.go b/source.go index 6729856..6561f61 100644 --- a/source.go +++ b/source.go @@ -113,7 +113,6 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error { TableKeys: s.tableKeys, WithSnapshot: s.config.SnapshotMode == source.SnapshotModeInitial, SnapshotFetchSize: s.config.SnapshotFetchSize, - WithAvroSchema: s.config.WithAvroSchema, }) if err != nil { return fmt.Errorf("failed to create logical replication iterator: %w", err) diff --git a/source/logrepl/cdc_test.go b/source/logrepl/cdc_test.go index 1e98824..8100dc7 100644 --- a/source/logrepl/cdc_test.go +++ b/source/logrepl/cdc_test.go @@ -146,7 +146,9 @@ func TestCDCIterator_Next(t *testing.T) { want: opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{ - opencdc.MetadataCollection: table, + opencdc.MetadataCollection: table, + opencdc.MetadataPayloadSchemaSubject: table, + opencdc.MetadataPayloadSchemaVersion: "1", }, Key: opencdc.StructuredData{"id": int64(6)}, Payload: opencdc.Change{ @@ -175,7 +177,9 @@ func TestCDCIterator_Next(t *testing.T) { want: opencdc.Record{ Operation: opencdc.OperationUpdate, Metadata: map[string]string{ - opencdc.MetadataCollection: table, + opencdc.MetadataCollection: table, + opencdc.MetadataPayloadSchemaSubject: table, + opencdc.MetadataPayloadSchemaVersion: "1", }, Key: opencdc.StructuredData{"id": int64(1)}, Payload: opencdc.Change{ @@ -205,7 +209,9 @@ func TestCDCIterator_Next(t *testing.T) { want: opencdc.Record{ Operation: opencdc.OperationUpdate, Metadata: map[string]string{ - opencdc.MetadataCollection: table, + opencdc.MetadataCollection: table, + opencdc.MetadataPayloadSchemaSubject: table, + opencdc.MetadataPayloadSchemaVersion: "1", }, Key: opencdc.StructuredData{"id": int64(1)}, Payload: opencdc.Change{ @@ -244,7 +250,9 @@ func TestCDCIterator_Next(t *testing.T) { want: opencdc.Record{ Operation: opencdc.OperationDelete, Metadata: map[string]string{ - opencdc.MetadataCollection: table, + opencdc.MetadataCollection: table, + opencdc.MetadataPayloadSchemaSubject: table, + opencdc.MetadataPayloadSchemaVersion: "1", }, Key: opencdc.StructuredData{"id": int64(4)}, Payload: opencdc.Change{ @@ -274,7 +282,9 @@ func TestCDCIterator_Next(t *testing.T) { want: opencdc.Record{ Operation: opencdc.OperationDelete, Metadata: map[string]string{ - opencdc.MetadataCollection: table, + opencdc.MetadataCollection: table, + opencdc.MetadataPayloadSchemaSubject: table, + opencdc.MetadataPayloadSchemaVersion: "1", }, Key: opencdc.StructuredData{"id": int64(3)}, Payload: opencdc.Change{ diff --git a/source/logrepl/combined.go b/source/logrepl/combined.go index 955fdd3..920c366 100644 --- a/source/logrepl/combined.go +++ b/source/logrepl/combined.go @@ -49,7 +49,6 @@ type Config struct { TableKeys map[string]string WithSnapshot bool SnapshotFetchSize int - WithAvroSchema bool } // Validate performs validation tasks on the config. @@ -179,7 +178,6 @@ func (c *CombinedIterator) initCDCIterator(ctx context.Context, pos position.Pos PublicationName: c.conf.PublicationName, Tables: c.conf.Tables, TableKeys: c.conf.TableKeys, - WithAvroSchema: c.conf.WithAvroSchema, }) if err != nil { return fmt.Errorf("failed to create CDC iterator: %w", err) @@ -201,12 +199,11 @@ func (c *CombinedIterator) initSnapshotIterator(ctx context.Context, pos positio } snapshotIterator, err := snapshot.NewIterator(ctx, c.pool, snapshot.Config{ - Position: c.conf.Position, - Tables: c.conf.Tables, - TableKeys: c.conf.TableKeys, - TXSnapshotID: c.cdcIterator.TXSnapshotID(), - FetchSize: c.conf.SnapshotFetchSize, - WithAvroSchema: c.conf.WithAvroSchema, + Position: c.conf.Position, + Tables: c.conf.Tables, + TableKeys: c.conf.TableKeys, + TXSnapshotID: c.cdcIterator.TXSnapshotID(), + FetchSize: c.conf.SnapshotFetchSize, }) if err != nil { return fmt.Errorf("failed to create snapshot iterator: %w", err) diff --git a/source/logrepl/handler.go b/source/logrepl/handler.go index c41a145..985b5c3 100644 --- a/source/logrepl/handler.go +++ b/source/logrepl/handler.go @@ -17,12 +17,13 @@ package logrepl import ( "context" "fmt" - "github.com/conduitio/conduit-commons/opencdc" + cschema "github.com/conduitio/conduit-commons/schema" "github.com/conduitio/conduit-connector-postgres/source/logrepl/internal" "github.com/conduitio/conduit-connector-postgres/source/position" "github.com/conduitio/conduit-connector-postgres/source/schema" sdk "github.com/conduitio/conduit-connector-sdk" + sdkschema "github.com/conduitio/conduit-connector-sdk/schema" "github.com/hamba/avro/v2" "github.com/jackc/pglogrepl" ) @@ -35,8 +36,7 @@ type CDCHandler struct { out chan<- opencdc.Record lastTXLSN pglogrepl.LSN - relAvroSchema map[string]avro.Schema - withAvroSchema bool + relAvroSchema map[string]avro.Schema } func NewCDCHandler( @@ -46,11 +46,10 @@ func NewCDCHandler( out chan<- opencdc.Record, ) *CDCHandler { return &CDCHandler{ - tableKeys: tableKeys, - relationSet: rs, - out: out, - withAvroSchema: withAvroSchema, - relAvroSchema: make(map[string]avro.Schema), + tableKeys: tableKeys, + relationSet: rs, + out: out, + relAvroSchema: make(map[string]avro.Schema), } } @@ -98,10 +97,10 @@ func (h *CDCHandler) handleInsert( ctx context.Context, msg *pglogrepl.InsertMessage, lsn pglogrepl.LSN, -) (err error) { +) error { rel, err := h.relationSet.Get(msg.RelationID) if err != nil { - return err + return fmt.Errorf("failed getting relation %v: %w", msg.RelationID, err) } newValues, err := h.relationSet.Values(msg.RelationID, msg.Tuple) @@ -120,6 +119,11 @@ func (h *CDCHandler) handleInsert( h.buildRecordPayload(newValues), ) + err = h.attachSchema(ctx, rec, rel.RelationName) + if err != nil { + return fmt.Errorf("failed to attach schema: %w", err) + } + return h.send(ctx, rec) } @@ -158,6 +162,12 @@ func (h *CDCHandler) handleUpdate( h.buildRecordPayload(oldValues), h.buildRecordPayload(newValues), ) + + err = h.attachSchema(ctx, rec, rel.RelationName) + if err != nil { + return fmt.Errorf("failed to attach schema: %w", err) + } + return h.send(ctx, rec) } @@ -189,6 +199,11 @@ func (h *CDCHandler) handleDelete( h.buildRecordPayload(oldValues), ) + err = h.attachSchema(ctx, rec, rel.RelationName) + if err != nil { + return fmt.Errorf("failed to attach schema: %w", err) + } + return h.send(ctx, rec) } @@ -208,10 +223,6 @@ func (h *CDCHandler) buildRecordMetadata(rel *pglogrepl.RelationMessage) map[str opencdc.MetadataCollection: rel.RelationName, } - if h.withAvroSchema { - m[schema.AvroMetadataKey] = h.relAvroSchema[rel.RelationName].String() - } - return m } @@ -249,11 +260,7 @@ func (*CDCHandler) buildPosition(lsn pglogrepl.LSN) opencdc.Position { // updateAvroSchema generates and stores avro schema based on the relation's row, // when usage of avro schema is requested. func (h *CDCHandler) updateAvroSchema(rel *pglogrepl.RelationMessage, row *pglogrepl.TupleData) error { - if !h.withAvroSchema { - return nil - } - - sch, err := schema.Avro.ExtractLogrepl(rel, row) + sch, err := schema.Avro.ExtractLogrepl(rel) if err != nil { return err } @@ -262,3 +269,20 @@ func (h *CDCHandler) updateAvroSchema(rel *pglogrepl.RelationMessage, row *pglog return nil } + +func (h *CDCHandler) attachSchema(ctx context.Context, rec opencdc.Record, relationName string) error { + sch, err := sdkschema.Create( + ctx, + cschema.TypeAvro, + relationName, + []byte(h.relAvroSchema[relationName].String()), + ) + if err != nil { + return fmt.Errorf("failed creating schema for relation %v: %w", relationName, err) + } + + // todo attach key schema + cschema.AttachPayloadSchemaToRecord(rec, sch) + + return nil +} diff --git a/source/schema/avro.go b/source/schema/avro.go index 112b853..e466a2b 100644 --- a/source/schema/avro.go +++ b/source/schema/avro.go @@ -66,10 +66,10 @@ type avroExtractor struct { avroMap map[string]*avro.PrimitiveSchema } -func (a avroExtractor) ExtractLogrepl(rel *pglogrepl.RelationMessage, row *pglogrepl.TupleData) (avro.Schema, error) { +func (a avroExtractor) ExtractLogrepl(rel *pglogrepl.RelationMessage) (avro.Schema, error) { var fields []pgconn.FieldDescription - for i := range row.Columns { + for i := range rel.Columns { fields = append(fields, pgconn.FieldDescription{ Name: rel.Columns[i].Name, DataTypeOID: rel.Columns[i].DataType, @@ -80,7 +80,7 @@ func (a avroExtractor) ExtractLogrepl(rel *pglogrepl.RelationMessage, row *pglog return a.Extract(rel.RelationName, fields) } -func (a *avroExtractor) Extract(name string, fields []pgconn.FieldDescription) (avro.Schema, error) { +func (a *avroExtractor) Extract(name string, fields []pgconn.FieldDescription) (*avro.RecordSchema, error) { var avroFields []*avro.Field for _, f := range fields { diff --git a/source/schema/schema.go b/source/schema/schema.go deleted file mode 100644 index dc3344c..0000000 --- a/source/schema/schema.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright © 2024 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package schema - -const AvroMetadataKey = "postgres.avro.schema" diff --git a/source/snapshot/fetch_worker.go b/source/snapshot/fetch_worker.go index 4eb8577..8864e65 100644 --- a/source/snapshot/fetch_worker.go +++ b/source/snapshot/fetch_worker.go @@ -43,12 +43,11 @@ var supportedKeyTypes = []string{ } type FetchConfig struct { - Table string - Key string - TXSnapshotID string - FetchSize int - Position position.Position - WithAvroSchema bool + Table string + Key string + TXSnapshotID string + FetchSize int + Position position.Position } var ( @@ -82,14 +81,14 @@ type FetchData struct { Payload opencdc.StructuredData Position position.SnapshotPosition Table string - AvroSchema avro.Schema + AvroSchema *avro.RecordSchema } type FetchWorker struct { conf FetchConfig db *pgxpool.Pool out chan<- FetchData - avroSchema avro.Schema + avroSchema *avro.RecordSchema snapshotEnd int64 lastRead int64 @@ -286,7 +285,7 @@ func (f *FetchWorker) fetch(ctx context.Context, tx pgx.Tx) (int, error) { return 0, fmt.Errorf("failed to get values: %w", err) } - if f.conf.WithAvroSchema && f.avroSchema == nil { + if f.avroSchema == nil { sch, err := schema.Avro.Extract(f.conf.Table, fields) if err != nil { return 0, fmt.Errorf("failed to extract schema: %w", err) diff --git a/source/snapshot/iterator.go b/source/snapshot/iterator.go index 001798a..3064f4e 100644 --- a/source/snapshot/iterator.go +++ b/source/snapshot/iterator.go @@ -18,11 +18,13 @@ import ( "context" "errors" "fmt" + cschema "github.com/conduitio/conduit-commons/schema" + sdkschema "github.com/conduitio/conduit-connector-sdk/schema" + "github.com/hamba/avro/v2" "github.com/conduitio/conduit-commons/csync" "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-postgres/source/position" - "github.com/conduitio/conduit-connector-postgres/source/schema" sdk "github.com/conduitio/conduit-connector-sdk" "github.com/jackc/pgx/v5/pgxpool" "gopkg.in/tomb.v2" @@ -31,12 +33,11 @@ import ( var ErrIteratorDone = errors.New("snapshot complete") type Config struct { - Position opencdc.Position - Tables []string - TableKeys map[string]string - TXSnapshotID string - FetchSize int - WithAvroSchema bool + Position opencdc.Position + Tables []string + TableKeys map[string]string + TXSnapshotID string + FetchSize int } type Iterator struct { @@ -96,7 +97,7 @@ func (i *Iterator) Next(ctx context.Context) (opencdc.Record, error) { } i.acks.Add(1) - return i.buildRecord(d), nil + return i.buildRecord(ctx, d) } } @@ -113,7 +114,7 @@ func (i *Iterator) Teardown(_ context.Context) error { return nil } -func (i *Iterator) buildRecord(d FetchData) opencdc.Record { +func (i *Iterator) buildRecord(ctx context.Context, d FetchData) (opencdc.Record, error) { // merge this position with latest position i.lastPosition.Type = position.TypeSnapshot i.lastPosition.Snapshots[d.Table] = d.Position @@ -122,11 +123,13 @@ func (i *Iterator) buildRecord(d FetchData) opencdc.Record { metadata := make(opencdc.Metadata) metadata["postgres.table"] = d.Table - if i.conf.WithAvroSchema { - metadata[schema.AvroMetadataKey] = d.AvroSchema.String() + rec := sdk.Util.Source.NewRecordSnapshot(pos, metadata, d.Key, d.Payload) + err := i.attachSchema(ctx, rec, d.AvroSchema) + if err != nil { + return opencdc.Record{}, fmt.Errorf("failed to attach schema: %w", err) } - return sdk.Util.Source.NewRecordSnapshot(pos, metadata, d.Key, d.Payload) + return rec, nil } func (i *Iterator) initFetchers(ctx context.Context) error { @@ -136,12 +139,11 @@ func (i *Iterator) initFetchers(ctx context.Context) error { for j, t := range i.conf.Tables { w := NewFetchWorker(i.db, i.data, FetchConfig{ - Table: t, - Key: i.conf.TableKeys[t], - TXSnapshotID: i.conf.TXSnapshotID, - Position: i.lastPosition, - FetchSize: i.conf.FetchSize, - WithAvroSchema: i.conf.WithAvroSchema, + Table: t, + Key: i.conf.TableKeys[t], + TXSnapshotID: i.conf.TXSnapshotID, + Position: i.lastPosition, + FetchSize: i.conf.FetchSize, }) if err := w.Validate(ctx); err != nil { @@ -170,3 +172,20 @@ func (i *Iterator) startWorkers() { close(i.data) }() } + +func (i *Iterator) attachSchema(ctx context.Context, rec opencdc.Record, schema *avro.RecordSchema) error { + sch, err := sdkschema.Create( + ctx, + cschema.TypeAvro, + schema.Name(), + []byte(schema.String()), + ) + if err != nil { + return fmt.Errorf("failed creating schema %v: %w", schema.Name(), err) + } + + // todo attach key schema + cschema.AttachPayloadSchemaToRecord(rec, sch) + + return nil +} diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 043d879..8b6a1fc 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3.4' services: pg-0: image: docker.io/bitnami/postgresql-repmgr:16 From 8161cc0963f806721afdb325848ba8e36112d859 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 12 Sep 2024 13:37:56 +0200 Subject: [PATCH 02/14] working, raw --- source.go | 3 +- source/logrepl/cdc.go | 4 +- source/logrepl/cdc_test.go | 10 +++++ source/logrepl/handler.go | 65 +++++++++++++++++++-------------- source/schema/avro.go | 25 +++++++++++-- source/snapshot/fetch_worker.go | 46 ++++++++++++++--------- source/snapshot/iterator.go | 25 +++++++++---- 7 files changed, 119 insertions(+), 59 deletions(-) diff --git a/source.go b/source.go index 6561f61..e1fe112 100644 --- a/source.go +++ b/source.go @@ -102,7 +102,8 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error { switch s.config.CDCMode { case source.CDCModeAuto: - // TODO add logic that checks if the DB supports logical replication (since that's the only thing we support at the moment) + // TODO add logic that checks if the DB supports logical replication + // (since that's the only thing we support at the moment) fallthrough case source.CDCModeLogrepl: i, err := logrepl.NewCombinedIterator(ctx, s.pool, logrepl.Config{ diff --git a/source/logrepl/cdc.go b/source/logrepl/cdc.go index 3716b61..056fec9 100644 --- a/source/logrepl/cdc.go +++ b/source/logrepl/cdc.go @@ -27,7 +27,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) -// Config holds configuration values for CDCIterator. +// CDCConfig holds configuration values for CDCIterator. type CDCConfig struct { LSN pglogrepl.LSN SlotName string @@ -65,7 +65,7 @@ func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCI } records := make(chan opencdc.Record) - handler := NewCDCHandler(internal.NewRelationSet(), c.TableKeys, c.WithAvroSchema, records) + handler := NewCDCHandler(internal.NewRelationSet(), c.TableKeys, records) sub, err := internal.CreateSubscription( ctx, diff --git a/source/logrepl/cdc_test.go b/source/logrepl/cdc_test.go index 8100dc7..c7f1092 100644 --- a/source/logrepl/cdc_test.go +++ b/source/logrepl/cdc_test.go @@ -147,6 +147,8 @@ func TestCDCIterator_Next(t *testing.T) { Operation: opencdc.OperationCreate, Metadata: map[string]string{ opencdc.MetadataCollection: table, + opencdc.MetadataKeySchemaSubject: table + "_key", + opencdc.MetadataKeySchemaVersion: "1", opencdc.MetadataPayloadSchemaSubject: table, opencdc.MetadataPayloadSchemaVersion: "1", }, @@ -178,6 +180,8 @@ func TestCDCIterator_Next(t *testing.T) { Operation: opencdc.OperationUpdate, Metadata: map[string]string{ opencdc.MetadataCollection: table, + opencdc.MetadataKeySchemaSubject: table + "_key", + opencdc.MetadataKeySchemaVersion: "1", opencdc.MetadataPayloadSchemaSubject: table, opencdc.MetadataPayloadSchemaVersion: "1", }, @@ -210,6 +214,8 @@ func TestCDCIterator_Next(t *testing.T) { Operation: opencdc.OperationUpdate, Metadata: map[string]string{ opencdc.MetadataCollection: table, + opencdc.MetadataKeySchemaSubject: table + "_key", + opencdc.MetadataKeySchemaVersion: "1", opencdc.MetadataPayloadSchemaSubject: table, opencdc.MetadataPayloadSchemaVersion: "1", }, @@ -251,6 +257,8 @@ func TestCDCIterator_Next(t *testing.T) { Operation: opencdc.OperationDelete, Metadata: map[string]string{ opencdc.MetadataCollection: table, + opencdc.MetadataKeySchemaSubject: table + "_key", + opencdc.MetadataKeySchemaVersion: "1", opencdc.MetadataPayloadSchemaSubject: table, opencdc.MetadataPayloadSchemaVersion: "1", }, @@ -283,6 +291,8 @@ func TestCDCIterator_Next(t *testing.T) { Operation: opencdc.OperationDelete, Metadata: map[string]string{ opencdc.MetadataCollection: table, + opencdc.MetadataKeySchemaSubject: table + "_key", + opencdc.MetadataKeySchemaVersion: "1", opencdc.MetadataPayloadSchemaSubject: table, opencdc.MetadataPayloadSchemaVersion: "1", }, diff --git a/source/logrepl/handler.go b/source/logrepl/handler.go index 985b5c3..85fce4c 100644 --- a/source/logrepl/handler.go +++ b/source/logrepl/handler.go @@ -36,20 +36,17 @@ type CDCHandler struct { out chan<- opencdc.Record lastTXLSN pglogrepl.LSN - relAvroSchema map[string]avro.Schema + keySchemas map[string]avro.Schema + payloadSchemas map[string]avro.Schema } -func NewCDCHandler( - rs *internal.RelationSet, - tableKeys map[string]string, - withAvroSchema bool, - out chan<- opencdc.Record, -) *CDCHandler { +func NewCDCHandler(rs *internal.RelationSet, tableKeys map[string]string, out chan<- opencdc.Record) *CDCHandler { return &CDCHandler{ - tableKeys: tableKeys, - relationSet: rs, - out: out, - relAvroSchema: make(map[string]avro.Schema), + tableKeys: tableKeys, + relationSet: rs, + out: out, + payloadSchemas: make(map[string]avro.Schema), + keySchemas: make(map[string]avro.Schema), } } @@ -108,7 +105,7 @@ func (h *CDCHandler) handleInsert( return fmt.Errorf("failed to decode new values: %w", err) } - if err := h.updateAvroSchema(rel, msg.Tuple); err != nil { + if err := h.updateAvroSchema(rel); err != nil { return fmt.Errorf("failed to update avro schema: %w", err) } @@ -119,7 +116,7 @@ func (h *CDCHandler) handleInsert( h.buildRecordPayload(newValues), ) - err = h.attachSchema(ctx, rec, rel.RelationName) + err = h.attachSchemas(ctx, rec, rel.RelationName) if err != nil { return fmt.Errorf("failed to attach schema: %w", err) } @@ -144,7 +141,7 @@ func (h *CDCHandler) handleUpdate( return fmt.Errorf("failed to decode new values: %w", err) } - if err := h.updateAvroSchema(rel, msg.NewTuple); err != nil { + if err := h.updateAvroSchema(rel); err != nil { return fmt.Errorf("failed to update avro schema: %w", err) } @@ -163,7 +160,7 @@ func (h *CDCHandler) handleUpdate( h.buildRecordPayload(newValues), ) - err = h.attachSchema(ctx, rec, rel.RelationName) + err = h.attachSchemas(ctx, rec, rel.RelationName) if err != nil { return fmt.Errorf("failed to attach schema: %w", err) } @@ -188,7 +185,7 @@ func (h *CDCHandler) handleDelete( return fmt.Errorf("failed to decode old values: %w", err) } - if err := h.updateAvroSchema(rel, msg.OldTuple); err != nil { + if err := h.updateAvroSchema(rel); err != nil { return fmt.Errorf("failed to update avro schema: %w", err) } @@ -199,7 +196,7 @@ func (h *CDCHandler) handleDelete( h.buildRecordPayload(oldValues), ) - err = h.attachSchema(ctx, rec, rel.RelationName) + err = h.attachSchemas(ctx, rec, rel.RelationName) if err != nil { return fmt.Errorf("failed to attach schema: %w", err) } @@ -259,30 +256,44 @@ func (*CDCHandler) buildPosition(lsn pglogrepl.LSN) opencdc.Position { // updateAvroSchema generates and stores avro schema based on the relation's row, // when usage of avro schema is requested. -func (h *CDCHandler) updateAvroSchema(rel *pglogrepl.RelationMessage, row *pglogrepl.TupleData) error { - sch, err := schema.Avro.ExtractLogrepl(rel) +func (h *CDCHandler) updateAvroSchema(rel *pglogrepl.RelationMessage) error { + ps, err := schema.Avro.ExtractLogrepl(rel.RelationName, rel) if err != nil { - return err + return fmt.Errorf("failed to extract payload schema: %w", err) } + h.payloadSchemas[rel.RelationName] = ps - h.relAvroSchema[rel.RelationName] = sch + ks, err := schema.Avro.ExtractLogreplFields(rel.RelationName+"_key", rel, h.tableKeys[rel.RelationName]) + if err != nil { + return fmt.Errorf("failed to extract key schema: %w", err) + } + h.keySchemas[rel.RelationName+"_key"] = ks return nil } -func (h *CDCHandler) attachSchema(ctx context.Context, rec opencdc.Record, relationName string) error { - sch, err := sdkschema.Create( +func (h *CDCHandler) attachSchemas(ctx context.Context, rec opencdc.Record, relationName string) error { + ps, err := sdkschema.Create( ctx, cschema.TypeAvro, relationName, - []byte(h.relAvroSchema[relationName].String()), + []byte(h.payloadSchemas[relationName].String()), ) if err != nil { return fmt.Errorf("failed creating schema for relation %v: %w", relationName, err) } + cschema.AttachPayloadSchemaToRecord(rec, ps) - // todo attach key schema - cschema.AttachPayloadSchemaToRecord(rec, sch) - + // Key schema + ks, err := sdkschema.Create( + ctx, + cschema.TypeAvro, + relationName+"_key", + []byte(h.keySchemas[relationName+"_key"].String()), + ) + if err != nil { + return fmt.Errorf("failed creating schema for relation %v: %w", relationName, err) + } + cschema.AttachKeySchemaToRecord(rec, ks) return nil } diff --git a/source/schema/avro.go b/source/schema/avro.go index e466a2b..5f2b40b 100644 --- a/source/schema/avro.go +++ b/source/schema/avro.go @@ -66,7 +66,7 @@ type avroExtractor struct { avroMap map[string]*avro.PrimitiveSchema } -func (a avroExtractor) ExtractLogrepl(rel *pglogrepl.RelationMessage) (avro.Schema, error) { +func (a avroExtractor) ExtractLogrepl(schemaName string, rel *pglogrepl.RelationMessage) (*avro.RecordSchema, error) { var fields []pgconn.FieldDescription for i := range rel.Columns { @@ -77,13 +77,30 @@ func (a avroExtractor) ExtractLogrepl(rel *pglogrepl.RelationMessage) (avro.Sche }) } - return a.Extract(rel.RelationName, fields) + return a.Extract(schemaName, fields) } -func (a *avroExtractor) Extract(name string, fields []pgconn.FieldDescription) (*avro.RecordSchema, error) { +func (a avroExtractor) ExtractLogreplFields(schemaName string, rel *pglogrepl.RelationMessage, fieldName string) (*avro.RecordSchema, error) { + var fields []pgconn.FieldDescription + + for i := range rel.Columns { + fields = append(fields, pgconn.FieldDescription{ + Name: rel.Columns[i].Name, + DataTypeOID: rel.Columns[i].DataType, + TypeModifier: rel.Columns[i].TypeModifier, + }) + } + + return a.Extract(schemaName, fields, fieldName) +} + +func (a *avroExtractor) Extract(schemaName string, fields []pgconn.FieldDescription, fieldNames ...string) (*avro.RecordSchema, error) { var avroFields []*avro.Field for _, f := range fields { + if len(fieldNames) > 0 && !slices.Contains(fieldNames, f.Name) { + continue + } t, ok := a.pgMap.TypeForOID(f.DataTypeOID) if !ok { return nil, fmt.Errorf("field %q with OID %d cannot be resolved", f.Name, f.DataTypeOID) @@ -106,7 +123,7 @@ func (a *avroExtractor) Extract(name string, fields []pgconn.FieldDescription) ( return cmp.Compare(a.Name(), b.Name()) }) - sch, err := avro.NewRecordSchema(name, avroNS, avroFields) + sch, err := avro.NewRecordSchema(schemaName, avroNS, avroFields) if err != nil { return nil, fmt.Errorf("failed to create avro schema: %w", err) } diff --git a/source/snapshot/fetch_worker.go b/source/snapshot/fetch_worker.go index 8864e65..3b6cf6b 100644 --- a/source/snapshot/fetch_worker.go +++ b/source/snapshot/fetch_worker.go @@ -77,18 +77,21 @@ func (c FetchConfig) Validate() error { } type FetchData struct { - Key opencdc.StructuredData - Payload opencdc.StructuredData - Position position.SnapshotPosition - Table string - AvroSchema *avro.RecordSchema + Key opencdc.StructuredData + Payload opencdc.StructuredData + Position position.SnapshotPosition + Table string + PayloadSchema *avro.RecordSchema + KeySchema *avro.RecordSchema } type FetchWorker struct { - conf FetchConfig - db *pgxpool.Pool - out chan<- FetchData - avroSchema *avro.RecordSchema + conf FetchConfig + db *pgxpool.Pool + out chan<- FetchData + + keySchema *avro.RecordSchema + payloadSchema *avro.RecordSchema snapshotEnd int64 lastRead int64 @@ -285,13 +288,21 @@ func (f *FetchWorker) fetch(ctx context.Context, tx pgx.Tx) (int, error) { return 0, fmt.Errorf("failed to get values: %w", err) } - if f.avroSchema == nil { - sch, err := schema.Avro.Extract(f.conf.Table, fields) + if f.payloadSchema == nil { + ps, err := schema.Avro.Extract(f.conf.Table, fields) + if err != nil { + return 0, fmt.Errorf("failed to extract schema: %w", err) + } + + f.payloadSchema = ps + } + if f.keySchema == nil { + ks, err := schema.Avro.Extract(f.conf.Table+"_key", fields, f.conf.Key) if err != nil { return 0, fmt.Errorf("failed to extract schema: %w", err) } - f.avroSchema = sch + f.keySchema = ks } data, err := f.buildFetchData(fields, values) @@ -340,11 +351,12 @@ func (f *FetchWorker) buildFetchData(fields []pgconn.FieldDescription, values [] } return FetchData{ - Key: key, - Payload: payload, - Position: pos, - Table: f.conf.Table, - AvroSchema: f.avroSchema, + Key: key, + Payload: payload, + Position: pos, + Table: f.conf.Table, + PayloadSchema: f.payloadSchema, + KeySchema: f.keySchema, }, nil } diff --git a/source/snapshot/iterator.go b/source/snapshot/iterator.go index 3064f4e..24297a7 100644 --- a/source/snapshot/iterator.go +++ b/source/snapshot/iterator.go @@ -124,7 +124,7 @@ func (i *Iterator) buildRecord(ctx context.Context, d FetchData) (opencdc.Record metadata["postgres.table"] = d.Table rec := sdk.Util.Source.NewRecordSnapshot(pos, metadata, d.Key, d.Payload) - err := i.attachSchema(ctx, rec, d.AvroSchema) + err := i.attachSchemas(ctx, rec, d.PayloadSchema, d.KeySchema) if err != nil { return opencdc.Record{}, fmt.Errorf("failed to attach schema: %w", err) } @@ -173,19 +173,28 @@ func (i *Iterator) startWorkers() { }() } -func (i *Iterator) attachSchema(ctx context.Context, rec opencdc.Record, schema *avro.RecordSchema) error { - sch, err := sdkschema.Create( +func (i *Iterator) attachSchemas(ctx context.Context, rec opencdc.Record, payloadSchema *avro.RecordSchema, keySchema *avro.RecordSchema) error { + ps, err := sdkschema.Create( ctx, cschema.TypeAvro, - schema.Name(), - []byte(schema.String()), + payloadSchema.Name(), + []byte(payloadSchema.String()), ) if err != nil { - return fmt.Errorf("failed creating schema %v: %w", schema.Name(), err) + return fmt.Errorf("failed creating schema %v: %w", payloadSchema.Name(), err) } + cschema.AttachPayloadSchemaToRecord(rec, ps) - // todo attach key schema - cschema.AttachPayloadSchemaToRecord(rec, sch) + ks, err := sdkschema.Create( + ctx, + cschema.TypeAvro, + keySchema.Name(), + []byte(keySchema.String()), + ) + if err != nil { + return fmt.Errorf("failed creating schema %v: %w", keySchema.Name(), err) + } + cschema.AttachKeySchemaToRecord(rec, ks) return nil } From cc9119c8a4fb97b00b49b015aee21c21d72226ae Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 12 Sep 2024 15:57:06 +0200 Subject: [PATCH 03/14] refactor cdc schema handling --- source/logrepl/handler.go | 76 ++++++++++++++------------------- source/schema/avro.go | 19 ++------- source/snapshot/fetch_worker.go | 47 +++++++++++--------- 3 files changed, 62 insertions(+), 80 deletions(-) diff --git a/source/logrepl/handler.go b/source/logrepl/handler.go index 85fce4c..e5c0adc 100644 --- a/source/logrepl/handler.go +++ b/source/logrepl/handler.go @@ -24,7 +24,6 @@ import ( "github.com/conduitio/conduit-connector-postgres/source/schema" sdk "github.com/conduitio/conduit-connector-sdk" sdkschema "github.com/conduitio/conduit-connector-sdk/schema" - "github.com/hamba/avro/v2" "github.com/jackc/pglogrepl" ) @@ -36,8 +35,8 @@ type CDCHandler struct { out chan<- opencdc.Record lastTXLSN pglogrepl.LSN - keySchemas map[string]avro.Schema - payloadSchemas map[string]avro.Schema + keySchemas map[string]cschema.Schema + payloadSchemas map[string]cschema.Schema } func NewCDCHandler(rs *internal.RelationSet, tableKeys map[string]string, out chan<- opencdc.Record) *CDCHandler { @@ -45,8 +44,8 @@ func NewCDCHandler(rs *internal.RelationSet, tableKeys map[string]string, out ch tableKeys: tableKeys, relationSet: rs, out: out, - payloadSchemas: make(map[string]avro.Schema), - keySchemas: make(map[string]avro.Schema), + keySchemas: make(map[string]cschema.Schema), + payloadSchemas: make(map[string]cschema.Schema), } } @@ -105,7 +104,7 @@ func (h *CDCHandler) handleInsert( return fmt.Errorf("failed to decode new values: %w", err) } - if err := h.updateAvroSchema(rel); err != nil { + if err := h.updateAvroSchema(ctx, rel); err != nil { return fmt.Errorf("failed to update avro schema: %w", err) } @@ -115,11 +114,7 @@ func (h *CDCHandler) handleInsert( h.buildRecordKey(newValues, rel.RelationName), h.buildRecordPayload(newValues), ) - - err = h.attachSchemas(ctx, rec, rel.RelationName) - if err != nil { - return fmt.Errorf("failed to attach schema: %w", err) - } + h.attachSchemas(rec, rel.RelationName) return h.send(ctx, rec) } @@ -141,7 +136,7 @@ func (h *CDCHandler) handleUpdate( return fmt.Errorf("failed to decode new values: %w", err) } - if err := h.updateAvroSchema(rel); err != nil { + if err := h.updateAvroSchema(ctx, rel); err != nil { return fmt.Errorf("failed to update avro schema: %w", err) } @@ -159,11 +154,7 @@ func (h *CDCHandler) handleUpdate( h.buildRecordPayload(oldValues), h.buildRecordPayload(newValues), ) - - err = h.attachSchemas(ctx, rec, rel.RelationName) - if err != nil { - return fmt.Errorf("failed to attach schema: %w", err) - } + h.attachSchemas(rec, rel.RelationName) return h.send(ctx, rec) } @@ -185,7 +176,7 @@ func (h *CDCHandler) handleDelete( return fmt.Errorf("failed to decode old values: %w", err) } - if err := h.updateAvroSchema(rel); err != nil { + if err := h.updateAvroSchema(ctx, rel); err != nil { return fmt.Errorf("failed to update avro schema: %w", err) } @@ -195,11 +186,7 @@ func (h *CDCHandler) handleDelete( h.buildRecordKey(oldValues, rel.RelationName), h.buildRecordPayload(oldValues), ) - - err = h.attachSchemas(ctx, rec, rel.RelationName) - if err != nil { - return fmt.Errorf("failed to attach schema: %w", err) - } + h.attachSchemas(rec, rel.RelationName) return h.send(ctx, rec) } @@ -256,44 +243,43 @@ func (*CDCHandler) buildPosition(lsn pglogrepl.LSN) opencdc.Position { // updateAvroSchema generates and stores avro schema based on the relation's row, // when usage of avro schema is requested. -func (h *CDCHandler) updateAvroSchema(rel *pglogrepl.RelationMessage) error { - ps, err := schema.Avro.ExtractLogrepl(rel.RelationName, rel) +func (h *CDCHandler) updateAvroSchema(ctx context.Context, rel *pglogrepl.RelationMessage) error { + // Payload schema + avroPayloadSch, err := schema.Avro.ExtractLogrepl(rel.RelationName, rel) if err != nil { return fmt.Errorf("failed to extract payload schema: %w", err) } - h.payloadSchemas[rel.RelationName] = ps - - ks, err := schema.Avro.ExtractLogreplFields(rel.RelationName+"_key", rel, h.tableKeys[rel.RelationName]) - if err != nil { - return fmt.Errorf("failed to extract key schema: %w", err) - } - h.keySchemas[rel.RelationName+"_key"] = ks - - return nil -} - -func (h *CDCHandler) attachSchemas(ctx context.Context, rec opencdc.Record, relationName string) error { ps, err := sdkschema.Create( ctx, cschema.TypeAvro, - relationName, - []byte(h.payloadSchemas[relationName].String()), + avroPayloadSch.Name(), + []byte(avroPayloadSch.String()), ) if err != nil { - return fmt.Errorf("failed creating schema for relation %v: %w", relationName, err) + return fmt.Errorf("failed creating payload schema for relation %v: %w", rel.RelationName, err) } - cschema.AttachPayloadSchemaToRecord(rec, ps) + h.payloadSchemas[rel.RelationName] = ps // Key schema + avroKeySch, err := schema.Avro.ExtractLogrepl(rel.RelationName+"_key", rel, h.tableKeys[rel.RelationName]) + if err != nil { + return fmt.Errorf("failed to extract key schema: %w", err) + } ks, err := sdkschema.Create( ctx, cschema.TypeAvro, - relationName+"_key", - []byte(h.keySchemas[relationName+"_key"].String()), + avroKeySch.Name(), + []byte(avroKeySch.String()), ) if err != nil { - return fmt.Errorf("failed creating schema for relation %v: %w", relationName, err) + return fmt.Errorf("failed creating key schema for relation %v: %w", rel.RelationName, err) } - cschema.AttachKeySchemaToRecord(rec, ks) + h.keySchemas[rel.RelationName] = ks + return nil } + +func (h *CDCHandler) attachSchemas(rec opencdc.Record, relationName string) { + cschema.AttachPayloadSchemaToRecord(rec, h.payloadSchemas[relationName]) + cschema.AttachKeySchemaToRecord(rec, h.keySchemas[relationName]) +} diff --git a/source/schema/avro.go b/source/schema/avro.go index 5f2b40b..abb07d9 100644 --- a/source/schema/avro.go +++ b/source/schema/avro.go @@ -66,7 +66,7 @@ type avroExtractor struct { avroMap map[string]*avro.PrimitiveSchema } -func (a avroExtractor) ExtractLogrepl(schemaName string, rel *pglogrepl.RelationMessage) (*avro.RecordSchema, error) { +func (a avroExtractor) ExtractLogrepl(schemaName string, rel *pglogrepl.RelationMessage, fieldNames ...string) (*avro.RecordSchema, error) { var fields []pgconn.FieldDescription for i := range rel.Columns { @@ -77,21 +77,7 @@ func (a avroExtractor) ExtractLogrepl(schemaName string, rel *pglogrepl.Relation }) } - return a.Extract(schemaName, fields) -} - -func (a avroExtractor) ExtractLogreplFields(schemaName string, rel *pglogrepl.RelationMessage, fieldName string) (*avro.RecordSchema, error) { - var fields []pgconn.FieldDescription - - for i := range rel.Columns { - fields = append(fields, pgconn.FieldDescription{ - Name: rel.Columns[i].Name, - DataTypeOID: rel.Columns[i].DataType, - TypeModifier: rel.Columns[i].TypeModifier, - }) - } - - return a.Extract(schemaName, fields, fieldName) + return a.Extract(schemaName, fields, fieldNames...) } func (a *avroExtractor) Extract(schemaName string, fields []pgconn.FieldDescription, fieldNames ...string) (*avro.RecordSchema, error) { @@ -101,6 +87,7 @@ func (a *avroExtractor) Extract(schemaName string, fields []pgconn.FieldDescript if len(fieldNames) > 0 && !slices.Contains(fieldNames, f.Name) { continue } + t, ok := a.pgMap.TypeForOID(f.DataTypeOID) if !ok { return nil, fmt.Errorf("field %q with OID %d cannot be resolved", f.Name, f.DataTypeOID) diff --git a/source/snapshot/fetch_worker.go b/source/snapshot/fetch_worker.go index 3b6cf6b..ad18249 100644 --- a/source/snapshot/fetch_worker.go +++ b/source/snapshot/fetch_worker.go @@ -18,13 +18,13 @@ import ( "context" "errors" "fmt" + "github.com/conduitio/conduit-connector-postgres/source/schema" "slices" "strings" "time" "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-postgres/source/position" - "github.com/conduitio/conduit-connector-postgres/source/schema" "github.com/conduitio/conduit-connector-postgres/source/types" sdk "github.com/conduitio/conduit-connector-sdk" "github.com/google/uuid" @@ -85,6 +85,7 @@ type FetchData struct { KeySchema *avro.RecordSchema } +// FetchWorker fetches snapshot data from a single table type FetchWorker struct { conf FetchConfig db *pgxpool.Pool @@ -279,32 +280,18 @@ func (f *FetchWorker) fetch(ctx context.Context, tx pgx.Tx) (int, error) { Msg("cursor fetched data") fields := rows.FieldDescriptions() + err = f.initSchemas(fields) + if err != nil { + return 0, fmt.Errorf("failed to init schemas: %w", err) + } var nread int - for rows.Next() { values, err := rows.Values() if err != nil { return 0, fmt.Errorf("failed to get values: %w", err) } - if f.payloadSchema == nil { - ps, err := schema.Avro.Extract(f.conf.Table, fields) - if err != nil { - return 0, fmt.Errorf("failed to extract schema: %w", err) - } - - f.payloadSchema = ps - } - if f.keySchema == nil { - ks, err := schema.Avro.Extract(f.conf.Table+"_key", fields, f.conf.Key) - if err != nil { - return 0, fmt.Errorf("failed to extract schema: %w", err) - } - - f.keySchema = ks - } - data, err := f.buildFetchData(fields, values) if err != nil { return nread, fmt.Errorf("failed to build fetch data: %w", err) @@ -478,3 +465,25 @@ func (*FetchWorker) validateTable(ctx context.Context, table string, tx pgx.Tx) return nil } + +func (f *FetchWorker) initSchemas(fields []pgconn.FieldDescription) error { + if f.payloadSchema == nil { + ps, err := schema.Avro.Extract(f.conf.Table, fields) + if err != nil { + return fmt.Errorf("failed to extract schema: %w", err) + } + + f.payloadSchema = ps + } + + if f.keySchema == nil { + ks, err := schema.Avro.Extract(f.conf.Table+"_key", fields, f.conf.Key) + if err != nil { + return fmt.Errorf("failed to extract schema: %w", err) + } + + f.keySchema = ks + } + + return nil +} From a9a6f7b805fc66a08dbb16fa0475d3e8a6dcb81c Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 12 Sep 2024 17:39:54 +0200 Subject: [PATCH 04/14] snapshot, init schemas before loop --- source/snapshot/fetch_worker.go | 58 ++++++++++++++++++++------------- source/snapshot/iterator.go | 43 ++++-------------------- 2 files changed, 42 insertions(+), 59 deletions(-) diff --git a/source/snapshot/fetch_worker.go b/source/snapshot/fetch_worker.go index ad18249..f0a4fcf 100644 --- a/source/snapshot/fetch_worker.go +++ b/source/snapshot/fetch_worker.go @@ -18,17 +18,18 @@ import ( "context" "errors" "fmt" - "github.com/conduitio/conduit-connector-postgres/source/schema" "slices" "strings" "time" "github.com/conduitio/conduit-commons/opencdc" + cschema "github.com/conduitio/conduit-commons/schema" "github.com/conduitio/conduit-connector-postgres/source/position" + "github.com/conduitio/conduit-connector-postgres/source/schema" "github.com/conduitio/conduit-connector-postgres/source/types" sdk "github.com/conduitio/conduit-connector-sdk" + sdkschema "github.com/conduitio/conduit-connector-sdk/schema" "github.com/google/uuid" - "github.com/hamba/avro/v2" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" @@ -81,8 +82,8 @@ type FetchData struct { Payload opencdc.StructuredData Position position.SnapshotPosition Table string - PayloadSchema *avro.RecordSchema - KeySchema *avro.RecordSchema + PayloadSchema cschema.Schema + KeySchema cschema.Schema } // FetchWorker fetches snapshot data from a single table @@ -91,8 +92,8 @@ type FetchWorker struct { db *pgxpool.Pool out chan<- FetchData - keySchema *avro.RecordSchema - payloadSchema *avro.RecordSchema + keySchema cschema.Schema + payloadSchema cschema.Schema snapshotEnd int64 lastRead int64 @@ -280,7 +281,7 @@ func (f *FetchWorker) fetch(ctx context.Context, tx pgx.Tx) (int, error) { Msg("cursor fetched data") fields := rows.FieldDescriptions() - err = f.initSchemas(fields) + err = f.initSchemas(ctx, fields) if err != nil { return 0, fmt.Errorf("failed to init schemas: %w", err) } @@ -466,24 +467,37 @@ func (*FetchWorker) validateTable(ctx context.Context, table string, tx pgx.Tx) return nil } -func (f *FetchWorker) initSchemas(fields []pgconn.FieldDescription) error { - if f.payloadSchema == nil { - ps, err := schema.Avro.Extract(f.conf.Table, fields) - if err != nil { - return fmt.Errorf("failed to extract schema: %w", err) - } - - f.payloadSchema = ps +// todo this should happen only once? +func (f *FetchWorker) initSchemas(ctx context.Context, fields []pgconn.FieldDescription) error { + avroPayloadSch, err := schema.Avro.Extract(f.conf.Table, fields) + if err != nil { + return fmt.Errorf("failed to extract payload schema for table %v: %w", f.conf.Table, err) } + ps, err := sdkschema.Create( + ctx, + cschema.TypeAvro, + avroPayloadSch.Name(), + []byte(avroPayloadSch.String()), + ) + if err != nil { + return fmt.Errorf("failed creating payload schema for table %v: %w", f.conf.Table, err) + } + f.payloadSchema = ps - if f.keySchema == nil { - ks, err := schema.Avro.Extract(f.conf.Table+"_key", fields, f.conf.Key) - if err != nil { - return fmt.Errorf("failed to extract schema: %w", err) - } - - f.keySchema = ks + avroKeySch, err := schema.Avro.Extract(f.conf.Table+"_key", fields, f.conf.Key) + if err != nil { + return fmt.Errorf("failed to extract key schema for table %v: %w", f.conf.Table, err) + } + ks, err := sdkschema.Create( + ctx, + cschema.TypeAvro, + avroKeySch.Name(), + []byte(avroKeySch.String()), + ) + if err != nil { + return fmt.Errorf("failed creating key schema for table %v: %w", f.conf.Table, err) } + f.keySchema = ks return nil } diff --git a/source/snapshot/iterator.go b/source/snapshot/iterator.go index 24297a7..2204201 100644 --- a/source/snapshot/iterator.go +++ b/source/snapshot/iterator.go @@ -18,12 +18,9 @@ import ( "context" "errors" "fmt" - cschema "github.com/conduitio/conduit-commons/schema" - sdkschema "github.com/conduitio/conduit-connector-sdk/schema" - "github.com/hamba/avro/v2" - "github.com/conduitio/conduit-commons/csync" "github.com/conduitio/conduit-commons/opencdc" + cschema "github.com/conduitio/conduit-commons/schema" "github.com/conduitio/conduit-connector-postgres/source/position" sdk "github.com/conduitio/conduit-connector-sdk" "github.com/jackc/pgx/v5/pgxpool" @@ -97,7 +94,7 @@ func (i *Iterator) Next(ctx context.Context) (opencdc.Record, error) { } i.acks.Add(1) - return i.buildRecord(ctx, d) + return i.buildRecord(d), nil } } @@ -114,7 +111,7 @@ func (i *Iterator) Teardown(_ context.Context) error { return nil } -func (i *Iterator) buildRecord(ctx context.Context, d FetchData) (opencdc.Record, error) { +func (i *Iterator) buildRecord(d FetchData) opencdc.Record { // merge this position with latest position i.lastPosition.Type = position.TypeSnapshot i.lastPosition.Snapshots[d.Table] = d.Position @@ -124,12 +121,10 @@ func (i *Iterator) buildRecord(ctx context.Context, d FetchData) (opencdc.Record metadata["postgres.table"] = d.Table rec := sdk.Util.Source.NewRecordSnapshot(pos, metadata, d.Key, d.Payload) - err := i.attachSchemas(ctx, rec, d.PayloadSchema, d.KeySchema) - if err != nil { - return opencdc.Record{}, fmt.Errorf("failed to attach schema: %w", err) - } + cschema.AttachKeySchemaToRecord(rec, d.KeySchema) + cschema.AttachPayloadSchemaToRecord(rec, d.PayloadSchema) - return rec, nil + return rec } func (i *Iterator) initFetchers(ctx context.Context) error { @@ -172,29 +167,3 @@ func (i *Iterator) startWorkers() { close(i.data) }() } - -func (i *Iterator) attachSchemas(ctx context.Context, rec opencdc.Record, payloadSchema *avro.RecordSchema, keySchema *avro.RecordSchema) error { - ps, err := sdkschema.Create( - ctx, - cschema.TypeAvro, - payloadSchema.Name(), - []byte(payloadSchema.String()), - ) - if err != nil { - return fmt.Errorf("failed creating schema %v: %w", payloadSchema.Name(), err) - } - cschema.AttachPayloadSchemaToRecord(rec, ps) - - ks, err := sdkschema.Create( - ctx, - cschema.TypeAvro, - keySchema.Name(), - []byte(keySchema.String()), - ) - if err != nil { - return fmt.Errorf("failed creating schema %v: %w", keySchema.Name(), err) - } - cschema.AttachKeySchemaToRecord(rec, ks) - - return nil -} From e8af93dd5032d9b48bdeb04aed0b8b5ecd9ced49 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 12 Sep 2024 18:55:43 +0200 Subject: [PATCH 05/14] revert --- source/snapshot/fetch_worker.go | 68 ++++++++++++++++----------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/source/snapshot/fetch_worker.go b/source/snapshot/fetch_worker.go index f0a4fcf..3741202 100644 --- a/source/snapshot/fetch_worker.go +++ b/source/snapshot/fetch_worker.go @@ -92,8 +92,8 @@ type FetchWorker struct { db *pgxpool.Pool out chan<- FetchData - keySchema cschema.Schema - payloadSchema cschema.Schema + keySchema *cschema.Schema + payloadSchema *cschema.Schema snapshotEnd int64 lastRead int64 @@ -281,10 +281,6 @@ func (f *FetchWorker) fetch(ctx context.Context, tx pgx.Tx) (int, error) { Msg("cursor fetched data") fields := rows.FieldDescriptions() - err = f.initSchemas(ctx, fields) - if err != nil { - return 0, fmt.Errorf("failed to init schemas: %w", err) - } var nread int for rows.Next() { @@ -343,8 +339,8 @@ func (f *FetchWorker) buildFetchData(fields []pgconn.FieldDescription, values [] Payload: payload, Position: pos, Table: f.conf.Table, - PayloadSchema: f.payloadSchema, - KeySchema: f.keySchema, + PayloadSchema: *f.payloadSchema, + KeySchema: *f.keySchema, }, nil } @@ -469,35 +465,39 @@ func (*FetchWorker) validateTable(ctx context.Context, table string, tx pgx.Tx) // todo this should happen only once? func (f *FetchWorker) initSchemas(ctx context.Context, fields []pgconn.FieldDescription) error { - avroPayloadSch, err := schema.Avro.Extract(f.conf.Table, fields) - if err != nil { - return fmt.Errorf("failed to extract payload schema for table %v: %w", f.conf.Table, err) - } - ps, err := sdkschema.Create( - ctx, - cschema.TypeAvro, - avroPayloadSch.Name(), - []byte(avroPayloadSch.String()), - ) - if err != nil { - return fmt.Errorf("failed creating payload schema for table %v: %w", f.conf.Table, err) + if f.payloadSchema == nil { + avroPayloadSch, err := schema.Avro.Extract(f.conf.Table, fields) + if err != nil { + return fmt.Errorf("failed to extract payload schema for table %v: %w", f.conf.Table, err) + } + ps, err := sdkschema.Create( + ctx, + cschema.TypeAvro, + avroPayloadSch.Name(), + []byte(avroPayloadSch.String()), + ) + if err != nil { + return fmt.Errorf("failed creating payload schema for table %v: %w", f.conf.Table, err) + } + f.payloadSchema = &ps } - f.payloadSchema = ps - avroKeySch, err := schema.Avro.Extract(f.conf.Table+"_key", fields, f.conf.Key) - if err != nil { - return fmt.Errorf("failed to extract key schema for table %v: %w", f.conf.Table, err) - } - ks, err := sdkschema.Create( - ctx, - cschema.TypeAvro, - avroKeySch.Name(), - []byte(avroKeySch.String()), - ) - if err != nil { - return fmt.Errorf("failed creating key schema for table %v: %w", f.conf.Table, err) + if f.keySchema == nil { + avroKeySch, err := schema.Avro.Extract(f.conf.Table+"_key", fields, f.conf.Key) + if err != nil { + return fmt.Errorf("failed to extract key schema for table %v: %w", f.conf.Table, err) + } + ks, err := sdkschema.Create( + ctx, + cschema.TypeAvro, + avroKeySch.Name(), + []byte(avroKeySch.String()), + ) + if err != nil { + return fmt.Errorf("failed creating key schema for table %v: %w", f.conf.Table, err) + } + f.keySchema = &ks } - f.keySchema = ks return nil } From 4fcf38481d99cba951b9e21d0c1bb90a65b77233 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 12 Sep 2024 18:56:53 +0200 Subject: [PATCH 06/14] revert --- source/snapshot/fetch_worker.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/snapshot/fetch_worker.go b/source/snapshot/fetch_worker.go index 3741202..b2a564c 100644 --- a/source/snapshot/fetch_worker.go +++ b/source/snapshot/fetch_worker.go @@ -281,14 +281,19 @@ func (f *FetchWorker) fetch(ctx context.Context, tx pgx.Tx) (int, error) { Msg("cursor fetched data") fields := rows.FieldDescriptions() - var nread int + for rows.Next() { values, err := rows.Values() if err != nil { return 0, fmt.Errorf("failed to get values: %w", err) } + err = f.initSchemas(ctx, fields) + if err != nil { + return 0, fmt.Errorf("failed to init schemas: %w", err) + } + data, err := f.buildFetchData(fields, values) if err != nil { return nread, fmt.Errorf("failed to build fetch data: %w", err) From 710e97a9c15d120f284db512711f3f873828018c Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 12 Sep 2024 19:39:03 +0200 Subject: [PATCH 07/14] add suffixes --- source/logrepl/cdc_test.go | 10 +++++----- source/logrepl/handler.go | 2 +- source/snapshot/fetch_worker.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/source/logrepl/cdc_test.go b/source/logrepl/cdc_test.go index c7f1092..2df292f 100644 --- a/source/logrepl/cdc_test.go +++ b/source/logrepl/cdc_test.go @@ -149,7 +149,7 @@ func TestCDCIterator_Next(t *testing.T) { opencdc.MetadataCollection: table, opencdc.MetadataKeySchemaSubject: table + "_key", opencdc.MetadataKeySchemaVersion: "1", - opencdc.MetadataPayloadSchemaSubject: table, + opencdc.MetadataPayloadSchemaSubject: table + "_payload", opencdc.MetadataPayloadSchemaVersion: "1", }, Key: opencdc.StructuredData{"id": int64(6)}, @@ -182,7 +182,7 @@ func TestCDCIterator_Next(t *testing.T) { opencdc.MetadataCollection: table, opencdc.MetadataKeySchemaSubject: table + "_key", opencdc.MetadataKeySchemaVersion: "1", - opencdc.MetadataPayloadSchemaSubject: table, + opencdc.MetadataPayloadSchemaSubject: table + "_payload", opencdc.MetadataPayloadSchemaVersion: "1", }, Key: opencdc.StructuredData{"id": int64(1)}, @@ -216,7 +216,7 @@ func TestCDCIterator_Next(t *testing.T) { opencdc.MetadataCollection: table, opencdc.MetadataKeySchemaSubject: table + "_key", opencdc.MetadataKeySchemaVersion: "1", - opencdc.MetadataPayloadSchemaSubject: table, + opencdc.MetadataPayloadSchemaSubject: table + "_payload", opencdc.MetadataPayloadSchemaVersion: "1", }, Key: opencdc.StructuredData{"id": int64(1)}, @@ -259,7 +259,7 @@ func TestCDCIterator_Next(t *testing.T) { opencdc.MetadataCollection: table, opencdc.MetadataKeySchemaSubject: table + "_key", opencdc.MetadataKeySchemaVersion: "1", - opencdc.MetadataPayloadSchemaSubject: table, + opencdc.MetadataPayloadSchemaSubject: table + "_payload", opencdc.MetadataPayloadSchemaVersion: "1", }, Key: opencdc.StructuredData{"id": int64(4)}, @@ -293,7 +293,7 @@ func TestCDCIterator_Next(t *testing.T) { opencdc.MetadataCollection: table, opencdc.MetadataKeySchemaSubject: table + "_key", opencdc.MetadataKeySchemaVersion: "1", - opencdc.MetadataPayloadSchemaSubject: table, + opencdc.MetadataPayloadSchemaSubject: table + "_payload", opencdc.MetadataPayloadSchemaVersion: "1", }, Key: opencdc.StructuredData{"id": int64(3)}, diff --git a/source/logrepl/handler.go b/source/logrepl/handler.go index e5c0adc..ad50c10 100644 --- a/source/logrepl/handler.go +++ b/source/logrepl/handler.go @@ -245,7 +245,7 @@ func (*CDCHandler) buildPosition(lsn pglogrepl.LSN) opencdc.Position { // when usage of avro schema is requested. func (h *CDCHandler) updateAvroSchema(ctx context.Context, rel *pglogrepl.RelationMessage) error { // Payload schema - avroPayloadSch, err := schema.Avro.ExtractLogrepl(rel.RelationName, rel) + avroPayloadSch, err := schema.Avro.ExtractLogrepl(rel.RelationName+"_payload", rel) if err != nil { return fmt.Errorf("failed to extract payload schema: %w", err) } diff --git a/source/snapshot/fetch_worker.go b/source/snapshot/fetch_worker.go index b2a564c..d1e3537 100644 --- a/source/snapshot/fetch_worker.go +++ b/source/snapshot/fetch_worker.go @@ -471,7 +471,7 @@ func (*FetchWorker) validateTable(ctx context.Context, table string, tx pgx.Tx) // todo this should happen only once? func (f *FetchWorker) initSchemas(ctx context.Context, fields []pgconn.FieldDescription) error { if f.payloadSchema == nil { - avroPayloadSch, err := schema.Avro.Extract(f.conf.Table, fields) + avroPayloadSch, err := schema.Avro.Extract(f.conf.Table+"_payload", fields) if err != nil { return fmt.Errorf("failed to extract payload schema for table %v: %w", f.conf.Table, err) } From d2881c489ebc9bc6e90e9aa46b0b8b6a19639f63 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 12 Sep 2024 19:41:00 +0200 Subject: [PATCH 08/14] linter --- .golangci.yml | 2 +- source/logrepl/handler.go | 1 + source/snapshot/iterator.go | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.golangci.yml b/.golangci.yml index 5f11240..b0e5d7c 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -43,7 +43,7 @@ linters: # - errorlint # - exhaustive # - exhaustivestruct - - exportloopref + - copyloopvar - forbidigo # - forcetypeassert # - funlen diff --git a/source/logrepl/handler.go b/source/logrepl/handler.go index ad50c10..d1a504d 100644 --- a/source/logrepl/handler.go +++ b/source/logrepl/handler.go @@ -17,6 +17,7 @@ package logrepl import ( "context" "fmt" + "github.com/conduitio/conduit-commons/opencdc" cschema "github.com/conduitio/conduit-commons/schema" "github.com/conduitio/conduit-connector-postgres/source/logrepl/internal" diff --git a/source/snapshot/iterator.go b/source/snapshot/iterator.go index 2204201..fbf6317 100644 --- a/source/snapshot/iterator.go +++ b/source/snapshot/iterator.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/conduitio/conduit-commons/csync" "github.com/conduitio/conduit-commons/opencdc" cschema "github.com/conduitio/conduit-commons/schema" From 03a5c61c0d40425ea5dd567d9a4dcd789b6679e8 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 12 Sep 2024 19:52:48 +0200 Subject: [PATCH 09/14] comments --- source/schema/avro.go | 4 ++++ source/snapshot/fetch_worker.go | 7 +++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/source/schema/avro.go b/source/schema/avro.go index abb07d9..58632b1 100644 --- a/source/schema/avro.go +++ b/source/schema/avro.go @@ -66,6 +66,8 @@ type avroExtractor struct { avroMap map[string]*avro.PrimitiveSchema } +// ExtractLogrepl extracts an Avro schema from the given pglogrepl.RelationMessage. +// If `fieldNames` are specified, then only the given fields will be included in the schema. func (a avroExtractor) ExtractLogrepl(schemaName string, rel *pglogrepl.RelationMessage, fieldNames ...string) (*avro.RecordSchema, error) { var fields []pgconn.FieldDescription @@ -80,6 +82,8 @@ func (a avroExtractor) ExtractLogrepl(schemaName string, rel *pglogrepl.Relation return a.Extract(schemaName, fields, fieldNames...) } +// Extract extracts an Avro schema from the given Postgres field descriptions. +// If `fieldNames` are specified, then only the given fields will be included in the schema. func (a *avroExtractor) Extract(schemaName string, fields []pgconn.FieldDescription, fieldNames ...string) (*avro.RecordSchema, error) { var avroFields []*avro.Field diff --git a/source/snapshot/fetch_worker.go b/source/snapshot/fetch_worker.go index d1e3537..fc97e98 100644 --- a/source/snapshot/fetch_worker.go +++ b/source/snapshot/fetch_worker.go @@ -289,9 +289,9 @@ func (f *FetchWorker) fetch(ctx context.Context, tx pgx.Tx) (int, error) { return 0, fmt.Errorf("failed to get values: %w", err) } - err = f.initSchemas(ctx, fields) + err = f.extractSchemas(ctx, fields) if err != nil { - return 0, fmt.Errorf("failed to init schemas: %w", err) + return 0, fmt.Errorf("failed to extract schemas: %w", err) } data, err := f.buildFetchData(fields, values) @@ -468,8 +468,7 @@ func (*FetchWorker) validateTable(ctx context.Context, table string, tx pgx.Tx) return nil } -// todo this should happen only once? -func (f *FetchWorker) initSchemas(ctx context.Context, fields []pgconn.FieldDescription) error { +func (f *FetchWorker) extractSchemas(ctx context.Context, fields []pgconn.FieldDescription) error { if f.payloadSchema == nil { avroPayloadSch, err := schema.Avro.Extract(f.conf.Table+"_payload", fields) if err != nil { From e36eecce65638bde1cd20b8e6ffed5d06f096b4c Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Fri, 13 Sep 2024 15:05:32 +0200 Subject: [PATCH 10/14] more tests --- source/logrepl/cdc_test.go | 130 +++++++++++++++++++++++++++++++++++++ source/schema/avro.go | 4 +- test/helper.go | 124 +++++++++++++++++++++++++++++++---- 3 files changed, 245 insertions(+), 13 deletions(-) diff --git a/source/logrepl/cdc_test.go b/source/logrepl/cdc_test.go index 2df292f..4543513 100644 --- a/source/logrepl/cdc_test.go +++ b/source/logrepl/cdc_test.go @@ -18,6 +18,9 @@ import ( "context" "errors" "fmt" + "github.com/conduitio/conduit-commons/schema" + sdkschema "github.com/conduitio/conduit-connector-sdk/schema" + "github.com/hamba/avro/v2" "strings" "testing" "time" @@ -517,3 +520,130 @@ func fetchSlotStats(t *testing.T, c test.Querier, slotName string) (pglogrepl.LS time.Sleep(100 * time.Millisecond) } } + +func TestCDCIterator_Schema(t *testing.T) { + ctx := context.Background() + + pool := test.ConnectPool(ctx, t, test.RepmgrConnString) + table := test.SetupTestTable(ctx, t, pool) + + i := testCDCIterator(ctx, t, pool, table, true) + <-i.sub.Ready() + + t.Run("initial table schema", func(t *testing.T) { + is := is.New(t) + + _, err := pool.Exec( + ctx, + fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3, column4, column5) + VALUES (6, 'bizz', 456, false, 12.3, 14)`, table), + ) + is.NoErr(err) + + r, err := i.Next(ctx) + is.NoErr(err) + + assertPayloadSchemaOK(ctx, is, test.TestTableAvroSchemaV1, table, r) + assertKeySchemaOK(ctx, is, table, r) + }) + + t.Run("column added", func(t *testing.T) { + is := is.New(t) + + _, err := pool.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s ADD COLUMN column6 timestamp;`, table)) + + _, err = pool.Exec( + ctx, + fmt.Sprintf(`INSERT INTO %s (id, key, column1, column2, column3, column4, column5, column6) + VALUES (7, decode('aabbcc', 'hex'), 'example data 1', 100, true, 12345.678, 12345, '2023-09-09 10:00:00');`, table), + ) + is.NoErr(err) + + r, err := i.Next(ctx) + is.NoErr(err) + + assertPayloadSchemaOK(ctx, is, test.TestTableAvroSchemaV2, table, r) + assertKeySchemaOK(ctx, is, table, r) + }) + + t.Run("column removed", func(t *testing.T) { + is := is.New(t) + + _, err := pool.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s DROP COLUMN column4, DROP COLUMN column5;`, table)) + + _, err = pool.Exec( + ctx, + fmt.Sprintf(`INSERT INTO %s (id, key, column1, column2, column3, column6) + VALUES (8, decode('aabbcc', 'hex'), 'example data 1', 100, true, '2023-09-09 10:00:00');`, table), + ) + is.NoErr(err) + + r, err := i.Next(ctx) + is.NoErr(err) + + assertPayloadSchemaOK(ctx, is, test.TestTableAvroSchemaV3, table, r) + assertKeySchemaOK(ctx, is, table, r) + }) +} + +func assertPayloadSchemaOK(ctx context.Context, is *is.I, wantSchemaTemplate string, table string, r opencdc.Record) { + gotConduitSch, err := getPayloadSchema(ctx, r) + + want, err := avro.Parse(fmt.Sprintf(wantSchemaTemplate, table+"_payload")) + is.NoErr(err) + + got, err := avro.ParseBytes(gotConduitSch.Bytes) + is.NoErr(err) + + is.Equal(want.String(), got.String()) +} + +func assertKeySchemaOK(ctx context.Context, is *is.I, table string, r opencdc.Record) { + gotConduitSch, err := getKeySchema(ctx, r) + + want, err := avro.Parse(fmt.Sprintf(test.TestTableKeyAvroSchema, table+"_key")) + is.NoErr(err) + + got, err := avro.ParseBytes(gotConduitSch.Bytes) + is.NoErr(err) + + is.Equal(want.String(), got.String()) +} + +func getPayloadSchema(ctx context.Context, r opencdc.Record) (schema.Schema, error) { + payloadSubj, err := r.Metadata.GetPayloadSchemaSubject() + if err != nil { + return schema.Schema{}, fmt.Errorf("GetPayloadSchemaSubject failed: %w", err) + } + + payloadV, err := r.Metadata.GetPayloadSchemaVersion() + if err != nil { + return schema.Schema{}, fmt.Errorf("GetPayloadSchemaVersion failed: %w", err) + } + + payloadSch, err := sdkschema.Get(ctx, payloadSubj, payloadV) + if err != nil { + return schema.Schema{}, fmt.Errorf("failed getting schema: %w", err) + } + + return payloadSch, nil +} + +func getKeySchema(ctx context.Context, r opencdc.Record) (schema.Schema, error) { + keySubj, err := r.Metadata.GetKeySchemaSubject() + if err != nil { + return schema.Schema{}, fmt.Errorf("GetKeySchemaSubject failed: %w", err) + } + + keyV, err := r.Metadata.GetKeySchemaVersion() + if err != nil { + return schema.Schema{}, fmt.Errorf("GetKeySchemaVersion failed: %w", err) + } + + keySch, err := sdkschema.Get(ctx, keySubj, keyV) + if err != nil { + return schema.Schema{}, fmt.Errorf("failed getting schema: %w", err) + } + + return keySch, nil +} diff --git a/source/schema/avro.go b/source/schema/avro.go index 58632b1..f5a7045 100644 --- a/source/schema/avro.go +++ b/source/schema/avro.go @@ -26,7 +26,7 @@ import ( ) const ( - avroNS = "conduit.postgres" + avroNS = "" avroDecimalPadding = 8 ) @@ -132,7 +132,7 @@ func (a *avroExtractor) extractType(t *pgtype.Type, typeMod int32) (avro.Schema, scale := int((typeMod - 4) & 65535) precision := int(((typeMod - 4) >> 16) & 65535) fs, err := avro.NewFixedSchema( - string(avro.Decimal), + fmt.Sprintf("%s_%d_%d", avro.Decimal, precision, scale), avroNS, precision+scale+avroDecimalPadding, avro.NewDecimalLogicalSchema(precision, scale), diff --git a/test/helper.go b/test/helper.go index b36b77d..4a52f83 100644 --- a/test/helper.go +++ b/test/helper.go @@ -36,6 +36,118 @@ const RepmgrConnString = "postgres://repmgr:repmgrmeroxa@127.0.0.1:5433/meroxadb // RegularConnString is a non-replication user connection string for the test postgres. const RegularConnString = "postgres://meroxauser:meroxapass@127.0.0.1:5433/meroxadb?sslmode=disable" +// TestTableAvroSchemaV1 is the Avro schema representation of the test table +// defined through testTableCreateQuery. +// The fields are sorted by name. +const TestTableAvroSchemaV1 = `{ + "type": "record", + "name": "%s", + "fields": + [ + {"name":"column1","type":"string"}, + {"name":"column2","type":"int"}, + {"name":"column3","type":"boolean"}, + { + "name": "column4", + "type": + { + "name": "decimal_16_3", + "type": "fixed", + "size": 27, + "logicalType": "decimal", + "precision": 16, + "scale": 3 + } + }, + { + "name": "column5", + "type": + { + "name": "decimal_5_0", + "type": "fixed", + "size": 13, + "logicalType": "decimal", + "precision": 5 + } + }, + {"name":"id","type":"long"}, + {"name":"key","type":"bytes"} + ] +}` + +const TestTableAvroSchemaV2 = `{ + "type": "record", + "name": "%s", + "fields": + [ + {"name":"column1","type":"string"}, + {"name":"column2","type":"int"}, + {"name":"column3","type":"boolean"}, + { + "name": "column4", + "type": + { + "name": "decimal_16_3", + "type": "fixed", + "size": 27, + "logicalType": "decimal", + "precision": 16, + "scale": 3 + } + }, + { + "name": "column5", + "type": + { + "name": "decimal_5_0", + "type": "fixed", + "size": 13, + "logicalType": "decimal", + "precision": 5 + } + }, + {"name":"column6","type":{"type":"long","logicalType":"local-timestamp-micros"}}, + {"name":"id","type":"long"}, + {"name":"key","type":"bytes"} + ] +}` + +// TestTableAvroSchemaV3 is TestTableAvroSchemaV1 with `column4` and `column5` dropped +const TestTableAvroSchemaV3 = `{ + "type": "record", + "name": "%s", + "fields": + [ + {"name":"column1","type":"string"}, + {"name":"column2","type":"int"}, + {"name":"column3","type":"boolean"}, + {"name":"column6","type":{"type":"long","logicalType":"local-timestamp-micros"}}, + {"name":"id","type":"long"}, + {"name":"key","type":"bytes"} + ] +}` + +const TestTableKeyAvroSchema = `{ + "type": "record", + "name": "%s", + "fields": + [ + {"name":"id","type":"long"} + ] +}` + +// When updating this table, TestTableAvroSchemaV1 needs to be updated too. +const testTableCreateQuery = ` + CREATE TABLE %s ( + id bigserial PRIMARY KEY, + key bytea, + column1 varchar(256), + column2 integer, + column3 boolean, + column4 numeric(16,3), + column5 numeric(5) + )` + type Querier interface { Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) @@ -71,17 +183,7 @@ func SetupTestTable(ctx context.Context, t *testing.T, conn Querier) string { table := RandomIdentifier(t) - query := ` - CREATE TABLE %s ( - id bigserial PRIMARY KEY, - key bytea, - column1 varchar(256), - column2 integer, - column3 boolean, - column4 numeric(16,3), - column5 numeric(5) - )` - query = fmt.Sprintf(query, table) + query := fmt.Sprintf(testTableCreateQuery, table) _, err := conn.Exec(ctx, query) is.NoErr(err) From 14609cf1198aeae0453e19fa0958eb9fdfda5048 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Fri, 13 Sep 2024 15:07:54 +0200 Subject: [PATCH 11/14] linter --- source/logrepl/cdc_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/source/logrepl/cdc_test.go b/source/logrepl/cdc_test.go index 4543513..1b0a886 100644 --- a/source/logrepl/cdc_test.go +++ b/source/logrepl/cdc_test.go @@ -18,18 +18,18 @@ import ( "context" "errors" "fmt" - "github.com/conduitio/conduit-commons/schema" - sdkschema "github.com/conduitio/conduit-connector-sdk/schema" - "github.com/hamba/avro/v2" "strings" "testing" "time" "github.com/conduitio/conduit-commons/opencdc" + "github.com/conduitio/conduit-commons/schema" "github.com/conduitio/conduit-connector-postgres/source/position" "github.com/conduitio/conduit-connector-postgres/test" + sdkschema "github.com/conduitio/conduit-connector-sdk/schema" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/hamba/avro/v2" "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5/pgxpool" "github.com/matryer/is" @@ -551,6 +551,7 @@ func TestCDCIterator_Schema(t *testing.T) { is := is.New(t) _, err := pool.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s ADD COLUMN column6 timestamp;`, table)) + is.NoErr(err) _, err = pool.Exec( ctx, @@ -570,6 +571,7 @@ func TestCDCIterator_Schema(t *testing.T) { is := is.New(t) _, err := pool.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s DROP COLUMN column4, DROP COLUMN column5;`, table)) + is.NoErr(err) _, err = pool.Exec( ctx, @@ -588,6 +590,7 @@ func TestCDCIterator_Schema(t *testing.T) { func assertPayloadSchemaOK(ctx context.Context, is *is.I, wantSchemaTemplate string, table string, r opencdc.Record) { gotConduitSch, err := getPayloadSchema(ctx, r) + is.NoErr(err) want, err := avro.Parse(fmt.Sprintf(wantSchemaTemplate, table+"_payload")) is.NoErr(err) @@ -600,6 +603,7 @@ func assertPayloadSchemaOK(ctx context.Context, is *is.I, wantSchemaTemplate str func assertKeySchemaOK(ctx context.Context, is *is.I, table string, r opencdc.Record) { gotConduitSch, err := getKeySchema(ctx, r) + is.NoErr(err) want, err := avro.Parse(fmt.Sprintf(test.TestTableKeyAvroSchema, table+"_key")) is.NoErr(err) From 0cecc0df0ea7e7cb7dd05f8e3829ca4db93140b7 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Fri, 13 Sep 2024 17:40:43 +0200 Subject: [PATCH 12/14] fix test --- source/schema/avro_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/schema/avro_test.go b/source/schema/avro_test.go index c5d2176..60ca4f7 100644 --- a/source/schema/avro_test.go +++ b/source/schema/avro_test.go @@ -189,7 +189,7 @@ func avroTestSchema(t *testing.T, table string) avro.Schema { assert(avro.NewField("col_int8", avro.NewPrimitiveSchema(avro.Long, nil))), assert(avro.NewField("col_text", avro.NewPrimitiveSchema(avro.String, nil))), assert(avro.NewField("col_numeric", - assert(avro.NewFixedSchema(string(avro.Decimal), + assert(avro.NewFixedSchema(fmt.Sprintf("%s_%d_%d", avro.Decimal, 8, 2), avroNS, 18, avro.NewDecimalLogicalSchema(8, 2), From 70a6bded84531669234d0988085dabdc7df3ec65 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Mon, 16 Sep 2024 13:58:12 +0200 Subject: [PATCH 13/14] comments --- source/schema/avro.go | 2 ++ source/snapshot/fetch_worker.go | 6 ++++++ test/helper.go | 6 ++++-- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/source/schema/avro.go b/source/schema/avro.go index f5a7045..9a3c8e6 100644 --- a/source/schema/avro.go +++ b/source/schema/avro.go @@ -132,6 +132,8 @@ func (a *avroExtractor) extractType(t *pgtype.Type, typeMod int32) (avro.Schema, scale := int((typeMod - 4) & 65535) precision := int(((typeMod - 4) >> 16) & 65535) fs, err := avro.NewFixedSchema( + // It's not possible to have multiple schemas with different properties + // and the same name. fmt.Sprintf("%s_%d_%d", avro.Decimal, precision, scale), avroNS, precision+scale+avroDecimalPadding, diff --git a/source/snapshot/fetch_worker.go b/source/snapshot/fetch_worker.go index fc97e98..18cb904 100644 --- a/source/snapshot/fetch_worker.go +++ b/source/snapshot/fetch_worker.go @@ -470,6 +470,9 @@ func (*FetchWorker) validateTable(ctx context.Context, table string, tx pgx.Tx) func (f *FetchWorker) extractSchemas(ctx context.Context, fields []pgconn.FieldDescription) error { if f.payloadSchema == nil { + sdk.Logger(ctx).Debug(). + Msgf("extracting payload schema for %v fields in %v", len(fields), f.conf.Table) + avroPayloadSch, err := schema.Avro.Extract(f.conf.Table+"_payload", fields) if err != nil { return fmt.Errorf("failed to extract payload schema for table %v: %w", f.conf.Table, err) @@ -487,6 +490,9 @@ func (f *FetchWorker) extractSchemas(ctx context.Context, fields []pgconn.FieldD } if f.keySchema == nil { + sdk.Logger(ctx).Debug(). + Msgf("extracting schema for key %v in %v", f.conf.Key, f.conf.Table) + avroKeySch, err := schema.Avro.Extract(f.conf.Table+"_key", fields, f.conf.Key) if err != nil { return fmt.Errorf("failed to extract key schema for table %v: %w", f.conf.Table, err) diff --git a/test/helper.go b/test/helper.go index 4a52f83..9f7882c 100644 --- a/test/helper.go +++ b/test/helper.go @@ -75,6 +75,7 @@ const TestTableAvroSchemaV1 = `{ ] }` +// TestTableAvroSchemaV2 is TestTableAvroSchemaV1 with `column6` (local-timestamp-micros) added. const TestTableAvroSchemaV2 = `{ "type": "record", "name": "%s", @@ -106,13 +107,13 @@ const TestTableAvroSchemaV2 = `{ "precision": 5 } }, - {"name":"column6","type":{"type":"long","logicalType":"local-timestamp-micros"}}, + {"name":"column6","type":{"type":"long","logicalType":"local-timestamp-micros"}}, {"name":"id","type":"long"}, {"name":"key","type":"bytes"} ] }` -// TestTableAvroSchemaV3 is TestTableAvroSchemaV1 with `column4` and `column5` dropped +// TestTableAvroSchemaV3 is TestTableAvroSchemaV1 with `column4` and `column5` dropped. const TestTableAvroSchemaV3 = `{ "type": "record", "name": "%s", @@ -127,6 +128,7 @@ const TestTableAvroSchemaV3 = `{ ] }` +// TestTableKeyAvroSchema is the Avro schema for the test table's key column. const TestTableKeyAvroSchema = `{ "type": "record", "name": "%s", From 83b969b1a5faaa0afc6b1038b389de0f96da4f0f Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Mon, 16 Sep 2024 18:25:58 +0200 Subject: [PATCH 14/14] remove avroNS --- source/schema/avro.go | 5 ++--- source/schema/avro_test.go | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/source/schema/avro.go b/source/schema/avro.go index 9a3c8e6..36776be 100644 --- a/source/schema/avro.go +++ b/source/schema/avro.go @@ -26,7 +26,6 @@ import ( ) const ( - avroNS = "" avroDecimalPadding = 8 ) @@ -114,7 +113,7 @@ func (a *avroExtractor) Extract(schemaName string, fields []pgconn.FieldDescript return cmp.Compare(a.Name(), b.Name()) }) - sch, err := avro.NewRecordSchema(schemaName, avroNS, avroFields) + sch, err := avro.NewRecordSchema(schemaName, "", avroFields) if err != nil { return nil, fmt.Errorf("failed to create avro schema: %w", err) } @@ -135,7 +134,7 @@ func (a *avroExtractor) extractType(t *pgtype.Type, typeMod int32) (avro.Schema, // It's not possible to have multiple schemas with different properties // and the same name. fmt.Sprintf("%s_%d_%d", avro.Decimal, precision, scale), - avroNS, + "", precision+scale+avroDecimalPadding, avro.NewDecimalLogicalSchema(precision, scale), ) diff --git a/source/schema/avro_test.go b/source/schema/avro_test.go index 60ca4f7..9b49847 100644 --- a/source/schema/avro_test.go +++ b/source/schema/avro_test.go @@ -190,7 +190,7 @@ func avroTestSchema(t *testing.T, table string) avro.Schema { assert(avro.NewField("col_text", avro.NewPrimitiveSchema(avro.String, nil))), assert(avro.NewField("col_numeric", assert(avro.NewFixedSchema(fmt.Sprintf("%s_%d_%d", avro.Decimal, 8, 2), - avroNS, + "", 18, avro.NewDecimalLogicalSchema(8, 2), )))), @@ -216,7 +216,7 @@ func avroTestSchema(t *testing.T, table string) avro.Schema { return cmp.Compare(a.Name(), b.Name()) }) - s, err := avro.NewRecordSchema(table, avroNS, fields) + s, err := avro.NewRecordSchema(table, "", fields) is.NoErr(err) return s