From 68bc5dd932b4e8f7284520e9f4da4da169961bd1 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 1 Dec 2023 10:18:04 -0800 Subject: [PATCH 1/3] Adding Row Lag. --- lib/artie/message.go | 14 ++++++++++++++ processes/consumer/kafka.go | 1 + 2 files changed, 15 insertions(+) diff --git a/lib/artie/message.go b/lib/artie/message.go index 8d6757cc1..a1f856413 100644 --- a/lib/artie/message.go +++ b/lib/artie/message.go @@ -56,6 +56,20 @@ func (m *Message) Kind() Kind { return Invalid } +// EmitRowLag will diff against the partition's high watermark and the message's offset +func (m *Message) EmitRowLag(ctx context.Context, groupID, table string) { + if m.KafkaMsg == nil { + return + } + + metrics.FromContext(ctx).Gauge("row.lag", float64(m.KafkaMsg.HighWaterMark-m.KafkaMsg.Offset), map[string]string{ + "groupID": groupID, + "topic": m.Topic(), + "table": table, + "partition": m.Partition(), + }) +} + func (m *Message) EmitIngestionLag(ctx context.Context, groupID, table string) { metrics.FromContext(ctx).Timing("ingestion.lag", time.Since(m.PublishTime()), map[string]string{ "groupID": groupID, diff --git a/processes/consumer/kafka.go b/processes/consumer/kafka.go index a2619e835..362e44b41 100644 --- a/processes/consumer/kafka.go +++ b/processes/consumer/kafka.go @@ -136,6 +136,7 @@ func StartConsumer(ctx context.Context) { }) msg.EmitIngestionLag(ctx, kafkaConsumer.Config().GroupID, tableName) + msg.EmitRowLag(ctx, kafkaConsumer.Config().GroupID, tableName) if processErr != nil { log.WithError(processErr).WithFields(logFields).Warn("skipping message...") } From 5519a5a57efc61c8743c77c6263c13336daa9e98 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 1 Dec 2023 10:23:37 -0800 Subject: [PATCH 2/3] Checkpoint. --- lib/artie/message.go | 4 ++-- lib/telemetry/metrics/base/provider.go | 1 + lib/telemetry/metrics/datadog/datadog.go | 4 ++++ lib/telemetry/metrics/null_provider.go | 4 ++++ 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/lib/artie/message.go b/lib/artie/message.go index a1f856413..3e155b817 100644 --- a/lib/artie/message.go +++ b/lib/artie/message.go @@ -62,12 +62,12 @@ func (m *Message) EmitRowLag(ctx context.Context, groupID, table string) { return } - metrics.FromContext(ctx).Gauge("row.lag", float64(m.KafkaMsg.HighWaterMark-m.KafkaMsg.Offset), map[string]string{ + metrics.FromContext(ctx).GaugeWithSample("row.lag", float64(m.KafkaMsg.HighWaterMark-m.KafkaMsg.Offset), map[string]string{ "groupID": groupID, "topic": m.Topic(), "table": table, "partition": m.Partition(), - }) + }, 0.5) } func (m *Message) EmitIngestionLag(ctx context.Context, groupID, table string) { diff --git a/lib/telemetry/metrics/base/provider.go b/lib/telemetry/metrics/base/provider.go index 92fe56d17..4759fa777 100644 --- a/lib/telemetry/metrics/base/provider.go +++ b/lib/telemetry/metrics/base/provider.go @@ -7,4 +7,5 @@ type Client interface { Incr(name string, tags map[string]string) Count(name string, value int64, tags map[string]string) Gauge(name string, value float64, tags map[string]string) + GaugeWithSample(name string, value float64, tags map[string]string, sample float64) } diff --git a/lib/telemetry/metrics/datadog/datadog.go b/lib/telemetry/metrics/datadog/datadog.go index 994219f44..fa8353606 100644 --- a/lib/telemetry/metrics/datadog/datadog.go +++ b/lib/telemetry/metrics/datadog/datadog.go @@ -89,3 +89,7 @@ func (s *statsClient) Count(name string, value int64, tags map[string]string) { func (s *statsClient) Gauge(name string, value float64, tags map[string]string) { _ = s.client.Gauge(name, value, toDatadogTags(tags), s.rate) } + +func (s *statsClient) GaugeWithSample(name string, value float64, tags map[string]string, sample float64) { + _ = s.client.Gauge(name, value, toDatadogTags(tags), sample) +} diff --git a/lib/telemetry/metrics/null_provider.go b/lib/telemetry/metrics/null_provider.go index db65bc9c3..5ef3364db 100644 --- a/lib/telemetry/metrics/null_provider.go +++ b/lib/telemetry/metrics/null_provider.go @@ -8,6 +8,10 @@ func (n NullMetricsProvider) Gauge(name string, value float64, tags map[string]s return } +func (n NullMetricsProvider) GaugeWithSample(name string, value float64, tags map[string]string, sample float64) { + return +} + func (n NullMetricsProvider) Count(name string, value int64, tags map[string]string) { return } From b9dbc2f43cf594918f3e5cf2635f31a73b1163fd Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 1 Dec 2023 11:14:12 -0800 Subject: [PATCH 3/3] Implementing Row Lag. --- lib/artie/message.go | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/artie/message.go b/lib/artie/message.go index 3e155b817..391566231 100644 --- a/lib/artie/message.go +++ b/lib/artie/message.go @@ -57,6 +57,7 @@ func (m *Message) Kind() Kind { } // EmitRowLag will diff against the partition's high watermark and the message's offset +// This function is only available for Kafka since Kafka has the concept of offsets and watermarks. func (m *Message) EmitRowLag(ctx context.Context, groupID, table string) { if m.KafkaMsg == nil { return