Skip to content

Commit ac7a3a7

Browse files
committed
add maintainer handle event time metrics and api error handler
1 parent e47554b commit ac7a3a7

File tree

6 files changed

+67
-22
lines changed

6 files changed

+67
-22
lines changed

api/middleware/middleware.go

+24-1
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ package middleware
1515

1616
import (
1717
"bufio"
18-
"github.com/flowbehappy/tigate/pkg/node"
1918
"net/http"
2019
"strconv"
2120
"time"
2221

22+
"github.com/flowbehappy/tigate/pkg/node"
2323
"github.com/gin-gonic/gin"
2424
"github.com/pingcap/log"
25+
"github.com/pingcap/tiflow/cdc/api"
26+
"github.com/pingcap/tiflow/cdc/model"
2527
"github.com/pingcap/tiflow/pkg/config"
2628
"github.com/pingcap/tiflow/pkg/httputil"
2729
"go.uber.org/zap"
@@ -39,6 +41,27 @@ const (
3941
// ClientVersionHeader is the header name of client version
4042
const ClientVersionHeader = "X-client-version"
4143

44+
// ErrorHandleMiddleware returns a gin handler that puts a recorded error into
// the response. It first runs the remaining handlers in the chain via c.Next();
// afterwards, if c.Errors is non-empty, it writes the last error as indented
// JSON (wrapped with model.NewHTTPError). The status code is 400 Bad Request
// when api.IsHTTPBadRequestError reports true for the error, and 500 Internal
// Server Error otherwise. When no error was recorded, the response is left
// untouched.
45+
func ErrorHandleMiddleware() gin.HandlerFunc {
46+
return func(c *gin.Context) {
47+
c.Next()
48+
// Only the last entry of c.Errors is reported: because we will return
// immediately after an error occurs in http_handler,
49+
// there will be only one error in c.Errors.
50+
lastError := c.Errors.Last()
51+
if lastError != nil {
52+
err := lastError.Err
53+
// Put the error into the response, choosing the status code by error kind.
54+
if api.IsHTTPBadRequestError(err) {
55+
c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
56+
} else {
57+
c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
58+
}
59+
// NOTE(review): c.Next() has already returned at this point, so the
// downstream handlers have presumably run already; confirm whether this
// Abort is only meant to flag the context as aborted for outer middleware.
c.Abort()
60+
return
61+
}
62+
}
63+
}
64+
4265
// LogMiddleware logs the api requests
4366
func LogMiddleware() gin.HandlerFunc {
4467
return func(c *gin.Context) {

api/v2/api.go

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
3434
v2 := router.Group("/api/v2")
3535

3636
v2.Use(middleware.LogMiddleware())
37+
v2.Use(middleware.ErrorHandleMiddleware())
3738

3839
v2.GET("status", api.serverStatus)
3940
// For compatibility with the old API,

maintainer/barrier.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (b *Barrier) handleNoStateHeartbeat(dispatcherID common.DispatcherID, check
138138
event.advancedDispatchers[dispatcherID] = true
139139
// all dispatcher reported heartbeat, select one to write
140140
if !event.selected && event.allDispatcherReported() {
141-
distacherStatus := &heartbeatpb.DispatcherStatus{
141+
dispatcherStatus := &heartbeatpb.DispatcherStatus{
142142
InfluencedDispatchers: &heartbeatpb.InfluencedDispatchers{
143143
InfluenceType: heartbeatpb.InfluenceType_Normal,
144144
DispatcherIDs: []*heartbeatpb.DispatcherID{dispatcherID.ToPB()},
@@ -149,7 +149,7 @@ func (b *Barrier) handleNoStateHeartbeat(dispatcherID common.DispatcherID, check
149149
}}
150150
event.writerDispatcher = dispatcherID
151151
event.selected = true
152-
return nil, distacherStatus, nil
152+
return nil, dispatcherStatus, nil
153153
}
154154
}
155155

maintainer/maintainer.go

+8-19
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ package maintainer
1515

1616
import (
1717
"encoding/json"
18-
"fmt"
1918
"math"
2019
"sync"
2120
"time"
@@ -100,6 +99,7 @@ type Maintainer struct {
10099
scheduledTaskGauge prometheus.Gauge
101100
runningTaskGauge prometheus.Gauge
102101
tableCountGauge prometheus.Gauge
102+
handleEventDuration prometheus.Observer
103103
}
104104

105105
// NewMaintainer create the maintainer for the changefeed
@@ -143,6 +143,7 @@ func NewMaintainer(cfID model.ChangeFeedID,
143143
scheduledTaskGauge: metrics.ScheduleTaskGuage.WithLabelValues(cfID.Namespace, cfID.ID),
144144
runningTaskGauge: metrics.RunningScheduleTaskGauge.WithLabelValues(cfID.Namespace, cfID.ID),
145145
tableCountGauge: metrics.TableGauge.WithLabelValues(cfID.Namespace, cfID.ID),
146+
handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(cfID.Namespace, cfID.ID),
146147
}
147148
m.bootstrapper = NewBootstrapper(m.id.ID, m.getNewBootstrapFn())
148149
m.barrier = NewBarrier(m.scheduler)
@@ -159,12 +160,13 @@ func (m *Maintainer) HandleEvent(event *Event) bool {
159160
start := time.Now()
160161
defer func() {
161162
duration := time.Since(start)
162-
if duration > time.Millisecond*500 {
163+
if duration > time.Second {
163164
log.Info("maintainer is too slow",
164165
zap.String("id", m.id.String()),
165166
zap.Int("type", event.eventType),
166167
zap.Duration("duration", duration))
167168
}
169+
m.handleEventDuration.Observe(duration.Seconds())
168170
}()
169171
if m.state == heartbeatpb.ComponentState_Stopped {
170172
log.Warn("maintainer is not stopped, ignore",
@@ -274,6 +276,7 @@ func (m *Maintainer) cleanupMetrics() {
274276
metrics.ScheduleTaskGuage.DeleteLabelValues(m.id.Namespace, m.id.ID)
275277
metrics.RunningScheduleTaskGauge.DeleteLabelValues(m.id.Namespace, m.id.ID)
276278
metrics.TableGauge.DeleteLabelValues(m.id.Namespace, m.id.ID)
279+
metrics.MaintainerHandleEventDuration.DeleteLabelValues(m.id.Namespace, m.id.ID)
277280
}
278281

279282
func (m *Maintainer) onInit() bool {
@@ -407,7 +410,7 @@ func (m *Maintainer) updateMetrics() {
407410
m.changefeedStatusGauge.Set(float64(m.state))
408411
}
409412

410-
// send message to remote, todo: use a io thread pool
413+
// send message to remote
411414
func (m *Maintainer) sendMessages(msgs []*messaging.TargetMessage) {
412415
for _, msg := range msgs {
413416
err := m.mc.SendCommand(msg)
@@ -628,15 +631,15 @@ func (m *Maintainer) onPeriodTask() {
628631
}
629632
// send scheduling messages
630633
m.handleResendMessage()
631-
m.printStatus()
634+
m.collectMetrics()
632635
m.calCheckpointTs()
633636
SubmitScheduledEvent(m.taskScheduler, m.stream, &Event{
634637
changefeedID: m.id.ID,
635638
eventType: EventPeriod,
636639
}, time.Now().Add(time.Millisecond*500))
637640
}
638641

639-
func (m *Maintainer) printStatus() {
642+
func (m *Maintainer) collectMetrics() {
640643
if time.Since(m.lastPrintStatusTime) > time.Second*20 {
641644
tableStates := make(map[scheduler.SchedulerStatus]int)
642645
total := m.scheduler.TaskSize()
@@ -650,20 +653,6 @@ func (m *Maintainer) printStatus() {
650653
for state, count := range tableStates {
651654
metrics.TableStateGauge.WithLabelValues(m.id.Namespace, m.id.ID, state.String()).Set(float64(count))
652655
}
653-
654-
var taskDistribution string
655-
for nodeID, _ := range m.bootstrapper.GetAllNodes() {
656-
taskDistribution = fmt.Sprintf("%s, %s=%d",
657-
taskDistribution, nodeID, m.scheduler.GetTaskSizeByNodeID(nodeID))
658-
}
659-
log.Info("table span status",
660-
zap.String("distribution", taskDistribution),
661-
zap.String("changefeed", m.id.ID),
662-
zap.Int("total", total),
663-
zap.Int("absent", tableStates[scheduler.SchedulerStatusAbsent]),
664-
zap.Int("commiting", tableStates[scheduler.SchedulerStatusCommiting]),
665-
zap.Int("working", tableStates[scheduler.SchedulerStatusWorking]),
666-
zap.Int("removing", tableStates[scheduler.SchedulerStatusRemoving]))
667656
m.lastPrintStatusTime = time.Now()
668657
}
669658
}

pkg/metrics/init.go

+1
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,5 @@ func InitMetrics(registry *prometheus.Registry) {
2828
InitEventStoreMetrics(registry)
2929
InitSchemaStoreMetrics(registry)
3030
InitEventServiceMetrics(registry)
31+
InitMaintainerMetrics(registry)
3132
}

pkg/metrics/maintainer.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package metrics
15+
16+
import "github.com/prometheus/client_golang/prometheus"
17+
18+
var (
19+
// MaintainerHandleEventDuration is a histogram of the time, in seconds, that
// the maintainer spends handling a single event. It is exposed under the
// "ticdc" namespace and "maintainer" subsystem and is labeled by the
// changefeed's namespace and ID.
MaintainerHandleEventDuration = prometheus.NewHistogramVec(
20+
prometheus.HistogramOpts{
21+
Namespace: "ticdc",
22+
Subsystem: "maintainer",
23+
Name: "handle_event_duration",
24+
Help: "Bucketed histogram of maintainer handle event time (s).",
25+
// 18 exponential buckets: the first upper bound is 0.01s (10 ms) and each
// following bound doubles, so the largest finite bound is 0.01 * 2^17 s
// (about 1310.72s).
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
26+
}, []string{"namespace", "changefeed"})
27+
)
28+
29+
// InitMaintainerMetrics registers the maintainer metrics
// (MaintainerHandleEventDuration) with the given registry.
// It uses registry.MustRegister, which panics if registration fails
// (for example, when the collector is already registered).
func InitMaintainerMetrics(registry *prometheus.Registry) {
30+
registry.MustRegister(MaintainerHandleEventDuration)
31+
}

0 commit comments

Comments
 (0)