Skip to content

Commit

Permalink
Added high-contention benchmarks and fixed starvation bugs
Browse files Browse the repository at this point in the history
The bank benchmark was never doing anything before now as there
was no money in any of the accounts, so nothing would transfer.

Added high-contention workloads to the benchmarks and in the
process discovered starvation scenarios and an outright bug.

The starvation scenario was encountered when a batch containing
writes and an end transaction failed due to a serialized txn
having `Timestamp` != `OrigTimestamp`. In that event, we discard
the entire batch engine, meaning the intents for the writes are
discarded with everything else.

This change commits the results of a batch whose EndTransaction
fails with a TransactionRetryError, so the write intents are kept.

There was also a bug when restarting a transaction. The replay
of the `BeginTransaction` statement was incorrectly using the
value of the transaction record instead of the transaction
passed in with the batch request. In the event that the txn
was pushed or aborted during the restart, this would cause
various problems when commit time comes.

The 2, 4, and 8 account benchmarks are pretty slow, but
optimizing those is the next step.
  • Loading branch information
spencerkimball committed Mar 16, 2016
1 parent 3286eee commit e6d0a16
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 31 deletions.
2 changes: 1 addition & 1 deletion client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ RetryLoop:
}
}
if txn != nil {
// TODO(andrei): don't do Cleanup() on retriable errors here.
// TODO(andrei): don't do Cleanup() on retryable errors here.
// Let the sql executor do it.
txn.Cleanup(pErr)
}
Expand Down
75 changes: 60 additions & 15 deletions sql/bank_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@ package sql_test
import (
"bytes"
"database/sql"
"flag"
"fmt"
"math/rand"
"testing"
)

var maxTransfer = flag.Int("max-transfer", 999, "Maximum amount to transfer in one transaction.")
var numAccounts = flag.Int("num-accounts", 999, "Number of accounts.")
// maxTransfer is the maximum amount to transfer in one transaction.
const maxTransfer = 999

// runBenchmarkBank mirrors the SQL performed by examples/sql_bank, but
// structured as a benchmark for easier usage of the Go performance analysis
// tools like pprof, memprof and trace.
func runBenchmarkBank(b *testing.B, db *sql.DB) {
func runBenchmarkBank(b *testing.B, db *sql.DB, numAccounts int) {
{
// Initialize the "bank" table.
schema := `
Expand All @@ -48,11 +47,11 @@ CREATE TABLE IF NOT EXISTS bench.bank (

var placeholders bytes.Buffer
var values []interface{}
for i := 0; i < *numAccounts; i++ {
for i := 0; i < numAccounts; i++ {
if i > 0 {
placeholders.WriteString(", ")
}
fmt.Fprintf(&placeholders, "($%d, 0)", i+1)
fmt.Fprintf(&placeholders, "($%d, 10000)", i+1)
values = append(values, i)
}
stmt := `INSERT INTO bench.bank (id, balance) VALUES ` + placeholders.String()
Expand All @@ -64,13 +63,13 @@ CREATE TABLE IF NOT EXISTS bench.bank (
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
from := rand.Intn(*numAccounts)
to := rand.Intn(*numAccounts - 1)
if from == to {
to = *numAccounts - 1
from := rand.Intn(numAccounts)
to := rand.Intn(numAccounts - 1)
for from == to {
to = numAccounts - 1
}

amount := rand.Intn(*maxTransfer)
amount := rand.Intn(maxTransfer)

const update = `
UPDATE bench.bank
Expand All @@ -84,10 +83,56 @@ UPDATE bench.bank
b.StopTimer()
}

func BenchmarkBank_Cockroach(b *testing.B) {
benchmarkCockroach(b, runBenchmarkBank)
// bankRunner binds numAccounts to runBenchmarkBank and returns the
// result as a func(*testing.B, *sql.DB), the shape handed to
// benchmarkCockroach and benchmarkPostgres by the benchmarks below.
func bankRunner(numAccounts int) func(*testing.B, *sql.DB) {
	runner := func(b *testing.B, db *sql.DB) {
		runBenchmarkBank(b, db, numAccounts)
	}
	return runner
}

// BenchmarkBank2_Cockroach runs the bank benchmark with 2 accounts by
// handing bankRunner(2) to benchmarkCockroach.
func BenchmarkBank2_Cockroach(b *testing.B) {
benchmarkCockroach(b, bankRunner(2))
}

// BenchmarkBank2_Postgres runs the bank benchmark with 2 accounts by
// handing bankRunner(2) to benchmarkPostgres.
func BenchmarkBank2_Postgres(b *testing.B) {
benchmarkPostgres(b, bankRunner(2))
}

// BenchmarkBank4_Cockroach runs the bank benchmark with 4 accounts by
// handing bankRunner(4) to benchmarkCockroach.
func BenchmarkBank4_Cockroach(b *testing.B) {
benchmarkCockroach(b, bankRunner(4))
}

// BenchmarkBank4_Postgres runs the bank benchmark with 4 accounts by
// handing bankRunner(4) to benchmarkPostgres.
func BenchmarkBank4_Postgres(b *testing.B) {
benchmarkPostgres(b, bankRunner(4))
}

// BenchmarkBank8_Cockroach runs the bank benchmark with 8 accounts by
// handing bankRunner(8) to benchmarkCockroach.
func BenchmarkBank8_Cockroach(b *testing.B) {
benchmarkCockroach(b, bankRunner(8))
}

// BenchmarkBank8_Postgres runs the bank benchmark with 8 accounts by
// handing bankRunner(8) to benchmarkPostgres.
func BenchmarkBank8_Postgres(b *testing.B) {
benchmarkPostgres(b, bankRunner(8))
}

// BenchmarkBank16_Cockroach runs the bank benchmark with 16 accounts by
// handing bankRunner(16) to benchmarkCockroach.
func BenchmarkBank16_Cockroach(b *testing.B) {
benchmarkCockroach(b, bankRunner(16))
}

// BenchmarkBank16_Postgres runs the bank benchmark with 16 accounts by
// handing bankRunner(16) to benchmarkPostgres.
func BenchmarkBank16_Postgres(b *testing.B) {
benchmarkPostgres(b, bankRunner(16))
}

// BenchmarkBank32_Cockroach runs the bank benchmark with 32 accounts by
// handing bankRunner(32) to benchmarkCockroach.
func BenchmarkBank32_Cockroach(b *testing.B) {
benchmarkCockroach(b, bankRunner(32))
}

// BenchmarkBank32_Postgres runs the bank benchmark with 32 accounts by
// handing bankRunner(32) to benchmarkPostgres.
func BenchmarkBank32_Postgres(b *testing.B) {
benchmarkPostgres(b, bankRunner(32))
}

// BenchmarkBank64_Cockroach runs the bank benchmark with 64 accounts by
// handing bankRunner(64) to benchmarkCockroach.
func BenchmarkBank64_Cockroach(b *testing.B) {
benchmarkCockroach(b, bankRunner(64))
}

func BenchmarkBank_Postgres(b *testing.B) {
benchmarkPostgres(b, runBenchmarkBank)
func BenchmarkBank64_Postgres(b *testing.B) {
benchmarkPostgres(b, bankRunner(64))
}
6 changes: 3 additions & 3 deletions sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,9 +727,9 @@ func runTxnAttempt(
return pErr
}

// execStmtsInCurrentTransaction consumes a prefix of stmts, namely the
// statements belonging to a single SQL transaction. It executes in the
// planner's transaction, which is assumed to exist.
// execStmtsInCurrentTxn consumes a prefix of stmts, namely the
// statements belonging to a single SQL transaction. It executes in
// the planner's transaction, which is assumed to exist.
//
// COMMIT/ROLLBACK statements can end the current transaction. If that happens,
// this method returns, and the remaining statements are returned.
Expand Down
1 change: 0 additions & 1 deletion storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,6 @@ func mvccPutInternal(engine Engine, iter Iterator, ms *MVCCStats, key roachpb.Ke

if meta.Txn != nil {
// There is an uncommitted write intent.

if txn == nil || !roachpb.TxnIDEqual(meta.Txn.ID, txn.ID) {
// The current Put operation does not come from the same
// transaction.
Expand Down
22 changes: 14 additions & 8 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1507,14 +1507,20 @@ func (r *Replica) applyRaftCommandInBatch(ctx context.Context, index uint64, ori
// to continue request idempotence, even if leadership changes.
if ba.IsWrite() {
if err != nil {
// TODO(tschottdorf): make `nil` acceptable. Corresponds to
// roachpb.Response{With->Or}Error.
br = &roachpb.BatchResponse{}
// Otherwise, reset the batch to clear out partial execution and
// prepare for the failed sequence cache entry.
btch.Close()
btch = r.store.Engine().NewBatch()
*ms = engine.MVCCStats{}
// If the batch failed with a TransactionRetryError, any
// preceding mutations in the batch engine should still be
// applied so that intents are laid down in preparation for
// the retry.
if _, ok := err.GetDetail().(*roachpb.TransactionRetryError); !ok {
// TODO(tschottdorf): make `nil` acceptable. Corresponds to
// roachpb.Response{With->Or}Error.
br = &roachpb.BatchResponse{}
// Otherwise, reset the batch to clear out partial execution and
// prepare for the failed sequence cache entry.
btch.Close()
btch = r.store.Engine().NewBatch()
*ms = engine.MVCCStats{}
}
}

if ba.Txn != nil {
Expand Down
10 changes: 7 additions & 3 deletions storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,15 +315,19 @@ func (r *Replica) BeginTransaction(batch engine.Engine, ms *engine.MVCCStats, h
if ok, err := engine.MVCCGetProto(batch, key, roachpb.ZeroTimestamp, true, nil, &txn); err != nil {
return reply, err
} else if ok {
reply.Txn = &txn
// Check whether someone has come in ahead and already aborted the
// txn.
if txn.Status == roachpb.ABORTED {
return reply, roachpb.NewTransactionAbortedError()
} else if txn.Status == roachpb.PENDING && h.Txn.Epoch > txn.Epoch {
// On a transaction retry there will be an extant txn record but
// this run should have an upgraded epoch. This is a pass
// through to set the new transaction record.
// this run should have an upgraded epoch. The extant txn record
// may have been pushed or otherwise updated, so update this
// command's txn and rewrite the record.
clonedTxn := h.Txn.Clone()
h.Txn = &clonedTxn
h.Txn.Update(&txn)
reply.Txn = h.Txn // rest of the batch should use the new txn
} else {
return reply, roachpb.NewTransactionStatusError("non-aborted transaction exists already")
}
Expand Down
114 changes: 114 additions & 0 deletions storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,49 @@ func TestRangeSequenceCacheStoredTxnRetryError(t *testing.T) {
}
}

// TestTransactionRetryLeavesIntents sets up a transaction retry event
// and verifies that the intents which were written as part of a
// single batch are left in place despite the failed end transaction.
//
// The test first reads the key, then sends one batch holding a
// BeginTransaction, a Put and a committing EndTransaction for the
// same txn. It expects that batch to fail with a
// TransactionRetryError, and then expects a repeat of the initial
// Get to fail with a WriteIntentError.
func TestTransactionRetryLeavesIntents(t *testing.T) {
defer leaktest.AfterTest(t)()
tc := testContext{}
tc.Start(t)
defer tc.Stop()

key := roachpb.Key("a")
pushee := newTransaction("test", key, 1, roachpb.SERIALIZABLE, tc.clock)
// NOTE(review): pusher is not referenced again in this test after its
// Priority is set below; it looks like leftover setup shared with the
// push tests. Confirm whether it can be removed.
pusher := newTransaction("test", key, 1, roachpb.SERIALIZABLE, tc.clock)
pushee.Priority = 1
pusher.Priority = 2 // pusher will win

// Read from the key to increment the timestamp cache.
gArgs := getArgs(key)
if _, pErr := client.SendWrapped(tc.rng, tc.rng.context(), &gArgs); pErr != nil {
t.Fatal(pErr)
}

// Begin txn, write to key (with now-higher timestamp), and attempt to
// commit the txn, which should result in a retryable error.
btArgs, _ := beginTxnArgs(key, pushee)
pArgs := putArgs(key, []byte("foo"))
etArgs, _ := endTxnArgs(pushee, true /* commit */)
// All three requests travel in a single batch under pushee's txn.
var ba roachpb.BatchRequest
ba.Header.Txn = pushee
ba.Add(&btArgs)
ba.Add(&pArgs)
ba.Add(&etArgs)
_, pErr := tc.Sender().Send(tc.rng.context(), ba)
if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok {
t.Fatalf("expected retry error; got %s", pErr)
}

// Now verify that the intent was still written for key: re-issuing
// the earlier Get must now fail with a WriteIntentError.
_, pErr = client.SendWrapped(tc.rng, tc.rng.context(), &gArgs)
if _, ok := pErr.GetDetail().(*roachpb.WriteIntentError); !ok {
t.Fatalf("expected write intent error; got %s", pErr)
}
}

// TestRangeSequenceCacheOnlyWithIntent verifies that a transactional command
// which goes through Raft but is not a transactional write (i.e. does not
// leave intents) passes the sequence cache unhindered.
Expand Down Expand Up @@ -2818,6 +2861,77 @@ func TestPushTxnPushTimestampAlreadyPushed(t *testing.T) {
}
}

// TestPushTxnSerializableRestart simulates a transaction which is
// started at t=0, fails serializable commit due to a read at a key
// being written at t=1, is then restarted at the updated timestamp,
// but before the txn can be retried, it's pushed to t=2, an even
// higher timestamp. The test verifies that the serializable commit
// fails yet again, preventing regression of a bug in which we blindly
// overwrote the transaction record on BeginTransaction.
func TestPushTxnSerializableRestart(t *testing.T) {
defer leaktest.AfterTest(t)()
tc := testContext{}
tc.Start(t)
defer tc.Stop()

key := roachpb.Key("a")
pushee := newTransaction("test", key, 1, roachpb.SERIALIZABLE, tc.clock)
pusher := newTransaction("test", key, 1, roachpb.SERIALIZABLE, tc.clock)
pushee.Priority = 1
pusher.Priority = 2 // pusher will win

// Read from the key to increment the timestamp cache.
gArgs := getArgs(key)
if _, pErr := client.SendWrapped(tc.rng, tc.rng.context(), &gArgs); pErr != nil {
t.Fatal(pErr)
}

// Begin the pushee's transaction & write to key.
btArgs, btH := beginTxnArgs(key, pushee)
put := putArgs(key, []byte("foo"))
resp, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), btH, &put)
if pErr != nil {
t.Fatal(pErr)
}
// Fold the txn returned with the response back into the local pushee.
pushee.Update(resp.Header().Txn)

// Try to end the pushee's transaction; should get a retry failure.
etArgs, h := endTxnArgs(pushee, true /* commit */)
pushee.Sequence++
_, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &etArgs)
if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok {
t.Fatalf("expected retry error; got %s", pErr)
}
// Snapshot the pushee before restarting it; the push below is issued
// against this pre-restart copy rather than the restarted txn.
pusheeCopy := *pushee
pushee.Restart(1, 1, pusher.Timestamp)

// Next push pushee to advance timestamp of txn record.
pusher.Timestamp = tc.rng.store.Clock().Now()
args := pushTxnArgs(pusher, &pusheeCopy, roachpb.PUSH_TIMESTAMP)
if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args); pErr != nil {
t.Fatal(pErr)
}

// Try to end pushed transaction at restart timestamp, which is
// earlier than its now-pushed timestamp. Should fail.
// The earlier btArgs, put and etArgs are reused, this time in one batch.
var ba roachpb.BatchRequest
pushee.Sequence++
ba.Header.Txn = pushee
ba.Add(&btArgs)
ba.Add(&put)
ba.Add(&etArgs)
_, pErr = tc.Sender().Send(tc.rng.context(), ba)
if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok {
t.Fatalf("expected retry error; got %s", pErr)
}
// Verify that the returned transaction has timestamp equal to the
// pushed timestamp (to verify that BeginTransaction found the
// pushed record and propagated it).
//
// NOTE(review): as written, this reports an error when the two
// timestamps ARE equal, which contradicts both the comment above and
// the "expected ...; got ..." message. The condition looks inverted.
// Before negating it, confirm what timestamp a PUSH_TIMESTAMP actually
// leaves on the pushee (it may be later than pusher.Timestamp), so the
// expected value is right.
if txn := pErr.GetTxn(); txn.Timestamp.Equal(pusher.Timestamp) {
t.Errorf("expected batch response txn timestamp %s; got %s", pusher.Timestamp, txn.Timestamp)
}
}

// TestRangeResolveIntentRange verifies resolving a range of intents.
func TestRangeResolveIntentRange(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down

0 comments on commit e6d0a16

Please sign in to comment.