Skip to content

Commit

Permalink
scheduler, operator: support operatorLimitCounter for region-schedule…
Browse files Browse the repository at this point in the history
…-limit (#3351)

* add counter

Signed-off-by: Song Gao <disxiaofei@163.com>

* add metrics

Signed-off-by: Song Gao <disxiaofei@163.com>

* fix lint

Signed-off-by: Song Gao <disxiaofei@163.com>

* fix lint

Signed-off-by: Song Gao <disxiaofei@163.com>

* address the comment

Signed-off-by: Song Gao <disxiaofei@163.com>

* add grafana pannel

Signed-off-by: Song Gao <disxiaofei@163.com>

* add grafana pannel

Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer authored Jan 14, 2021
1 parent fa552dc commit 8eabcca
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 7 deletions.
90 changes: 90 additions & 0 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -2177,6 +2177,96 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The number of different operators that are failed to be created due to limit configuration",
"fill": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 43
},
"id": 1426,
"legend": {
"alignAsTable": true,
"avg": true,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 2,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "sum(delta(pd_schedule_operator_limit{instance=\"$instance\"}[1m])) by (type,name)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{type}}-{{name}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Operator limit",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "opm",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"repeat": null,
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (c *coordinator) collectSchedulerMetrics() {
var allowScheduler float64
// If the scheduler is not allowed to schedule, it will disappear in Grafana panel.
// See issue #1341.
if s.AllowSchedule() {
if !s.IsPaused() {
allowScheduler = 1
}
schedulerStatusGauge.WithLabelValues(s.GetName(), "allow").Set(allowScheduler)
Expand Down
10 changes: 10 additions & 0 deletions server/schedule/operator/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,18 @@ var (
Help: "Bucketed histogram of processing time (s) of finished operator step.",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 16),
}, []string{"type"})

// OperatorLimitCounter exposes the counter when meeting limit.
OperatorLimitCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "operator_limit",
Help: "Counter of operator meeting limit",
}, []string{"type", "name"})
)

func init() {
prometheus.MustRegister(operatorStepDuration)
prometheus.MustRegister(OperatorLimitCounter)
}
6 changes: 5 additions & 1 deletion server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,11 @@ func (s *balanceRegionScheduler) EncodeConfig() ([]byte, error) {
}

func (s *balanceRegionScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
return s.opController.OperatorCount(operator.OpRegion)-s.opController.OperatorCount(operator.OpMerge) < cluster.GetOpts().GetRegionScheduleLimit()
allowed := s.opController.OperatorCount(operator.OpRegion)-s.opController.OperatorCount(operator.OpMerge) < cluster.GetOpts().GetRegionScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpRegion.String()).Inc()
}
return allowed
}

func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
Expand Down
6 changes: 5 additions & 1 deletion server/schedulers/scatter_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ func (l *scatterRangeScheduler) EncodeConfig() ([]byte, error) {
}

func (l *scatterRangeScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
return l.OpController.OperatorCount(operator.OpRange) < cluster.GetOpts().GetRegionScheduleLimit()
allowed := l.OpController.OperatorCount(operator.OpRange) < cluster.GetOpts().GetRegionScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(l.GetType(), operator.OpRegion.String()).Inc()
}
return allowed
}

func (l *scatterRangeScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
Expand Down
16 changes: 13 additions & 3 deletions server/schedulers/shuffle_hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,19 @@ func (s *shuffleHotRegionScheduler) EncodeConfig() ([]byte, error) {
}

func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
return s.OpController.OperatorCount(operator.OpHotRegion) < s.conf.Limit &&
s.OpController.OperatorCount(operator.OpRegion) < cluster.GetOpts().GetRegionScheduleLimit() &&
s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit()
hotRegionAllowed := s.OpController.OperatorCount(operator.OpHotRegion) < s.conf.Limit
regionAllowed := s.OpController.OperatorCount(operator.OpRegion) < cluster.GetOpts().GetRegionScheduleLimit()
leaderAllowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit()
// TODO: Increment OperatorLimitCounter for OpHotRegion
//if !hotRegionAllowed {
//}
if !regionAllowed {
operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpRegion.String()).Inc()
}
// TODO: Increment OperatorLimitCounter for OpLeader
//if !leaderAllowed {
//}
return hotRegionAllowed && regionAllowed && leaderAllowed
}

func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
Expand Down
6 changes: 5 additions & 1 deletion server/schedulers/shuffle_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ func (s *shuffleRegionScheduler) EncodeConfig() ([]byte, error) {
}

func (s *shuffleRegionScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
return s.OpController.OperatorCount(operator.OpRegion) < cluster.GetOpts().GetRegionScheduleLimit()
allowed := s.OpController.OperatorCount(operator.OpRegion) < cluster.GetOpts().GetRegionScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpRegion.String()).Inc()
}
return allowed
}

func (s *shuffleRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
Expand Down

0 comments on commit 8eabcca

Please sign in to comment.