From 2779568459c7c1e36409089f0e49fe4b3c573c24 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 30 Jun 2023 17:37:03 +0800 Subject: [PATCH] changefeed (ticdc): remove status of a changefeed after it is removed (#9174) (#9184) close pingcap/tiflow#9173, ref pingcap/tiflow#9177 --- cdc/owner/changefeed.go | 4 ++++ cdc/processor/processor.go | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 78e8de13d0c..35244195bbe 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -805,6 +805,10 @@ func (c *changefeed) cleanupMetrics() { changefeedBarrierTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID) c.metricsChangefeedBarrierTsGauge = nil + + if c.isRemoved { + changefeedStatusGauge.DeleteLabelValues(c.id.Namespace, c.id.ID) + } } // cleanup redo logs if changefeed is removed and redo log is enabled diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 1ec2a6f8143..c3af163bccf 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -912,6 +912,10 @@ func (p *processor) Close() error { zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID)) + // clean up metrics first to avoid some metrics are not cleaned up + // when error occurs during closing the processor + p.cleanupMetrics() + p.sinkManager.stop(p.changefeedID) p.sinkManager.r = nil p.sourceManager.stop(p.changefeedID) @@ -949,7 +953,6 @@ func (p *processor) Close() error { // mark tables share the same cdcContext with its original table, don't need to cancel failpoint.Inject("processorStopDelay", nil) - p.cleanupMetrics() log.Info("processor closed", zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID))