Skip to content

Commit

Permalink
Merge pull request #5251 from tschottdorf/queue_load
Browse files Browse the repository at this point in the history
Reduce scanner/queue load
  • Loading branch information
tbg committed Mar 15, 2016
2 parents bd46945 + 8af40bd commit 3286eee
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 10 deletions.
8 changes: 6 additions & 2 deletions storage/gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ const (
// TODO(tschottdorf): need to enforce at all times that this is much
// larger than the heartbeat interval used by the coordinator.
txnCleanupThreshold = time.Hour

// considerThreshold is used in shouldQueue. Only an a normalized GC bytes
// or intent byte age larger than the threshold queues the replica for GC.
considerThreshold = 10
)

// gcQueue manages a queue of replicas slated to be scanned in their
Expand Down Expand Up @@ -111,10 +115,10 @@ func (*gcQueue) shouldQueue(now roachpb.Timestamp, repl *Replica,
intentScore := repl.stats.GetAvgIntentAge(now.WallTime) / float64(intentAgeNormalization.Nanoseconds()/1E9)

// Compute priority.
if gcScore >= 1 {
if gcScore >= considerThreshold {
priority += gcScore
}
if intentScore >= 1 {
if intentScore >= considerThreshold {
priority += intentScore
}
shouldQ = priority > 0
Expand Down
14 changes: 7 additions & 7 deletions storage/gc_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestGCQueueShouldQueue(t *testing.T) {
bc := int64(gcByteCountNormalization)
ttl := int64(policy.TTLSeconds)

now := makeTS(iaN, 0) // at time of stats object
now := makeTS(considerThreshold*iaN, 0) // at time of stats object

testCases := []struct {
gcBytes int64
Expand Down Expand Up @@ -106,13 +106,13 @@ func TestGCQueueShouldQueue(t *testing.T) {
// Queues solely because of gc'able bytes.
{bc, 5 * bc * ttl, 10 * ia, 0, now, true, 5},
// A contribution of 1 from gc, 10/5 from intents.
{bc, bc * ttl, 5, 10 * ia, now, true, 1 + 2},
{bc, bc * ttl, 5, 10 * ia, now, true, (1 + 2)},

// Some tests where the ages increase since we call shouldNow with
// a later timestamp.

// One normalized unit of unaged gc'able bytes at time zero.
{ttl * bc, 0, 0, 0, roachpb.ZeroTimestamp, true, float64(now.WallTime) / 1E9},
{ttl * bc, 0, 0, 0, roachpb.ZeroTimestamp, true, float64(now.WallTime) / (1E9 * considerThreshold)},

// 2 intents aging from zero to now (which is exactly the intent age
// normalization).
Expand All @@ -129,8 +129,8 @@ func TestGCQueueShouldQueue(t *testing.T) {
stats := engine.MVCCStats{
KeyBytes: test.gcBytes,
IntentCount: test.intentCount,
IntentAge: test.intentAge,
GCBytesAge: test.gcBytesAge,
IntentAge: test.intentAge * considerThreshold,
GCBytesAge: test.gcBytesAge * considerThreshold,
LastUpdateNanos: test.now.WallTime,
}
if err := tc.rng.stats.SetMVCCStats(tc.rng.store.Engine(), stats); err != nil {
Expand All @@ -140,8 +140,8 @@ func TestGCQueueShouldQueue(t *testing.T) {
if shouldQ != test.shouldQ {
t.Errorf("%d: should queue expected %t; got %t", i, test.shouldQ, shouldQ)
}
if math.Abs(priority-test.priority) > 0.00001 {
t.Errorf("%d: priority expected %f; got %f", i, test.priority, priority)
if scaledExpPri := test.priority * considerThreshold; math.Abs(priority-scaledExpPri) > 0.00001 {
t.Errorf("%d: priority expected %f; got %f", i, scaledExpPri, priority)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ func NewStore(ctx StoreContext, eng engine.Engine, nodeDesc *roachpb.NodeDescrip
s.scanner.AddQueues(s.gcQueue, s.splitQueue, s.verifyQueue, s.replicateQueue, s.replicaGCQueue, s.raftLogQueue)

// Add consistency check scanner.
s.consistencyScanner = newReplicaScanner(ctx.ConsistencyCheckInterval, ctx.ScanMaxIdleTime, newStoreRangeSet(s))
s.consistencyScanner = newReplicaScanner(ctx.ConsistencyCheckInterval, 0, newStoreRangeSet(s))
s.replicaConsistencyQueue = newReplicaConsistencyQueue(s.ctx.Gossip)
s.consistencyScanner.AddQueues(s.replicaConsistencyQueue)

Expand Down

0 comments on commit 3286eee

Please sign in to comment.