Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
94438: sql,storage: add support for COL_BATCH_RESPONSE scan format r=yuzefovich a=yuzefovich

This commit introduces a new `COL_BATCH_RESPONSE` scan format for Scans
and ReverseScans which results only in needed columns to be returned
from the KV server. In other words, this commit introduces the ability
to perform the KV projection pushdown.

The main idea of this feature is to use the injected decoding logic from
SQL in order to process each KV and keep only the needed parts (i.e.
necessary SQL columns). Those needed parts are then propagated back to
the KV client as coldata.Batch'es (serialized in the Apache Arrow format).

Here is the outline of all components involved:
```
     ┌────────────────────────────────────────────────┐
     │                       SQL                      │
     │________________________________________________│
     │          colfetcher.ColBatchDirectScan         │
     │                        │                       │
     │                        ▼                       │
     │                 row.txnKVFetcher               │
     │    (behind the row.KVBatchFetcher interface)   │
     └────────────────────────────────────────────────┘
                              │
                              ▼
     ┌────────────────────────────────────────────────┐
     │                    KV Client                   │
     └────────────────────────────────────────────────┘
                              │
                              ▼
     ┌────────────────────────────────────────────────┐
     │                    KV Server                   │
     │________________________________________________│
     │           colfetcher.cFetcherWrapper           │
     │ (behind the storage.CFetcherWrapper interface) │
     │                        │                       │
     │                        ▼                       │
     │              colfetcher.cFetcher               │
     │                        │                       │
     │                        ▼                       │
     │          storage.mvccScanFetchAdapter ────────┐│
     │    (behind the storage.NextKVer interface)    ││
     │                        │                      ││
     │                        ▼                      ││
     │           storage.pebbleMVCCScanner           ││
     │ (which put's KVs into storage.singleResults) <┘│
     └────────────────────────────────────────────────┘
```
On the KV client side, `row.txnKVFetcher` issues Scans and ReverseScans with
the `COL_BATCH_RESPONSE` format and returns the response (which contains the
columnar data) to the `colfetcher.ColBatchDirectScan`.

On the KV server side, we create a `storage.CFetcherWrapper` that asks the
`colfetcher.cFetcher` for the next `coldata.Batch`. The `cFetcher`, in turn,
fetches the next KV, decodes it, and keeps only values for the needed SQL
columns, discarding the rest of the KV. The KV is emitted by the
`mvccScanFetchAdapter` which - via the `singleResults` struct - exposes
access to the current KV that the `pebbleMVCCScanner` is pointing at.

Note that there is an additional "implicit synchronization" between
components that is not shown on this diagram. In particular,
`storage.singleResults.maybeTrimPartialLastRow` must be in sync with the
`colfetcher.cFetcher` which is achieved by
- the `cFetcher` exposing access to the first key of the last incomplete SQL
  row via the `FirstKeyOfRowGetter`,
- the `singleResults` using that key as the resume key for the response,
- and the `cFetcher` removing that last partial SQL row when `NextKV()`
  returns `partialRow=true`.
This "upstream" link (although breaking the layering a bit) allows us to
avoid a performance penalty for handling the case with multiple column
families. (This case is handled by the `storage.pebbleResults` via tracking
offsets into the `pebbleResults.repr`.)

This code structure deserves some elaboration. First, there is a mismatch
between the "push" mode in which the `pebbleMVCCScanner` operates and the
"pull" mode that the `NextKVer` exposes. The adaption between two different
modes is achieved via the `mvccScanFetcherAdapter` grabbing (when the control
returns to it) the current unstable KV pair from the `singleResults` struct
which serves as a one KV pair buffer that the `pebbleMVCCScanner` `put`s into.
Second, in order be able to use the unstable KV pair without performing a
copy, the `pebbleMVCCScanner` stops at the current KV pair and returns the
control flow (which is exactly what `pebbleMVCCScanner.getOne` does) back to
the `mvccScanFetcherAdapter`, with the adapter advancing the scanner only when
the next KV pair is needed.

There are multiple scenarios which are currently not supported:
- SQL cannot issue Get requests (likely will support in 23.1)
- `TraceKV` option is not supported (likely will support in 23.1)
- user-defined types other than enums are not supported (will _not_
support in 23.1)
- non-default key locking strength as well as SKIP LOCKED wait policy
are not supported (will _not_ support in 23.1).

The usage of this feature is currently disabled by default, but I intend
to enable it by default for multi-tenant setups. The rationale is that
currently there is a large performance hit when enabling it for
single-tenant deployments whereas it offers significant speed up in the
multi-tenant world.

