Skip to content

Commit

Permalink
Minor clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 27, 2024
1 parent d50ee30 commit 09d8b3b
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 17 deletions.
7 changes: 1 addition & 6 deletions lib/cdc/format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,8 @@ import (
"github.com/artie-labs/transfer/lib/logger"
)

var (
r relational.Debezium
m mongo.Debezium
)

func GetFormatParser(label, topic string) cdc.Format {
for _, validFormat := range []cdc.Format{&r, &m} {
for _, validFormat := range []cdc.Format{relational.Debezium{}, mongo.Debezium{}} {
for _, fmtLabel := range validFormat.Labels() {
if fmtLabel == label {
slog.Info("Loaded CDC Format parser...",
Expand Down
23 changes: 20 additions & 3 deletions lib/cdc/format/format_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,30 @@ import (

"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/cdc/mongo"
"github.com/artie-labs/transfer/lib/cdc/relational"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/typing"
)

func TestGetFormatParser(t *testing.T) {
validFormats := []string{constants.DBZPostgresAltFormat, constants.DBZPostgresFormat, constants.DBZMongoFormat}
for _, validFormat := range validFormats {
assert.NotNil(t, GetFormatParser(validFormat, "topicA"))
{
// Relational
for _, format := range []string{constants.DBZPostgresAltFormat, constants.DBZPostgresFormat} {
formatParser := GetFormatParser(format, "topicA")
assert.NotNil(t, formatParser)

_, err := typing.AssertType[relational.Debezium](formatParser)
assert.NoError(t, err)
}
}
{
// Mongo
formatParser := GetFormatParser(constants.DBZMongoFormat, "topicA")
assert.NotNil(t, formatParser)

_, err := typing.AssertType[mongo.Debezium](formatParser)
assert.NoError(t, err)
}
}

Expand Down
8 changes: 4 additions & 4 deletions lib/cdc/mongo/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"go.mongodb.org/mongo-driver/bson"
)

type Debezium string
type Debezium struct{}

func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) {
func (Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) {
var schemaEventPayload SchemaEventPayload
if len(bytes) == 0 {
return nil, fmt.Errorf("empty message")
Expand Down Expand Up @@ -70,11 +70,11 @@ func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) {
return &schemaEventPayload, nil
}

func (d *Debezium) Labels() []string {
func (Debezium) Labels() []string {
return []string{constants.DBZMongoFormat}
}

func (d *Debezium) GetPrimaryKey(key []byte, tc kafkalib.TopicConfig) (map[string]any, error) {
func (Debezium) GetPrimaryKey(key []byte, tc kafkalib.TopicConfig) (map[string]any, error) {
kvMap, err := debezium.ParsePartitionKey(key, tc.CDCKeyFormat)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions lib/cdc/relational/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"github.com/artie-labs/transfer/lib/kafkalib"
)

type Debezium string
type Debezium struct{}

func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) {
func (Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) {
var event util.SchemaEventPayload
if len(bytes) == 0 {
return nil, fmt.Errorf("empty message")
Expand All @@ -26,7 +26,7 @@ func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) {
return &event, nil
}

func (d *Debezium) Labels() []string {
func (Debezium) Labels() []string {
return []string{
constants.DBZPostgresFormat,
constants.DBZPostgresAltFormat,
Expand All @@ -35,6 +35,6 @@ func (d *Debezium) Labels() []string {
}
}

func (d *Debezium) GetPrimaryKey(key []byte, tc kafkalib.TopicConfig) (map[string]any, error) {
func (Debezium) GetPrimaryKey(key []byte, tc kafkalib.TopicConfig) (map[string]any, error) {
return debezium.ParsePartitionKey(key, tc.CDCKeyFormat)
}

0 comments on commit 09d8b3b

Please sign in to comment.