Skip to content

Commit

Permalink
PR Feedback.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 26, 2024
1 parent 3bcc60b commit f5d0e6e
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
7 changes: 3 additions & 4 deletions models/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,14 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi
}

// EmitExecutionTimeLag - This will check against the current time and the event execution time and emit the lag.
func (e *Event) EmitExecutionTimeLag(metricsClient base.Client, mode config.Mode) {
func (e *Event) EmitExecutionTimeLag(metricsClient base.Client) {
metricsClient.GaugeWithSample(
"row.execution_time_lag",
float64(time.Since(e.executionTime).Milliseconds()),
map[string]string{
"mode": mode.String(),
"mode": e.mode.String(),
"table": e.Table,
},
0.5)
}, 0.5)
}

func (e *Event) Validate() error {
Expand Down
2 changes: 1 addition & 1 deletion processes/consumer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo
}

// Emit execution time lag for non-skipped events.
evt.EmitExecutionTimeLag(metricsClient, cfg.Mode)
evt.EmitExecutionTimeLag(metricsClient)
shouldFlush, flushReason, err := evt.Save(cfg, inMemDB, topicConfig.tc, p.Msg)
if err != nil {
tags["what"] = "save_fail"
Expand Down

0 comments on commit f5d0e6e

Please sign in to comment.