The microbenchmarks [show](https://gist.github.com/yuzefovich/669c295a8a4fdffa6490532284c5a719)
the expected improvement in multi-tenant setups when the tenant runs in
a separate process whenever we don't need to decode all of the columns
from the table.

The TPCH numbers, though, don't show the expected speedup:
```
Q1:	before: 11.47s	after: 8.84s	 -22.89%
Q2:	before: 0.41s	after: 0.29s	 -27.71%
Q3:	before: 7.89s	after: 9.68s	 22.63%
Q4:	before: 4.48s	after: 4.52s	 0.86%
Q5:	before: 10.39s	after: 10.35s	 -0.29%
Q6:	before: 33.57s	after: 33.41s	 -0.48%
Q7:	before: 23.82s	after: 23.81s	 -0.02%
Q8:	before: 3.78s	after: 3.76s	 -0.68%
Q9:	before: 28.15s	after: 28.03s	 -0.42%
Q10:	before: 5.00s	after: 4.98s	 -0.42%
Q11:	before: 2.44s	after: 2.44s	 0.22%
Q12:	before: 34.78s	after: 34.65s	 -0.37%
Q13:	before: 3.20s	after: 2.94s	 -8.28%
Q14:	before: 3.13s	after: 3.21s	 2.43%
Q15:	before: 16.80s	after: 16.73s	 -0.38%
Q16:	before: 1.60s	after: 1.65s	 2.96%
Q17:	before: 0.85s	after: 0.96s	 13.04%
Q18:	before: 16.39s	after: 15.47s	 -5.61%
Q19:	before: 13.76s	after: 13.01s	 -5.45%
Q20:	before: 55.33s	after: 55.12s	 -0.38%
Q21:	before: 24.31s	after: 24.31s	 -0.00%
Q22:	before: 1.28s	after: 1.41s	 10.26%
```

At the moment, `coldata.Batch` that is included into the response is
always serialized into the Arrow format, but I intend to introduce the
local fastpath to avoid that serialization. That work will be done in
a follow-up and should be able to reduce the perf hit for single-tenant
deployments.

A quick note on the TODOs sprinkled in this commit:
- `TODO(yuzefovich)` means that this will be left for 23.2 or later.
- `TODO(yuzefovich, 23.1)` means that it should be addressed in 23.1.

A quick note on testing: this commit randomizes the fact whether the new
infrastructure is used in almost all test builds. Introducing some unit
testing (say, in `storage` package) seems rather annoying since we must
create keys that are valid SQL keys (i.e. have TableID / Index ID
prefix) and need to come with the corresponding
`fetchpb.IndexFetchSpec`. Not having unit tests in the `storage` seems
ok to me given that the "meat" of the work there is still done by the
`pebbleMVCCScanner` which is exercised using the regular Scans.
End-to-end testing is well covered by all of our existing tests which
now runs randomly. I did run the CI multiple times with the new feature
enabled by default with no failure, so I hope that it shouldn't become
flaky.

Addresses: #82323.
Informs: #87610.

Epic: CRDB-14837

Release note: None

95701: gossip: Track latency by nodeID rather than addr r=kvoli,erikgrinaker a=andrewbaptist

Previously the latency to remote nodes was tracked by address rather than the node's id. This could result in a few problems. First, the remote address could be reused across nodes. This could result in incorrect information. Additionally, places that used this information (such as the allocator) needed to unnecessarily map the node id to address just to do a lookup.

Finally in preparation for dialback on heartbeat #84289 the use of the OriginAddr field in the PingRequest will change to be the actual address that a node should use to dial back. Currently this field is not set correctly.

Epic: none
Release note: None

95796: ui: add CPU Time chart do statement details r=maryliag a=maryliag

This commit adds a new chart for CPU time
on Statement Details page.

Part Of #87213

<img width="1508" alt="Screen Shot 2023-01-24 at 6 01 07 PM" src="https://user-images.githubusercontent.com/1017486/214440274-c48d3bb6-ecbe-47a2-861a-0a8407d219c4.png">


Release note (ui change): Add CPU Time chart to Statement Details page.

95832: cdc: remove 'nonsensitive' tag from changefeed description in telemetry logs r=jayshrivastava a=jayshrivastava

Previously, the description field in changefeed telemetry logs was marked as `nonsensitive`. This is incorrect because the description field may contain an SQL statement which is not safe to report. This change removes the `nonsensitive` tag so the field is redacted by default.

Fixes: #95823
Epic: none
Release note: none

95838: logictest: remove smallEngineBlocks randomization r=yuzefovich a=yuzefovich

This metamorphic randomization has caused some flakiness (due to a subset of tests taking very long time) so is now removed. This feature should be tested in a more targeted fashion.

Fixes: #95799.
Fixes: #95829

Release note: None

95840: opt: replace make with dev in test instructions r=mgartner a=mgartner

Epic: None

Release note: None

95842: roachtest: fix parameters passed to require.NoError r=yuzefovich,srosenberg,herkolategan a=renatolabs

When context is passed to an assertion, the parameters *must* be a string format, followed by arguments (as you would in a call to `fmt.Sprintf`). The previous code would panic trying to cast int to string.

Informs #95416

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Andrew Baptist <baptist@cockroachlabs.com>
Co-authored-by: maryliag <marylia@cockroachlabs.com>
Co-authored-by: Jayant Shrivastava <jayants@cockroachlabs.com>
Co-authored-by: Marcus Gartner <marcus@cockroachlabs.com>
Co-authored-by: Renato Costa <renato@cockroachlabs.com>
  • Loading branch information
7 people committed Jan 25, 2023
8 parents 6fc1022 + 556cb98 + 5cf0823 + 3ce57f5 + ebd73ee + 49d834e + bea38cb + 43f1777 commit 2105cd4
Show file tree
Hide file tree
Showing 98 changed files with 2,001 additions and 649 deletions.
4 changes: 2 additions & 2 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2697,7 +2697,7 @@ was triggered.

| Field | Description | Sensitive |
|--|--|--|
| `Description` | The description of that would show up in the job's description field, redacted | no |
| `Description` | The description of that would show up in the job's description field, redacted | yes |
| `SinkType` | The type of sink being emitted to (ex: kafka, nodelocal, webhook-https). | no |
| `NumTables` | The number of tables listed in the query that the changefeed is to run on. | no |
| `Resolved` | The behavior of emitted resolved spans (ex: yes, no, 10s) | no |
Expand All @@ -2720,7 +2720,7 @@ ChangefeedFailed events.

| Field | Description | Sensitive |
|--|--|--|
| `Description` | The description of that would show up in the job's description field, redacted | no |
| `Description` | The description of that would show up in the job's description field, redacted | yes |
| `SinkType` | The type of sink being emitted to (ex: kafka, nodelocal, webhook-https). | no |
| `NumTables` | The number of tables listed in the query that the changefeed is to run on. | no |
| `Resolved` | The behavior of emitted resolved spans (ex: yes, no, 10s) | no |
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.2-32 set the active cluster version in the format '<major>.<minor>'
version version 1000022.2-34 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,6 @@
<tr><td><div id="setting-trace-opentelemetry-collector" class="anchored"><code>trace.opentelemetry.collector</code></div></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 4317 will be used.</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-32</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-34</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
</tbody>
</table>
22 changes: 9 additions & 13 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -545,17 +544,17 @@ func TestOracle(t *testing.T) {
leaseholder := replicas[2]

rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
setLatency := func(addr string, latency time.Duration) {
setLatency := func(id roachpb.NodeID, latency time.Duration) {
// All test cases have to have at least 11 measurement values in order for
// the exponentially-weighted moving average to work properly. See the
// comment on the WARMUP_SAMPLES const in the ewma package for details.
for i := 0; i < 11; i++ {
rpcContext.RemoteClocks.UpdateOffset(ctx, addr, rpc.RemoteOffset{}, latency)
rpcContext.RemoteClocks.UpdateOffset(ctx, id, rpc.RemoteOffset{}, latency)
}
}
setLatency("1", 100*time.Millisecond)
setLatency("2", 2*time.Millisecond)
setLatency("3", 80*time.Millisecond)
setLatency(1, 100*time.Millisecond)
setLatency(2, 2*time.Millisecond)
setLatency(3, 80*time.Millisecond)

testCases := []struct {
name string
Expand Down Expand Up @@ -700,7 +699,6 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
historicalQuery := `SELECT * FROM test AS OF SYSTEM TIME follower_read_timestamp() WHERE k=2`
recCh := make(chan tracingpb.Recording, 1)

var n2Addr, n3Addr syncutil.AtomicString
tc := testcluster.StartTestCluster(t, 4,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
Expand All @@ -724,8 +722,8 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
// heartbeated by the time the test wants to use it. Without this
// knob, that would cause the transport to reorder replicas.
DontConsiderConnHealth: true,
LatencyFunc: func(addr string) (time.Duration, bool) {
if (addr == n2Addr.Get()) || (addr == n3Addr.Get()) {
LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) {
if (id == 2) || (id == 3) {
return time.Millisecond, true
}
return 100 * time.Millisecond, true
Expand All @@ -743,8 +741,6 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
},
})
defer tc.Stopper().Stop(ctx)
n2Addr.Set(tc.Servers[1].RPCAddr())
n3Addr.Set(tc.Servers[2].RPCAddr())

n1 := sqlutils.MakeSQLRunner(tc.Conns[0])
n1.Exec(t, `CREATE DATABASE t`)
Expand Down Expand Up @@ -887,11 +883,11 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
// For the variant where no latency information is available, we
// expect n2 to serve follower reads as well, but because it
// is in the same locality as the client.
LatencyFunc: func(addr string) (time.Duration, bool) {
LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) {
if !validLatencyFunc {
return 0, false
}
if addr == tc.Server(1).RPCAddr() {
if id == 2 {
return time.Millisecond, true
}
return 100 * time.Millisecond, true
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,10 @@ const (
// V23_1KeyVisualizerTablesAndJobs adds the system tables that support the key visualizer.
V23_1KeyVisualizerTablesAndJobs

// V23_1_KVDirectColumnarScans introduces the support of the "direct"
// columnar scans in the KV layer.
V23_1_KVDirectColumnarScans

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -694,6 +698,11 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_1KeyVisualizerTablesAndJobs,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 32},
},
{
Key: V23_1_KVDirectColumnarScans,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 34},
},

// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
Expand Down
7 changes: 4 additions & 3 deletions pkg/cmd/generate-logictest/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func runExecBuildLogicTest(t *testing.T, file string) {
serverArgs := logictest.TestServerArgs{
DisableWorkmemRandomization: true,{{ if .ForceProductionValues }}
ForceProductionValues: true,{{end}}
// Disable the direct scans in order to keep the output of EXPLAIN (VEC)
// deterministic.
DisableDirectColumnarScans: true,
}
logictest.RunLogicTest(t, serverArgs, configIdx, filepath.Join(execBuildLogicTestDir, file))
}
Expand All @@ -90,12 +93,10 @@ func runSqliteLogicTest(t *testing.T, file string) {
// SQLLite logic tests can be very memory intensive, so we give them larger
// limit than other logic tests get. Also some of the 'delete' files become
// extremely slow when MVCC range tombstones are enabled for point deletes,
// so we disable that. Similarly, some of the 'view' files are too slow
// when small engine blocks are enabled.
// so we disable that.
serverArgs := logictest.TestServerArgs{
MaxSQLMemoryLimit: 512 << 20, // 512 MiB
DisableUseMVCCRangeTombstonesForPointDeletes: true,
DisableSmallEngineBlocks: true,
// Some sqlite tests with very low bytes limit value are too slow, so
// ensure 3 KiB lower bound.
BatchBytesLimitLowerBound: 3 << 10, // 3 KiB
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/multitenant_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func runMultiTenantDistSQL(
default:
// procede to report error
}
require.NoError(t, err, li, iter)
require.NoError(t, err, "instance idx = %d, iter = %d", li, iter)
iter++
}
})
Expand Down
21 changes: 19 additions & 2 deletions pkg/cmd/roachtest/tests/multitenant_tpch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
// runMultiTenantTPCH runs TPCH queries on a cluster that is first used as a
// single-tenant deployment followed by a run of all queries in a multi-tenant
// deployment with a single SQL instance.
func runMultiTenantTPCH(ctx context.Context, t test.Test, c cluster.Cluster) {
func runMultiTenantTPCH(
ctx context.Context, t test.Test, c cluster.Cluster, enableDirectScans bool,
) {
c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(1))
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(install.SecureOption(true)), c.All())
Expand All @@ -40,6 +42,11 @@ func runMultiTenantTPCH(ctx context.Context, t test.Test, c cluster.Cluster) {
// one at a time (using the given url as a parameter to the 'workload run'
// command). The runtimes are accumulated in the perf helper.
runTPCH := func(conn *gosql.DB, url string, setupIdx int) {
setting := fmt.Sprintf("SET CLUSTER SETTING sql.distsql.direct_columnar_scans.enabled = %t", enableDirectScans)
t.Status(setting)
if _, err := conn.Exec(setting); err != nil {
t.Fatal(err)
}
t.Status("restoring TPCH dataset for Scale Factor 1 in ", setupNames[setupIdx])
if err := loadTPCHDataset(
ctx, t, c, conn, 1 /* sf */, c.NewMonitor(ctx), c.All(), false, /* disableMergeQueue */
Expand Down Expand Up @@ -93,6 +100,16 @@ func registerMultiTenantTPCH(r registry.Registry) {
Name: "multitenant/tpch",
Owner: registry.OwnerSQLQueries,
Cluster: r.MakeClusterSpec(1 /* nodeCount */),
Run: runMultiTenantTPCH,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runMultiTenantTPCH(ctx, t, c, false /* enableDirectScans */)
},
})
r.Add(registry.TestSpec{
Name: "multitenant/tpch_direct_scans",
Owner: registry.OwnerSQLQueries,
Cluster: r.MakeClusterSpec(1 /* nodeCount */),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runMultiTenantTPCH(ctx, t, c, true /* enableDirectScans */)
},
})
}
8 changes: 8 additions & 0 deletions pkg/col/coldataext/extended_column_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ func NewExtendedColumnFactory(evalCtx *eval.Context) coldata.ColumnFactory {
return &extendedColumnFactory{evalCtx: evalCtx}
}

// NewExtendedColumnFactoryNoEvalCtx returns an extendedColumnFactory that will
// be producing coldata.DatumVecs that aren't fully initialized - the eval
// context is not set on those vectors. This can be acceptable if the caller
// cannot provide the eval.Context but also doesn't intend to compare datums.
func NewExtendedColumnFactoryNoEvalCtx() coldata.ColumnFactory {
return &extendedColumnFactory{}
}

func (cf *extendedColumnFactory) MakeColumn(t *types.T, n int) coldata.Column {
if typeconv.TypeFamilyToCanonicalTypeFamily(t.Family()) == typeconv.DatumVecCanonicalTypeFamily {
return newDatumVec(t, n, cf.evalCtx)
Expand Down
2 changes: 2 additions & 0 deletions pkg/col/colserde/arrowbatchconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,8 @@ func getValueBytesAndOffsets(

// Release should be called once the converter is no longer needed so that its
// memory could be GCed.
// TODO(yuzefovich): consider renaming this to Close in order to not be confused
// with execreleasable.Releasable interface.
func (c *ArrowBatchConverter) Release(ctx context.Context) {
if c.acc != nil {
c.acc.Shrink(ctx, c.accountedFor)
Expand Down
28 changes: 10 additions & 18 deletions pkg/kv/kvclient/kvcoord/replica_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,10 @@ import (
)

// ReplicaInfo extends the Replica structure with the associated node
// descriptor.
// Locality information.
type ReplicaInfo struct {
roachpb.ReplicaDescriptor
NodeDesc *roachpb.NodeDescriptor
}

func (i ReplicaInfo) locality() []roachpb.Tier {
return i.NodeDesc.Locality.Tiers
}

func (i ReplicaInfo) addr() string {
return i.NodeDesc.Address.String()
Tiers []roachpb.Tier
}

// A ReplicaSlice is a slice of ReplicaInfo.
Expand Down Expand Up @@ -131,12 +123,12 @@ func NewReplicaSlice(
}
rs = append(rs, ReplicaInfo{
ReplicaDescriptor: r,
NodeDesc: nd,
Tiers: nd.Locality.Tiers,
})
}
if len(rs) == 0 {
return nil, newSendError(
fmt.Sprintf("no replica node addresses available via gossip for r%d", desc.RangeID))
fmt.Sprintf("no replica node information available via gossip for r%d", desc.RangeID))
}
return rs, nil
}
Expand Down Expand Up @@ -188,8 +180,8 @@ func localityMatch(a, b []roachpb.Tier) int {
}

// A LatencyFunc returns the latency from this node to a remote
// address and a bool indicating whether the latency is valid.
type LatencyFunc func(string) (time.Duration, bool)
// node and a bool indicating whether the latency is valid.
type LatencyFunc func(roachpb.NodeID) (time.Duration, bool)

// OptimizeReplicaOrder sorts the replicas in the order in which
// they're to be used for sending RPCs (meaning in the order in which
Expand Down Expand Up @@ -231,14 +223,14 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
}

if latencyFn != nil {
latencyI, okI := latencyFn(rs[i].addr())
latencyJ, okJ := latencyFn(rs[j].addr())
latencyI, okI := latencyFn(rs[i].NodeID)
latencyJ, okJ := latencyFn(rs[j].NodeID)
if okI && okJ {
return latencyI < latencyJ
}
}
attrMatchI := localityMatch(locality.Tiers, rs[i].locality())
attrMatchJ := localityMatch(locality.Tiers, rs[j].locality())
attrMatchI := localityMatch(locality.Tiers, rs[i].Tiers)
attrMatchJ := localityMatch(locality.Tiers, rs[j].Tiers)
// Longer locality matches sort first (the assumption is that
// they'll have better latencies).
return attrMatchI > attrMatchJ
Expand Down
Loading

0 comments on commit 2105cd4

Please sign in to comment.