Skip to content

Commit

Permalink
gc_worker: Skip TiFlash nodes when doing UnsafeDestroyRange and Green…
Browse files Browse the repository at this point in the history
… GC (pingcap#15505)
  • Loading branch information
MyonKeminta committed Mar 26, 2020
1 parent 046d35d commit cd19cfa
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 4 deletions.
55 changes: 51 additions & 4 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (w *GCWorker) getGCConcurrency(ctx context.Context) (int, error) {
return w.loadGCConcurrencyWithDefault()
}

stores, err := w.getUpStores(ctx)
stores, err := w.getUpStoresForGC(ctx)
concurrency := len(stores)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] failed to get up stores to calculate concurrency. use config.",
Expand Down Expand Up @@ -648,7 +648,7 @@ func (w *GCWorker) redoDeleteRanges(ctx context.Context, safePoint uint64, concu

func (w *GCWorker) doUnsafeDestroyRangeRequest(ctx context.Context, startKey []byte, endKey []byte, concurrency int) error {
// Get all stores every time deleting a region. So the store list is less probably to be stale.
stores, err := w.getUpStores(ctx)
stores, err := w.getUpStoresForGC(ctx)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] delete ranges: got an error while trying to get store list from PD",
zap.String("uuid", w.uuid),
Expand Down Expand Up @@ -699,15 +699,62 @@ func (w *GCWorker) doUnsafeDestroyRangeRequest(ctx context.Context, startKey []b
return errors.Trace(err)
}

func (w *GCWorker) getUpStores(ctx context.Context) ([]*metapb.Store, error) {
const (
engineLabelKey = "engine"
engineLabelTiFlash = "tiflash"
engineLabelTiKV = "tikv"
)

// needsGCOperationForStore checks if the store-level requests related to GC needs to be sent to the store. The store-level
// requests includes UnsafeDestroyRange, PhysicalScanLock, etc.
func needsGCOperationForStore(store *metapb.Store) (bool, error) {
engineLabel := ""

for _, label := range store.GetLabels() {
if label.GetKey() == engineLabelKey {
engineLabel = label.GetValue()
break
}
}

switch engineLabel {
case engineLabelTiFlash:
// For a TiFlash node, it uses other approach to delete dropped tables, so it's safe to skip sending
// UnsafeDestroyRange requests; it has only learner peers and their data must exist in TiKV, so it's safe to
// skip physical resolve locks for it.
return false, nil

case "":
// If no engine label is set, it should be a TiKV node.
fallthrough
case engineLabelTiKV:
return true, nil

default:
return true, errors.Errorf("unsupported store engine \"%v\" with storeID %v, addr %v",
engineLabel,
store.GetId(),
store.GetAddress())
}
}

// getUpStoresForGC gets the list of stores that needs to be processed during GC.
func (w *GCWorker) getUpStoresForGC(ctx context.Context) ([]*metapb.Store, error) {
stores, err := w.pdClient.GetAllStores(ctx)
if err != nil {
return nil, errors.Trace(err)
}

upStores := make([]*metapb.Store, 0, len(stores))
for _, store := range stores {
if store.State == metapb.StoreState_Up {
if store.State != metapb.StoreState_Up {
continue
}
needsGCOp, err := needsGCOperationForStore(store)
if err != nil {
return nil, errors.Trace(err)
}
if needsGCOp {
upStores = append(upStores, store)
}
}
Expand Down
30 changes: 30 additions & 0 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,36 @@ func (s *testGCWorkerSuite) TestCheckGCMode(c *C) {
c.Assert(useDistributedGC, Equals, true)
}

func (s *testGCWorkerSuite) TestNeedsGCOperationForStore(c *C) {
newStore := func(hasEngineLabel bool, engineLabel string) *metapb.Store {
store := &metapb.Store{}
if hasEngineLabel {
store.Labels = []*metapb.StoreLabel{{Key: engineLabelKey, Value: engineLabel}}
}
return store
}

// TiKV needs to do the store-level GC operations.
res, err := needsGCOperationForStore(newStore(false, ""))
c.Assert(err, IsNil)
c.Assert(res, IsTrue)
res, err = needsGCOperationForStore(newStore(true, ""))
c.Assert(err, IsNil)
c.Assert(res, IsTrue)
res, err = needsGCOperationForStore(newStore(true, engineLabelTiKV))
c.Assert(err, IsNil)
c.Assert(res, IsTrue)

// TiFlash does not need these operations.
res, err = needsGCOperationForStore(newStore(true, engineLabelTiFlash))
c.Assert(err, IsNil)
c.Assert(res, IsFalse)

// Throw an error for unknown store types.
_, err = needsGCOperationForStore(newStore(true, "invalid"))
c.Assert(err, NotNil)
}

func (s *testGCWorkerSuite) TestResolveLockRangeInfine(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/invalidCacheAndRetry", "return(true)"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/setGcResolveMaxBackoff", "return(1)"), IsNil)
Expand Down

0 comments on commit cd19cfa

Please sign in to comment.