Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move the guts of Store.RemoveReplica to the processRaft thread. #2349

Merged
merged 1 commit into from
Sep 3, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,3 +924,23 @@ func TestRaftAfterRemoveRange(t *testing.T) {
// Execute another replica change to ensure that MultiRaft has processed the heartbeat just sent.
mtc.replicateRange(proto.RangeID(1), 0, 1)
}

// TestRaftRemoveRace adds and removes a replica repeatedly in an
// attempt to reproduce a race
// (https://github.com/cockroachdb/cockroach/issues/1911). Note that
// 10 repetitions is not enough to reliably reproduce the problem, but
// it's better than any other tests we have for this (increasing the
// number of repetitions adds an unacceptable amount of test runtime).
func TestRaftRemoveRace(t *testing.T) {
	defer leaktest.AfterTest(t)
	mtc := startMultiTestContext(t, 3)
	defer mtc.Stop()

	const repetitions = 10
	rangeID := proto.RangeID(1)
	mtc.replicateRange(rangeID, 0, 1, 2)

	// Repeatedly drop and re-add the replica on store 2 to try to
	// trigger the remove/add race.
	for rep := 0; rep < repetitions; rep++ {
		mtc.unreplicateRange(rangeID, 0, 2)
		mtc.replicateRange(rangeID, 0, 2)
	}
}
94 changes: 58 additions & 36 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,25 +247,26 @@ func (rs *storeRangeSet) EstimatedCount() int {
// A Store maintains a map of ranges by start key. A Store corresponds
// to one physical device.
type Store struct {
Ident proto.StoreIdent
ctx StoreContext
db *client.DB
engine engine.Engine // The underlying key-value store
_allocator allocator // Makes allocation decisions
rangeIDAlloc *idAllocator // Range ID allocator
gcQueue *gcQueue // Garbage collection queue
_splitQueue *splitQueue // Range splitting queue
verifyQueue *verifyQueue // Checksum verification queue
replicateQueue replicateQueue // Replication queue
_rangeGCQueue *rangeGCQueue // Range GC queue
scanner *replicaScanner // Range scanner
feed StoreEventFeed // Event Feed
multiraft *multiraft.MultiRaft
started int32
stopper *stop.Stopper
startedAt int64
nodeDesc *proto.NodeDescriptor
initComplete sync.WaitGroup // Signaled by async init tasks
Ident proto.StoreIdent
ctx StoreContext
db *client.DB
engine engine.Engine // The underlying key-value store
_allocator allocator // Makes allocation decisions
rangeIDAlloc *idAllocator // Range ID allocator
gcQueue *gcQueue // Garbage collection queue
_splitQueue *splitQueue // Range splitting queue
verifyQueue *verifyQueue // Checksum verification queue
replicateQueue replicateQueue // Replication queue
_rangeGCQueue *rangeGCQueue // Range GC queue
scanner *replicaScanner // Range scanner
feed StoreEventFeed // Event Feed
removeReplicaChan chan removeReplicaOp
multiraft *multiraft.MultiRaft
started int32
stopper *stop.Stopper
startedAt int64
nodeDesc *proto.NodeDescriptor
initComplete sync.WaitGroup // Signaled by async init tasks

mu sync.RWMutex // Protects variables below...
replicas map[proto.RangeID]*Replica // Map of replicas by Range ID
Expand Down Expand Up @@ -358,14 +359,15 @@ func NewStore(ctx StoreContext, eng engine.Engine, nodeDesc *proto.NodeDescripto
}

s := &Store{
ctx: ctx,
db: ctx.DB, // TODO(tschottdorf) remove redundancy.
engine: eng,
_allocator: makeAllocator(ctx.StorePool),
replicas: map[proto.RangeID]*Replica{},
replicasByKey: btree.New(64 /* degree */),
uninitReplicas: map[proto.RangeID]*Replica{},
nodeDesc: nodeDesc,
ctx: ctx,
db: ctx.DB, // TODO(tschottdorf) remove redundancy.
engine: eng,
_allocator: makeAllocator(ctx.StorePool),
replicas: map[proto.RangeID]*Replica{},
replicasByKey: btree.New(64 /* degree */),
uninitReplicas: map[proto.RangeID]*Replica{},
nodeDesc: nodeDesc,
removeReplicaChan: make(chan removeReplicaOp),
}

// Add range scanner and configure with queues.
Expand Down Expand Up @@ -1060,7 +1062,7 @@ func (s *Store) SplitRange(origRng, newRng *Replica) error {

// MergeRange expands the subsuming range to absorb the subsumed range.
// This merge operation will fail if the two ranges are not collocated
// on the same store.
// on the same store. Must be called from the processRaft goroutine.
func (s *Store) MergeRange(subsumingRng *Replica, updatedEndKey proto.Key, subsumedRangeID proto.RangeID) error {
subsumingDesc := subsumingRng.Desc()

Expand All @@ -1080,8 +1082,9 @@ func (s *Store) MergeRange(subsumingRng *Replica, updatedEndKey proto.Key, subsu
subsumedDesc.Replicas, subsumingDesc.Replicas)
}

// Remove and destroy the subsumed range.
if err = s.RemoveReplica(subsumedRng); err != nil {
// Remove and destroy the subsumed range. Note that we are on the
// processRaft goroutine so we can call removeReplicaImpl directly.
if err = s.removeReplicaImpl(subsumedRng); err != nil {
return util.Errorf("cannot remove range %s", err)
}

Expand Down Expand Up @@ -1145,24 +1148,40 @@ func (s *Store) addReplicaToRangeMap(rng *Replica) error {
return nil
}

// removeReplicaOp is a request, sent over Store.removeReplicaChan, asking
// the processRaft goroutine to remove a replica from the store.
type removeReplicaOp struct {
	rep *Replica     // the replica to remove
	ch  chan<- error // receives the result of the removal
}

// RemoveReplica removes the replica from the store's replica map and from
// the sorted replicasByKey btree.
func (s *Store) RemoveReplica(rng *Replica) error {
rangeID := rng.Desc().RangeID
func (s *Store) RemoveReplica(rep *Replica) error {

ch := make(chan error)
s.removeReplicaChan <- removeReplicaOp{rep, ch}
return <-ch
}

// removeReplicaImpl removes the replica from the store's replica map,
// the replicasByKey btree, and the scanner, after first removing its
// group from multiraft. It runs on the processRaft goroutine; callers
// elsewhere must go through RemoveReplica instead.
func (s *Store) removeReplicaImpl(rep *Replica) error {
	rangeID := rep.Desc().RangeID

	// RemoveGroup needs to access the storage, which in turn needs the
	// lock. Some care is needed to avoid deadlocks. We remove the group
	// from multiraft outside the scope of s.mu; this is effectively
	// synchronized by the fact that this method runs on the processRaft
	// goroutine.
	if err := s.multiraft.RemoveGroup(rangeID); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	// Note: the replicas map entry is deleted even if the btree delete
	// below fails and an error is returned.
	delete(s.replicas, rangeID)
	if s.replicasByKey.Delete(rep) == nil {
		return util.Errorf("couldn't find range in replicasByKey btree")
	}
	s.scanner.RemoveReplica(rep)
	return nil
}

Expand Down Expand Up @@ -1567,6 +1586,9 @@ func (s *Store) processRaft() {
callback(err)
}

case op := <-s.removeReplicaChan:
op.ch <- s.removeReplicaImpl(op.rep)

case <-s.stopper.ShouldStop():
return
}
Expand Down