diff --git a/lib/artie/message.go b/lib/artie/message.go index 8d6757cc1..391566231 100644 --- a/lib/artie/message.go +++ b/lib/artie/message.go @@ -56,6 +56,21 @@ func (m *Message) Kind() Kind { return Invalid } +// EmitRowLag will diff against the partition's high watermark and the message's offset +// This function is only available for Kafka since Kafka has the concept of offsets and watermarks. +func (m *Message) EmitRowLag(ctx context.Context, groupID, table string) { + if m.KafkaMsg == nil { + return + } + + metrics.FromContext(ctx).GaugeWithSample("row.lag", float64(m.KafkaMsg.HighWaterMark-m.KafkaMsg.Offset), map[string]string{ + "groupID": groupID, + "topic": m.Topic(), + "table": table, + "partition": m.Partition(), + }, 0.5) +} + func (m *Message) EmitIngestionLag(ctx context.Context, groupID, table string) { metrics.FromContext(ctx).Timing("ingestion.lag", time.Since(m.PublishTime()), map[string]string{ "groupID": groupID, diff --git a/lib/telemetry/metrics/base/provider.go b/lib/telemetry/metrics/base/provider.go index 92fe56d17..4759fa777 100644 --- a/lib/telemetry/metrics/base/provider.go +++ b/lib/telemetry/metrics/base/provider.go @@ -7,4 +7,5 @@ type Client interface { Incr(name string, tags map[string]string) Count(name string, value int64, tags map[string]string) Gauge(name string, value float64, tags map[string]string) + GaugeWithSample(name string, value float64, tags map[string]string, sample float64) } diff --git a/lib/telemetry/metrics/datadog/datadog.go b/lib/telemetry/metrics/datadog/datadog.go index 994219f44..fa8353606 100644 --- a/lib/telemetry/metrics/datadog/datadog.go +++ b/lib/telemetry/metrics/datadog/datadog.go @@ -89,3 +89,7 @@ func (s *statsClient) Count(name string, value int64, tags map[string]string) { func (s *statsClient) Gauge(name string, value float64, tags map[string]string) { _ = s.client.Gauge(name, value, toDatadogTags(tags), s.rate) } + +func (s *statsClient) GaugeWithSample(name string, value float64, tags map[string]string, sample float64) { + _ = s.client.Gauge(name, value, toDatadogTags(tags), sample) +} diff --git a/lib/telemetry/metrics/null_provider.go b/lib/telemetry/metrics/null_provider.go index db65bc9c3..5ef3364db 100644 --- a/lib/telemetry/metrics/null_provider.go +++ b/lib/telemetry/metrics/null_provider.go @@ -8,6 +8,10 @@ func (n NullMetricsProvider) Gauge(name string, value float64, tags map[string]s return } +func (n NullMetricsProvider) GaugeWithSample(name string, value float64, tags map[string]string, sample float64) { + return +} + func (n NullMetricsProvider) Count(name string, value int64, tags map[string]string) { return } diff --git a/processes/consumer/kafka.go b/processes/consumer/kafka.go index a2619e835..362e44b41 100644 --- a/processes/consumer/kafka.go +++ b/processes/consumer/kafka.go @@ -136,6 +136,7 @@ func StartConsumer(ctx context.Context) { }) msg.EmitIngestionLag(ctx, kafkaConsumer.Config().GroupID, tableName) + msg.EmitRowLag(ctx, kafkaConsumer.Config().GroupID, tableName) if processErr != nil { log.WithError(processErr).WithFields(logFields).Warn("skipping message...") }