Skip to content

Commit

Permalink
perf: use write fraction when resizing pool (#1031)
Browse files Browse the repository at this point in the history
The write fraction was only used when filling the pool, and not when creating
individual sessions. Instead, the type of session that was created was determined
based on the type that was needed at that time. That type would then also be
remembered for the lifetime of the session. This is a good default when no write
fraction has been set, but it is not logical if the user has configured a write
fraction.
This change ensures that the write fraction is respected when one is set. When no
write fraction has been configured, the default behavior is retained, which means
that the type of session that is created is determined by the type that is needed
at that time. This behavior is however not 'sticky', which means that if the pool only contains
write sessions, and a read session is needed, the pool will return a write
session, but its type will be changed to read-only and it will not be prepared
for read/write transactions when it is returned to the pool. This improves
latency for situations where a burst of write transactions has created a lot of
write-prepared sessions in the pool that is followed by a sequence of read-only
transactions.

Co-authored-by: skuruppu <skuruppu@google.com>
  • Loading branch information
olavloite and skuruppu authored Jun 9, 2020
1 parent 9e4fbbb commit 58f773b
Show file tree
Hide file tree
Showing 5 changed files with 624 additions and 73 deletions.
292 changes: 283 additions & 9 deletions benchmark/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import {google} from '../protos/protos';
import {SessionPoolOptions} from '../src/session-pool';
import protobuf = google.spanner.v1;
import * as yargs from 'yargs';
import {performance} from 'perf_hooks';

let spannerMock;
const server = new grpc.Server();
const selectSql = 'SELECT 1';
const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2';
Expand Down Expand Up @@ -74,6 +76,43 @@ yargs
() => runBurstReadAndWrite().then(() => console.log('Benchmark finished'))
)
.example('node $0 burstReadAndWrite')
.command(
'multipleWriteBursts',
'Benchmarks a burst of read and then write operations',
{},
() => runMultipleWriteBursts().then(() => console.log('Benchmark finished'))
)
.example('node $0 multipleWriteBursts')
.command(
'oneReadTransactionPerSecond',
'Benchmarks on avg one read tx per second',
{},
() =>
runOneReadTransactionPerSecond().then(() =>
console.log('Benchmark finished')
)
)
.example('node $0 oneReadTransactionPerSecond')
.command(
'oneWriteTransactionPerSecond',
'Benchmarks on avg one write tx per second',
{},
() =>
runOneWriteTransactionPerSecond().then(() =>
console.log('Benchmark finished')
)
)
.example('node $0 oneWriteTransactionPerSecond')
.command(
'oneReadAndOneWriteTransactionPerSecond',
'Benchmarks on avg one read and one write tx per second',
{},
() =>
runOneReadAndOneWriteTransactionPerSecond().then(() =>
console.log('Benchmark finished')
)
)
.example('node $0 oneReadAndOneWriteTransactionPerSecond')
.command(
'steadyIncrease',
'Benchmarks getting max sessions sequentially',
Expand Down Expand Up @@ -104,6 +143,30 @@ async function runBurstReadAndWrite() {
shutdown();
}

/**
 * Entry point for the multipleWriteBursts benchmark: starts the in-process
 * mock Spanner server, runs the benchmark, and shuts the server down again.
 *
 * Fix: shutdown() now runs in a `finally` block, so the mock gRPC server is
 * stopped (and the process can exit) even if the benchmark rejects.
 */
async function runMultipleWriteBursts() {
  await setup();
  try {
    await multipleWriteBursts();
  } finally {
    shutdown();
  }
}

/**
 * Entry point for the oneReadTransactionPerSecond benchmark: starts the
 * in-process mock Spanner server, runs the benchmark, and shuts it down.
 *
 * Fix: shutdown() now runs in a `finally` block, so the mock gRPC server is
 * stopped (and the process can exit) even if the benchmark rejects.
 */
async function runOneReadTransactionPerSecond() {
  await setup();
  try {
    await oneReadTransactionPerSecond();
  } finally {
    shutdown();
  }
}

/**
 * Entry point for the oneWriteTransactionPerSecond benchmark: starts the
 * in-process mock Spanner server, runs the benchmark, and shuts it down.
 *
 * Fix: shutdown() now runs in a `finally` block, so the mock gRPC server is
 * stopped (and the process can exit) even if the benchmark rejects.
 */
async function runOneWriteTransactionPerSecond() {
  await setup();
  try {
    await oneWriteTransactionPerSecond();
  } finally {
    shutdown();
  }
}

/**
 * Entry point for the oneReadAndOneWriteTransactionPerSecond benchmark:
 * starts the in-process mock Spanner server, runs the benchmark, and shuts
 * the server down again.
 *
 * Fix: shutdown() now runs in a `finally` block, so the mock gRPC server is
 * stopped (and the process can exit) even if the benchmark rejects.
 */
async function runOneReadAndOneWriteTransactionPerSecond() {
  await setup();
  try {
    await oneReadAndOneWriteTransactionPerSecond();
  } finally {
    shutdown();
  }
}

async function runSteadyIncrease() {
await setup();
await steadyIncrease();
Expand Down Expand Up @@ -134,7 +197,7 @@ async function setup() {
const EXECUTE_SQL_MIN_TIME = 10;
const EXECUTE_SQL_RND_TIME = 10;

const spannerMock = mock.createMockSpanner(server);
spannerMock = mock.createMockSpanner(server);
mockInstanceAdmin.createMockInstanceAdmin(server);
mockDatabaseAdmin.createMockDatabaseAdmin(server);

Expand Down Expand Up @@ -232,9 +295,11 @@ async function burstRead() {
const NUM_BURST_READ = 3200;
// Value 'undefined' is used to warm up the compiler.
for (const incStep of [undefined, 1, 10, 20, 25, 30, 40, 50, 100]) {
spannerMock.resetRequests();
const database = newTestDatabase({
min: 100,
max: 400,
writes: 0.2,
incStep: incStep,
});
const pool = database.pool_ as SessionPool;
Expand Down Expand Up @@ -271,6 +336,7 @@ async function burstWrite() {
const database = newTestDatabase({
min: 100,
max: 400,
writes: 0.2,
incStep: incStep,
});
const pool = database.pool_ as SessionPool;
Expand Down Expand Up @@ -309,6 +375,7 @@ async function burstReadAndWrite() {
const database = newTestDatabase({
min: 100,
max: 400,
writes: 0.2,
incStep: incStep,
});
const pool = database.pool_ as SessionPool;
Expand Down Expand Up @@ -339,6 +406,196 @@ async function burstReadAndWrite() {
}
}

/**
 * Benchmarks several consecutive bursts of write transactions, once for each
 * incStep value, against a fresh database with a 0.2 write fraction. Timing
 * is reported per incStep via console.time/timeEnd.
 */
async function multipleWriteBursts() {
  console.log('Starting multipleWriteBursts');
  const maxRandomWaitMs = 10;
  const burstCount = 4;
  const writesPerBurst = 3200;
  const pauseBetweenBurstsMs = 500;
  // Value 'undefined' is used to warm up the compiler.
  const incSteps = [undefined, 1, 10, 20, 25, 30, 40, 50, 100];
  for (const incStep of incSteps) {
    const database = newTestDatabase({
      min: 100,
      max: 400,
      writes: 0.2,
      incStep: incStep,
    });
    const pool = database.pool_ as SessionPool;
    const timerLabel = `multipleWriteBursts incStep ${incStep}`;
    try {
      if (incStep) {
        console.time(timerLabel);
      }
      let burst = 0;
      while (burst < burstCount) {
        // Fire one burst of writes and wait for all of them to finish.
        await Promise.all(
          queueWriteOperations(database, writesPerBurst, maxRandomWaitMs)
        );
        // Idle between bursts so the pool can rebalance.
        await new Promise(done => setTimeout(done, pauseBetweenBurstsMs));
        burst++;
      }
      if (incStep) {
        console.timeEnd(timerLabel);
        console.log(`Current session pool size: ${pool.size}`);
        console.log(`Current num write sessions: ${pool.writes}`);
      }
    } finally {
      await database.close();
    }
  }
}

/**
 * Benchmarks a low, steady load of roughly one read transaction per second
 * for each combination of pool minimum and write fraction, starting from a
 * pool that initially contains only write sessions.
 */
async function oneReadTransactionPerSecond() {
  console.log('Starting oneReadTransactionPerSecond');
  const maxRandomWaitMs = 100000;
  const txCount = maxRandomWaitMs / 1000;
  for (const minSessions of [0, 25]) {
    for (const writeFraction of [0, 0.2]) {
      const database = newTestDatabase({
        min: minSessions,
        writes: writeFraction,
      });
      const pool = database.pool_ as SessionPool;
      try {
        // Seed the pool with write sessions only; the pool's dynamic scaling
        // should convert these into read sessions as the benchmark runs.
        await queueWriteOperations(database, pool.options.incStep!, 0);
        const readPromises = queueReadOperations(
          database,
          txCount,
          maxRandomWaitMs,
          0
        );
        for (const promise of readPromises) {
          promise.then(t => {
            console.log(`Time taken: ${t}ms`);
          });
        }
        const elapsed = await Promise.all(readPromises);
        const total = elapsed.reduce((a, b) => a + b, 0);
        console.log(
          `oneReadTransactionPerSecond, min: ${minSessions}, write: ${writeFraction}`
        );
        console.log(`Max: ${Math.max(...elapsed)}`);
        console.log(`Min: ${Math.min(...elapsed)}`);
        console.log(`Avg: ${total / elapsed.length || 0}`);
        console.log(`P90: ${percentile(elapsed, 0.9)}`);
        console.log(`Current session pool size: ${pool.size}`);
        console.log(`Current num write sessions: ${pool.writes}`);
      } finally {
        await database.close();
      }
    }
  }
}

/**
 * Benchmarks a low, steady load of roughly one write transaction per second
 * for each combination of pool minimum and write fraction, starting from a
 * pool initialized by a single read request.
 */
async function oneWriteTransactionPerSecond() {
  console.log('Starting oneWriteTransactionPerSecond');
  const maxRandomWaitMs = 100000;
  const txCount = maxRandomWaitMs / 1000;
  for (const minSessions of [0, 25]) {
    for (const writeFraction of [0, 0.2]) {
      const database = newTestDatabase({
        min: minSessions,
        writes: writeFraction,
      });
      const pool = database.pool_ as SessionPool;
      try {
        // Execute one read request to initialize the session pool.
        await queueReadOperations(database, 1, 0, 0);
        const writePromises = queueWriteOperations(
          database,
          txCount,
          maxRandomWaitMs
        );
        for (const promise of writePromises) {
          promise.then(t => {
            console.log(`Time taken: ${t}ms`);
          });
        }
        const elapsed = await Promise.all(writePromises);
        const total = elapsed.reduce((a, b) => a + b, 0);
        console.log(
          `oneWriteTransactionPerSecond, min: ${minSessions}, write: ${writeFraction}`
        );
        console.log(`Max: ${Math.max(...elapsed)}`);
        console.log(`Min: ${Math.min(...elapsed)}`);
        console.log(`Avg: ${total / elapsed.length || 0}`);
        console.log(`P90: ${percentile(elapsed, 0.9)}`);
        console.log(`Current session pool size: ${pool.size}`);
        console.log(`Current num write sessions: ${pool.writes}`);
      } finally {
        await database.close();
      }
    }
  }
}

/**
 * Benchmarks a mixed steady load: roughly one read and one write transaction
 * per second, for each combination of pool minimum (0, 25) and write
 * fraction (0, 0.2). Per-transaction latencies are logged as they complete,
 * followed by aggregate max/min/avg/P90 statistics and the final pool state.
 */
async function oneReadAndOneWriteTransactionPerSecond() {
  console.log('Starting oneReadAndOneWriteTransactionPerSecond');
  // Each request starts after a random delay in [0, 100000) ms, giving on
  // average one transaction of each type per second over the window.
  const RND_WAIT_TIME_BETWEEN_REQUESTS = 100000;
  const NUM_READ_TRANSACTIONS = RND_WAIT_TIME_BETWEEN_REQUESTS / 1000;
  const NUM_WRITE_TRANSACTIONS = RND_WAIT_TIME_BETWEEN_REQUESTS / 1000;
  for (const minSessions of [0, 25]) {
    for (const writeFraction of [0, 0.2]) {
      const database = newTestDatabase({
        min: minSessions,
        writes: writeFraction,
      });
      const pool = database.pool_ as SessionPool;
      try {
        // Queue reads and writes up front so their random start times
        // interleave over the same window.
        const readPromises = queueReadOperations(
          database,
          NUM_READ_TRANSACTIONS,
          RND_WAIT_TIME_BETWEEN_REQUESTS,
          0
        );
        const writePromises = queueWriteOperations(
          database,
          NUM_WRITE_TRANSACTIONS,
          RND_WAIT_TIME_BETWEEN_REQUESTS
        );
        // Log each latency as its transaction finishes (fire-and-forget).
        readPromises.forEach(p =>
          p.then(t => {
            console.log(`Read tx: ${t}ms`);
          })
        );
        writePromises.forEach(p =>
          p.then(t => {
            console.log(`Write tx: ${t}ms`);
          })
        );
        // Combined latencies of all read and write transactions, in ms.
        const t = await Promise.all(readPromises.concat(writePromises));
        const max = Math.max(...t);
        const min = Math.min(...t);
        const sum = t.reduce((a, b) => a + b, 0);
        // `|| 0` guards against NaN when t is empty.
        const avg = sum / t.length || 0;
        // NOTE(review): percentile() sorts t in place; max/min/sum/avg are
        // computed above, so the reported statistics are unaffected.
        const p90 = percentile(t, 0.9);
        console.log(
          `oneReadAndOneWriteTransactionPerSecond, min: ${minSessions}, write: ${writeFraction}`
        );
        console.log(`Max: ${max}`);
        console.log(`Min: ${min}`);
        console.log(`Avg: ${avg}`);
        console.log(`P90: ${p90}`);
        console.log(`Current session pool size: ${pool.size}`);
        console.log(`Current num write sessions: ${pool.writes}`);
      } finally {
        await database.close();
      }
    }
  }
}

/**
* Executes the steadyIncrease benchmark.
*/
Expand All @@ -349,6 +606,7 @@ async function steadyIncrease() {
const database = newTestDatabase({
min: 100,
max: 400,
writes: 0.2,
incStep: incStep,
});
const pool = database.pool_ as SessionPool;
Expand Down Expand Up @@ -394,12 +652,13 @@ function queueReadOperations(
numRequests: number,
waitBetweenRequests: number,
holdSessionTime: number
): Promise<void>[] {
const promises: Promise<void>[] = [];
): Promise<number>[] {
const promises: Promise<number>[] = [];
for (let run = 0; run < numRequests; run++) {
promises.unshift(
new Promise<void>(resolve => {
new Promise<number>(resolve => {
setTimeout(async () => {
const t1 = performance.now();
let p: Promise<void>;
database
.runStream(selectSql)
Expand All @@ -412,7 +671,7 @@ function queueReadOperations(
})
.on('end', async () => {
await p;
resolve();
resolve(performance.now() - t1);
});
}, Math.random() * waitBetweenRequests);
})
Expand All @@ -435,16 +694,19 @@ function queueWriteOperations(
database: Database,
numRequests: number,
waitBetweenRequests: number
): Promise<void>[] {
const promises: Promise<void>[] = [];
): Promise<number>[] {
const promises: Promise<number>[] = [];
for (let run = 0; run < numRequests; run++) {
promises.unshift(
new Promise<void>(resolve => {
new Promise<number>(resolve => {
setTimeout(() => {
const t1 = performance.now();
database.runTransaction((err, tx) => {
tx!
.runUpdate(updateSql)
.then(() => tx!.commit().then(() => resolve()));
.then(() =>
tx!.commit().then(() => resolve(performance.now() - t1))
);
});
}, Math.random() * waitBetweenRequests);
})
Expand All @@ -471,3 +733,15 @@ function createSelect1ResultSet(): protobuf.ResultSet {
rows: [{values: [{stringValue: '1'}]}],
});
}

/**
 * Returns the p-th percentile (0 <= p <= 1) of the values in arr using
 * linear interpolation between the two nearest ranks.
 *
 * Fixes:
 * - The input is copied before sorting (Array#sort sorts in place), so the
 *   caller's array is no longer mutated.
 * - Parameters and the return value are explicitly typed instead of
 *   implicit `any`.
 * - An empty sample returns NaN instead of `undefined`.
 *
 * @param arr The sample values, in any order.
 * @param p The percentile as a fraction, e.g. 0.9 for P90.
 * @returns The interpolated percentile value, or NaN for an empty sample.
 */
function percentile(arr: number[], p: number): number {
  // Copy before sorting so the caller's array keeps its original order.
  const sorted = [...arr].sort((a, b) => a - b);
  if (sorted.length === 0) {
    return NaN;
  }
  const pos = (sorted.length - 1) * p;
  const base = Math.floor(pos);
  const rest = pos - base;
  if (sorted[base + 1] !== undefined) {
    // Interpolate linearly between the two surrounding ranks.
    return sorted[base] + rest * (sorted[base + 1] - sorted[base]);
  }
  return sorted[base];
}
Loading

0 comments on commit 58f773b

Please sign in to comment.