Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rangedesciter: support scoped iteration #91330

Merged
merged 1 commit into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func TestTruncate(t *testing.T) {
desc.EndKey = roachpb.RKey(test.to)
}
rs := roachpb.RSpan{Key: roachpb.RKey(test.from), EndKey: roachpb.RKey(test.to)}
rs, err := rs.Intersect(desc)
rs, err := rs.Intersect(desc.RSpan())
if err != nil {
t.Errorf("%d: intersection failure: %v", i, err)
continue
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,7 +1382,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
responseChs = append(responseChs, responseCh)

// Truncate the request to range descriptor.
curRangeRS, err := rs.Intersect(ri.Token().Desc())
curRangeRS, err := rs.Intersect(ri.Token().Desc().RSpan())
if err != nil {
responseCh <- response{pErr: roachpb.NewError(err)}
return
Expand Down Expand Up @@ -1632,7 +1632,7 @@ func (ds *DistSender) sendPartialBatch(
// batch, so that we know that the response to it matches the positions
// into our batch (using the full batch here would give a potentially
// larger response slice with unknown mapping to our truncated reply).
intersection, err := rs.Intersect(routingTok.Desc())
intersection, err := rs.Intersect(routingTok.Desc().RSpan())
if err != nil {
return response{pErr: roachpb.NewError(err)}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges(
ri := MakeRangeIterator(ds)
for ri.Seek(ctx, nextRS.Key, Ascending); ri.Valid(); ri.Next(ctx) {
desc := ri.Desc()
partialRS, err := nextRS.Intersect(desc)
partialRS, err := nextRS.Intersect(desc.RSpan())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
rs.Key = roachpb.RKeyMax
} else {
// Truncate the request span to the current range.
singleRangeSpan, err := rs.Intersect(ri.Token().Desc())
singleRangeSpan, err := rs.Intersect(ri.Token().Desc().RSpan())
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/rangecache/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1424,7 +1424,7 @@ func (e *CacheEntry) LeaseSpeculative() bool {
// "speculative" (sequence=0).
func (e *CacheEntry) overrides(o *CacheEntry) bool {
if util.RaceEnabled {
if _, err := e.Desc().RSpan().Intersect(o.Desc()); err != nil {
if _, err := e.Desc().RSpan().Intersect(o.Desc().RSpan()); err != nil {
panic(fmt.Sprintf("descriptors don't intersect: %s vs %s", e.Desc(), o.Desc()))
}
}
Expand Down Expand Up @@ -1464,7 +1464,7 @@ func (e *CacheEntry) overrides(o *CacheEntry) bool {
// older; this matches the semantics of b.overrides(a).
func compareEntryDescs(a, b *CacheEntry) int {
if util.RaceEnabled {
if _, err := a.Desc().RSpan().Intersect(b.Desc()); err != nil {
if _, err := a.Desc().RSpan().Intersect(b.Desc().RSpan()); err != nil {
panic(fmt.Sprintf("descriptors don't intersect: %s vs %s", a.Desc(), b.Desc()))
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/rangefeed/db_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (dbc *dbAdapter) divideAndSendScanRequests(

for ri.Seek(ctx, nextRS.Key, kvcoord.Ascending); ri.Valid(); ri.Next(ctx) {
desc := ri.Desc()
partialRS, err := nextRS.Intersect(desc)
partialRS, err := nextRS.Intersect(desc.RSpan())
if err != nil {
return err
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2395,20 +2395,20 @@ func (rs RSpan) String() string {
}

// Intersect returns the intersection of the current span and the
// descriptor's range. Returns an error if the span and the
// descriptor's range do not overlap.
func (rs RSpan) Intersect(desc *RangeDescriptor) (RSpan, error) {
if !rs.Key.Less(desc.EndKey) || !desc.StartKey.Less(rs.EndKey) {
return rs, errors.Errorf("span and descriptor's range do not overlap: %s vs %s", rs, desc)
// given range. Returns an error if the span and the range do not
// overlap.
func (rs RSpan) Intersect(rspan RSpan) (RSpan, error) {
if !rs.Key.Less(rspan.EndKey) || !rspan.Key.Less(rs.EndKey) {
return rs, errors.Errorf("spans do not overlap: %s vs %s", rs, rspan)
}

key := rs.Key
if key.Less(desc.StartKey) {
key = desc.StartKey
if key.Less(rspan.Key) {
key = rspan.Key
}
endKey := rs.EndKey
if !desc.ContainsKeyRange(desc.StartKey, endKey) {
endKey = desc.EndKey
if !rspan.ContainsKeyRange(rspan.Key, endKey) {
endKey = rspan.EndKey
}
return RSpan{key, endKey}, nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1482,7 +1482,7 @@ func TestRSpanIntersect(t *testing.T) {
desc.StartKey = test.startKey
desc.EndKey = test.endKey

actual, err := rs.Intersect(&desc)
actual, err := rs.Intersect(desc.RSpan())
if err != nil {
t.Error(err)
continue
Expand All @@ -1504,7 +1504,7 @@ func TestRSpanIntersect(t *testing.T) {
desc := RangeDescriptor{}
desc.StartKey = test.startKey
desc.EndKey = test.endKey
if _, err := rs.Intersect(&desc); err == nil {
if _, err := rs.Intersect(desc.RSpan()); err == nil {
t.Errorf("%d: unexpected success", i)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/upgrade/upgradecluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/upgrade/upgradecluster",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
Expand Down
3 changes: 2 additions & 1 deletion pkg/upgrade/upgradecluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package upgradecluster
import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -145,5 +146,5 @@ func (c *Cluster) ForEveryNode(
func (c *Cluster) IterateRangeDescriptors(
ctx context.Context, blockSize int, init func(), fn func(...roachpb.RangeDescriptor) error,
) error {
return c.c.RangeDescIterator.Iterate(ctx, blockSize, init, fn)
return c.c.RangeDescIterator.Iterate(ctx, blockSize, init, keys.EverythingSpan, fn)
}
5 changes: 5 additions & 0 deletions pkg/util/rangedesciter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/util/iterutil",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand All @@ -21,6 +22,7 @@ go_test(
"rangedesciter_test.go",
],
args = ["-test.timeout=295s"],
data = glob(["testdata/**"]),
deps = [
":rangedesciter",
"//pkg/keys",
Expand All @@ -30,9 +32,12 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_stretchr_testify//require",
],
)

Expand Down
95 changes: 75 additions & 20 deletions pkg/util/rangedesciter/rangedesciter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/errors"
)

// Iterator paginates through every range descriptor in the system.
// Iterator paginates through range descriptors in the system.
type Iterator interface {
// Iterate paginates through range descriptors in the system using the given
// page size. It's important to note that the closure is being executed in
// the context of a distributed transaction that may be automatically
// retried. So something like the following is an anti-pattern:
// Iterate paginates through range descriptors in the system that overlap
// with the given span. When doing so it uses the given page size. It's
// important to note that the closure is being executed in the context of a
// distributed transaction that may be automatically retried. So something
// like the following is an anti-pattern:
//
// processed := 0
// _ = rdi.Iterate(...,
Expand All @@ -45,8 +47,15 @@ type Iterator interface {
// log.Infof(ctx, "processed %d ranges", processed)
// },
// )
//
//
// When the query span is something other than keys.EverythingSpan, the page
// size is also approximately haw many extra keys/range descriptors we may
// be reading. Callers are expected to pick a page size accordingly
// (page sizes that are much larger than expected # of descriptors would
// lead to wasted work).
Iterate(
ctx context.Context, pageSize int, init func(),
ctx context.Context, pageSize int, init func(), span roachpb.Span,
fn func(descriptors ...roachpb.RangeDescriptor) error,
) error
}
Expand Down Expand Up @@ -74,31 +83,58 @@ func (i *iteratorImpl) Iterate(
ctx context.Context,
pageSize int,
init func(),
span roachpb.Span,
fn func(descriptors ...roachpb.RangeDescriptor) error,
) error {
rspan := roachpb.RSpan{
Key: keys.MustAddr(span.Key),
EndKey: keys.MustAddr(span.EndKey),
}

return i.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Inform the caller that we're starting a fresh attempt to page in
// range descriptors.
init()

// Iterate through meta{1,2} to pull out all the range descriptors.
// Bound the start key of the meta{1,2} scan as much as possible. If the
// start key < keys.Meta1KeyMax (we're also interested in the meta1
// range descriptor), start our scan at keys.MetaMin. If not, start it
// at the relevant range meta key -- in meta1 if the start key sits
// within meta2, in meta2 if the start key is an ordinary key.
//
// So what exactly is the "relevant range meta key"? Since keys in meta
// ranges are encoded using the end keys of range descriptors, we're
// looking for the lowest existing range meta key that's strictly
// greater than RangeMetaKey(start key).
rangeMetaKeyForStart := keys.RangeMetaKey(rspan.Key)
metaScanBoundsForStart, err := keys.MetaScanBounds(rangeMetaKeyForStart)
if err != nil {
return err
}
metaScanStartKey := metaScanBoundsForStart.Key.AsRawKey()

// Iterate through meta{1,2} to pull out relevant range descriptors.
// We'll keep scanning until we've found a range descriptor outside the
// scan of interest.
var lastRangeIDInMeta1 roachpb.RangeID
return txn.Iterate(ctx, keys.MetaMin, keys.MetaMax, pageSize,
return iterutil.Map(txn.Iterate(ctx, metaScanStartKey, keys.MetaMax, pageSize,
func(rows []kv.KeyValue) error {
descriptors := make([]roachpb.RangeDescriptor, 0, len(rows))
stopMetaIteration := false

var desc roachpb.RangeDescriptor
for _, row := range rows {
err := row.ValueProto(&desc)
if err != nil {
if err := row.ValueProto(&desc); err != nil {
return errors.Wrapf(err, "unable to unmarshal range descriptor from %s", row.Key)
}

// In small enough clusters it's possible for the same range
// descriptor to be stored in both meta1 and meta2. This
// happens when some range spans both the meta and the user
// keyspace. Consider when r1 is [/Min,
// /System/NodeLiveness); we'll store the range descriptor
// in both /Meta2/<r1.EndKey> and in /Meta1/KeyMax[1].
// In small enough clusters, it's possible for the same
// range descriptor to be stored in both meta1 and meta2.
// This happens when some range spans both the meta and the
// user keyspace. Consider when r1 is
// [/Min, /System/NodeLiveness); we'll store the range
// descriptor in both /Meta2/<r1.EndKey> and in
// /Meta1/KeyMax[1].
//
// As part of iterator we'll de-duplicate this descriptor
// away by checking whether we've seen it before in meta1.
Expand All @@ -111,15 +147,34 @@ func (i *iteratorImpl) Iterate(
continue
}

if _, err := desc.KeySpan().Intersect(rspan); err != nil {
// We're past the last range descriptor that overlaps
// with the given span.
stopMetaIteration = true
break
}

// This descriptor's span intersects with our query span, so
// collect it for the callback.
descriptors = append(descriptors, desc)

if keys.InMeta1(keys.RangeMetaKey(desc.StartKey)) {
lastRangeIDInMeta1 = desc.RangeID
}
}

// Invoke fn with the current chunk (of size ~blockSize) of
// range descriptors.
return fn(descriptors...)
})
if len(descriptors) != 0 {
// Invoke fn with the current chunk (of size ~pageSize) of
// range descriptors.
if err := fn(descriptors...); err != nil {
return err
}
}
if stopMetaIteration {
return iterutil.StopIteration() // we're done here
}
return nil
}),
)
})
}
Loading