Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor code cleanup #399

Merged
merged 1 commit into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions processes/consumer/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,10 @@ type TopicConfigFormatter struct {
}

func commitOffset(ctx context.Context, topic string, partitionsToOffset map[string][]artie.Message) error {
var err error
for _, msgs := range partitionsToOffset {
for _, msg := range msgs {
if msg.KafkaMsg != nil {
err = topicToConsumer.Get(topic).CommitMessages(ctx, *msg.KafkaMsg)
if err != nil {
if err := topicToConsumer.Get(topic).CommitMessages(ctx, *msg.KafkaMsg); err != nil {
return err
}
}
Expand All @@ -55,5 +53,5 @@ func commitOffset(ctx context.Context, topic string, partitionsToOffset map[stri
}
}

return err
return nil
}
5 changes: 3 additions & 2 deletions processes/consumer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,13 @@ func StartConsumer(ctx context.Context, cfg config.Config, inMemDB *models.Datab
}

msg := artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)
tableName, processErr := ProcessMessage(ctx, cfg, inMemDB, dest, metricsClient, ProcessArgs{
args := processArgs{
Msg: msg,
GroupID: kafkaConsumer.Config().GroupID,
TopicToConfigFormatMap: tcFmtMap,
})
}

tableName, processErr := args.process(ctx, cfg, inMemDB, dest, metricsClient)
msg.EmitIngestionLag(metricsClient, cfg.Mode, kafkaConsumer.Config().GroupID, tableName)
msg.EmitRowLag(metricsClient, cfg.Mode, kafkaConsumer.Config().GroupID, tableName)
if processErr != nil {
Expand Down
26 changes: 11 additions & 15 deletions processes/consumer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,21 @@ import (
"github.com/artie-labs/transfer/models/event"
)

type ProcessArgs struct {
type processArgs struct {
Msg artie.Message
GroupID string
TopicToConfigFormatMap *TcFmtMap
}

// ProcessMessage will return:
// 1. TableName (string)
// 2. Error
// We are using the TableName for emitting Kafka ingestion lag
func ProcessMessage(ctx context.Context, cfg config.Config, inMemDB *models.DatabaseData, dest destination.Baseline, metricsClient base.Client, processArgs ProcessArgs) (string, error) {
if processArgs.TopicToConfigFormatMap == nil {
func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *models.DatabaseData, dest destination.Baseline, metricsClient base.Client) (string, error) {
if p.TopicToConfigFormatMap == nil {
return "", fmt.Errorf("failed to process, topicConfig is nil")
}

tags := map[string]string{
"mode": cfg.Mode.String(),
"groupID": processArgs.GroupID,
"topic": processArgs.Msg.Topic(),
"groupID": p.GroupID,
"topic": p.Msg.Topic(),
"what": "success",
}

Expand All @@ -41,23 +37,23 @@ func ProcessMessage(ctx context.Context, cfg config.Config, inMemDB *models.Data
metricsClient.Timing("process.message", time.Since(st), tags)
}()

topicConfig, isOk := processArgs.TopicToConfigFormatMap.GetTopicFmt(processArgs.Msg.Topic())
topicConfig, isOk := p.TopicToConfigFormatMap.GetTopicFmt(p.Msg.Topic())
if !isOk {
tags["what"] = "failed_topic_lookup"
return "", fmt.Errorf("failed to get topic name: %s", processArgs.Msg.Topic())
return "", fmt.Errorf("failed to get topic name: %s", p.Msg.Topic())
}

tags["database"] = topicConfig.tc.Database
tags["schema"] = topicConfig.tc.Schema

pkMap, err := topicConfig.GetPrimaryKey(processArgs.Msg.Key(), topicConfig.tc)
pkMap, err := topicConfig.GetPrimaryKey(p.Msg.Key(), topicConfig.tc)
if err != nil {
tags["what"] = "marshall_pk_err"
return "", fmt.Errorf("cannot unmarshall key %s: %w", string(processArgs.Msg.Key()), err)
return "", fmt.Errorf("cannot unmarshall key %s: %w", string(p.Msg.Key()), err)
}

typingSettings := cfg.SharedTransferConfig.TypingSettings
_event, err := topicConfig.GetEventFromBytes(typingSettings, processArgs.Msg.Value())
_event, err := topicConfig.GetEventFromBytes(typingSettings, p.Msg.Value())
if err != nil {
tags["what"] = "marshall_value_err"
return "", fmt.Errorf("cannot unmarshall event: %w", err)
Expand All @@ -75,7 +71,7 @@ func ProcessMessage(ctx context.Context, cfg config.Config, inMemDB *models.Data
return evt.Table, nil
}

shouldFlush, flushReason, err := evt.Save(cfg, inMemDB, topicConfig.tc, processArgs.Msg)
shouldFlush, flushReason, err := evt.Save(cfg, inMemDB, topicConfig.tc, p.Msg)
if err != nil {
tags["what"] = "save_fail"
return "", fmt.Errorf("event failed to save: %w", err)
Expand Down
24 changes: 12 additions & 12 deletions processes/consumer/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ func TestProcessMessageFailures(t *testing.T) {
}

msg := artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)
processArgs := ProcessArgs{
args := processArgs{
Msg: msg,
GroupID: "foo",
}

tableName, err := ProcessMessage(ctx, cfg, memDB, MockDestination{}, metrics.NullMetricsProvider{}, processArgs)
tableName, err := args.process(ctx, cfg, memDB, MockDestination{}, metrics.NullMetricsProvider{})
assert.ErrorContains(t, err, "failed to process, topicConfig is nil", err.Error())
assert.Empty(t, tableName)

processArgs.TopicToConfigFormatMap = NewTcFmtMap()
tableName, err = ProcessMessage(ctx, cfg, memDB, MockDestination{}, metrics.NullMetricsProvider{}, processArgs)
args.TopicToConfigFormatMap = NewTcFmtMap()
tableName, err = args.process(ctx, cfg, memDB, MockDestination{}, metrics.NullMetricsProvider{})
assert.ErrorContains(t, err, "failed to get topic", err.Error())
assert.Equal(t, 0, len(memDB.TableData()))
assert.Empty(t, tableName)
Expand All @@ -92,7 +92,7 @@ func TestProcessMessageFailures(t *testing.T) {
Format: &mgo,
})

processArgs = ProcessArgs{
args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
Expand All @@ -101,7 +101,7 @@ func TestProcessMessageFailures(t *testing.T) {
tcFmt, isOk := tcFmtMap.GetTopicFmt(msg.Topic())
assert.True(t, isOk)

tableName, err = ProcessMessage(ctx, cfg, memDB, MockDestination{}, metrics.NullMetricsProvider{}, processArgs)
tableName, err = args.process(ctx, cfg, memDB, MockDestination{}, metrics.NullMetricsProvider{})
assert.ErrorContains(t, err, fmt.Sprintf("format: %s is not supported", tcFmt.tc.CDCKeyFormat), err.Error())
assert.ErrorContains(t, err, "cannot unmarshall key", err.Error())
assert.Equal(t, 0, len(memDB.TableData()))
Expand Down Expand Up @@ -189,13 +189,13 @@ func TestProcessMessageFailures(t *testing.T) {
msg.KafkaMsg.Value = []byte(val)
}

processArgs = ProcessArgs{
args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
}

tableName, err = ProcessMessage(ctx, cfg, memDB, MockDestination{}, metrics.NullMetricsProvider{}, processArgs)
tableName, err = args.process(ctx, cfg, memDB, MockDestination{}, metrics.NullMetricsProvider{})
assert.NoError(t, err)
assert.Equal(t, table, tableName)

Expand All @@ -218,13 +218,13 @@ func TestProcessMessageFailures(t *testing.T) {
assert.False(t, val.(bool))

msg.KafkaMsg.Value = []byte("not a json object")
processArgs = ProcessArgs{
args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
}

tableName, err = ProcessMessage(ctx, cfg, memDB, MockDestination{}, metrics.NullMetricsProvider{}, processArgs)
tableName, err = args.process(ctx, cfg, memDB, MockDestination{}, metrics.NullMetricsProvider{})
assert.Error(t, err)
assert.Empty(t, tableName)
assert.True(t, td.NumberOfRows() > 0)
Expand Down Expand Up @@ -355,7 +355,7 @@ func TestProcessMessageSkip(t *testing.T) {
msg.KafkaMsg.Value = []byte(val)
}

processArgs := ProcessArgs{
args := processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
Expand All @@ -364,7 +364,7 @@ func TestProcessMessageSkip(t *testing.T) {
td := memoryDB.GetOrCreateTableData(table)
assert.Equal(t, 0, int(td.NumberOfRows()))

tableName, err := ProcessMessage(ctx, cfg, memDB, MockDestination{}, metrics.NullMetricsProvider{}, processArgs)
tableName, err := args.process(ctx, cfg, memDB, MockDestination{}, metrics.NullMetricsProvider{})
assert.NoError(t, err)
assert.Equal(t, table, tableName)
// Because it got skipped.
Expand Down
5 changes: 3 additions & 2 deletions processes/consumer/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,13 @@ func StartSubscriber(ctx context.Context, cfg config.Config, inMemDB *models.Dat
slog.String("value", string(msg.Value())),
}

tableName, processErr := ProcessMessage(ctx, cfg, inMemDB, dest, metricsClient, ProcessArgs{
args := processArgs{
Msg: msg,
GroupID: subName,
TopicToConfigFormatMap: tcFmtMap,
})
}

tableName, processErr := args.process(ctx, cfg, inMemDB, dest, metricsClient)
msg.EmitIngestionLag(metricsClient, cfg.Mode, subName, tableName)
if processErr != nil {
slog.With(logFields...).Warn("Skipping message...", slog.Any("err", processErr))
Expand Down