Skip to content

Commit

Permalink
filter: add label for filterCounter (#3320)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Jan 14, 2021
1 parent 8eabcca commit 5f447ae
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 12 deletions.
24 changes: 24 additions & 0 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -6323,6 +6323,30 @@
"metric": "pd_scheduler_event_count",
"refId": "A",
"step": 4
},
{
"expr": "sum(delta(pd_schedule_filter{action=\"filter-target\",type=\"distinct-filter\"}[1m])) by (source, target, type, scope)",
"format": "time_series",
"hide": true,
"intervalFactor": 2,
"legendFormat": "{{scope}}-{{type}}-{{source}}-{{target}}",
"refId": "B"
},
{
"expr": "sum(delta(pd_schedule_filter{action=\"filter-target\",type=\"rule-fit-filter\"}[1m])) by (source, target, type, scope)",
"format": "time_series",
"hide": true,
"intervalFactor": 2,
"legendFormat": "{{scope}}-{{type}}-{{source}}-{{target}}",
"refId": "C"
},
{
"expr": "sum(delta(pd_schedule_filter{action=\"filter-target\",type=\"rule-fit-leader-filter\"}[1m])) by (source, target, type, scope)",
"format": "time_series",
"hide": true,
"intervalFactor": 2,
"legendFormat": "{{scope}}-{{type}}-{{source}}-{{target}}",
"refId": "D"
}
],
"thresholds": [],
Expand Down
66 changes: 55 additions & 11 deletions server/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ func SelectSourceStores(stores []*core.StoreInfo, filters []Filter, opt *config.
return filterStoresBy(stores, func(s *core.StoreInfo) bool {
return slice.AllOf(filters, func(i int) bool {
if !filters[i].Source(opt, s) {
filterCounter.WithLabelValues("filter-source", s.GetAddress(), fmt.Sprintf("%d", s.GetID()), filters[i].Scope(), filters[i].Type()).Inc()
sourceID := fmt.Sprintf("%d", s.GetID())
targetID := ""
filterCounter.WithLabelValues("filter-source", s.GetAddress(),
sourceID, filters[i].Scope(), filters[i].Type(), sourceID, targetID).Inc()
return false
}
return true
Expand All @@ -47,8 +50,16 @@ func SelectSourceStores(stores []*core.StoreInfo, filters []Filter, opt *config.
func SelectTargetStores(stores []*core.StoreInfo, filters []Filter, opt *config.PersistOptions) []*core.StoreInfo {
return filterStoresBy(stores, func(s *core.StoreInfo) bool {
return slice.AllOf(filters, func(i int) bool {
if !filters[i].Target(opt, s) {
filterCounter.WithLabelValues("filter-target", s.GetAddress(), fmt.Sprintf("%d", s.GetID()), filters[i].Scope(), filters[i].Type()).Inc()
filter := filters[i]
if !filter.Target(opt, s) {
cfilter, ok := filter.(comparingFilter)
targetID := fmt.Sprintf("%d", s.GetID())
sourceID := ""
if ok {
sourceID = fmt.Sprintf("%d", cfilter.GetSourceStoreID())
}
filterCounter.WithLabelValues("filter-target", s.GetAddress(),
targetID, filters[i].Scope(), filters[i].Type(), sourceID, targetID).Inc()
return false
}
return true
Expand Down Expand Up @@ -76,13 +87,23 @@ type Filter interface {
Target(opt *config.PersistOptions, store *core.StoreInfo) bool
}

// comparingFilter is an interface to filter target store by comparing source and target stores
type comparingFilter interface {
Filter
// GetSourceStoreID returns the source store when comparing.
GetSourceStoreID() uint64
}

// Source checks if store can pass all Filters as source store.
func Source(opt *config.PersistOptions, store *core.StoreInfo, filters []Filter) bool {
storeAddress := store.GetAddress()
storeID := fmt.Sprintf("%d", store.GetID())
for _, filter := range filters {
if !filter.Source(opt, store) {
filterCounter.WithLabelValues("filter-source", storeAddress, storeID, filter.Scope(), filter.Type()).Inc()
sourceID := storeID
targetID := ""
filterCounter.WithLabelValues("filter-source", storeAddress,
sourceID, filter.Scope(), filter.Type(), sourceID, targetID).Inc()
return false
}
}
Expand All @@ -95,7 +116,14 @@ func Target(opt *config.PersistOptions, store *core.StoreInfo, filters []Filter)
storeID := fmt.Sprintf("%d", store.GetID())
for _, filter := range filters {
if !filter.Target(opt, store) {
filterCounter.WithLabelValues("filter-target", storeAddress, storeID, filter.Scope(), filter.Type()).Inc()
cfilter, ok := filter.(comparingFilter)
targetID := storeID
sourceID := ""
if ok {
sourceID = fmt.Sprintf("%d", cfilter.GetSourceStoreID())
}
filterCounter.WithLabelValues("filter-target", storeAddress,
targetID, filter.Scope(), filter.Type(), sourceID, targetID).Inc()
return false
}
}
Expand Down Expand Up @@ -166,6 +194,7 @@ type distinctScoreFilter struct {
stores []*core.StoreInfo
policy string
safeScore float64
srcStore uint64
}

const (
Expand Down Expand Up @@ -203,6 +232,7 @@ func newDistinctScoreFilter(scope string, labels []string, stores []*core.StoreI
stores: newStores,
safeScore: core.DistinctScore(labels, newStores, source),
policy: policy,
srcStore: source.GetID(),
}
}

Expand Down Expand Up @@ -230,6 +260,11 @@ func (f *distinctScoreFilter) Target(opt *config.PersistOptions, store *core.Sto
}
}

// GetSourceStoreID implements the ComparingFilter
func (f *distinctScoreFilter) GetSourceStoreID() uint64 {
return f.srcStore
}

// StoreStateFilter is used to determine whether a store can be selected as the
// source or target of the schedule based on the store's state.
type StoreStateFilter struct {
Expand Down Expand Up @@ -424,7 +459,7 @@ type ruleFitFilter struct {
fitter RegionFitter
region *core.RegionInfo
oldFit *placement.RegionFit
oldStore uint64
srcStore uint64
}

// newRuleFitFilter creates a filter that ensures after replace a peer with new
Expand All @@ -436,7 +471,7 @@ func newRuleFitFilter(scope string, fitter RegionFitter, region *core.RegionInfo
fitter: fitter,
region: region,
oldFit: fitter.FitRegion(region),
oldStore: oldStoreID,
srcStore: oldStoreID,
}
}

Expand All @@ -455,28 +490,33 @@ func (f *ruleFitFilter) Source(opt *config.PersistOptions, store *core.StoreInfo
func (f *ruleFitFilter) Target(opt *config.PersistOptions, store *core.StoreInfo) bool {
region := createRegionForRuleFit(f.region.GetStartKey(), f.region.GetEndKey(),
f.region.GetPeers(), f.region.GetLeader(),
core.WithReplacePeerStore(f.oldStore, store.GetID()))
core.WithReplacePeerStore(f.srcStore, store.GetID()))
newFit := f.fitter.FitRegion(region)
return placement.CompareRegionFit(f.oldFit, newFit) <= 0
}

// GetSourceStoreID implements the ComparingFilter
func (f *ruleFitFilter) GetSourceStoreID() uint64 {
return f.srcStore
}

type ruleLeaderFitFilter struct {
scope string
fitter RegionFitter
region *core.RegionInfo
oldFit *placement.RegionFit
oldLeaderStoreID uint64
srcLeaderStoreID uint64
}

// newRuleLeaderFitFilter creates a filter that ensures after transfer leader with new store,
// the isolation level will not decrease.
func newRuleLeaderFitFilter(scope string, fitter RegionFitter, region *core.RegionInfo, oldLeaderStoreID uint64) Filter {
func newRuleLeaderFitFilter(scope string, fitter RegionFitter, region *core.RegionInfo, srcLeaderStoreID uint64) Filter {
return &ruleLeaderFitFilter{
scope: scope,
fitter: fitter,
region: region,
oldFit: fitter.FitRegion(region),
oldLeaderStoreID: oldLeaderStoreID,
srcLeaderStoreID: srcLeaderStoreID,
}
}

Expand Down Expand Up @@ -505,6 +545,10 @@ func (f *ruleLeaderFitFilter) Target(opt *config.PersistOptions, store *core.Sto
return placement.CompareRegionFit(f.oldFit, newFit) <= 0
}

func (f *ruleLeaderFitFilter) GetSourceStoreID() uint64 {
return f.srcLeaderStoreID
}

// NewPlacementSafeguard creates a filter that ensures after replace a peer with new
// peer, the placement restriction will not become worse.
func NewPlacementSafeguard(scope string, cluster opt.Cluster, region *core.RegionInfo, sourceStore *core.StoreInfo) Filter {
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/filter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var (
Subsystem: "schedule",
Name: "filter",
Help: "Counter of the filter",
}, []string{"action", "address", "store", "scope", "type"})
}, []string{"action", "address", "store", "scope", "type", "source", "target"})
)

func init() {
Expand Down

0 comments on commit 5f447ae

Please sign in to comment.