Skip to content

Commit

Permalink
Write a txn status instead of returning TransactionAbortedError
Browse files Browse the repository at this point in the history
(I was curious if we can fix #3037 (and #1989). The change introduces
some complexity and I'm not totally sure we should fix these issues.)

This commit changes EndTransaction to return nil error on txn abort so
that the we can commit a batch and update the transaction record.

Change txnSender.Send so that it convert a nil error to TransactionAbortedError
so that txn.Exec will initiate a restart. This change is hacky, but I couldn't
come up with a better approach.

We still return TransactionAbortedError in other places and keep the
code path for handling TransactionAbortedError.
  • Loading branch information
kkaneda committed Apr 6, 2016
1 parent 17fb1bf commit d33847d
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 23 deletions.
40 changes: 30 additions & 10 deletions client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,26 +76,46 @@ func (ts *txnSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachp
// The exception is if our transaction was aborted and needs to restart
// from scratch, in which case we do just that.
if pErr == nil {
// Return a TransactionAbortedError if the transaction is aborted but
// the request is not requesting a rollback, so that txn.Exec will
// initiate a restart.
rollback := false
for _, union := range ba.Requests {
if args, ok := union.GetInner().(*roachpb.EndTransactionRequest); ok {
rollback = !args.Commit
break
}
}
if br.Txn != nil && br.Txn.Status == roachpb.ABORTED && !rollback {
ts.resetTxnOnAbort(br.Txn)
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionAbortedError(), br.Txn)
}
ts.Proto.Update(br.Txn)
return br, nil
} else if _, ok := pErr.GetDetail().(*roachpb.TransactionAbortedError); ok {
// On Abort, reset the transaction so we start anew on restart.
ts.Proto = roachpb.Transaction{
TxnMeta: roachpb.TxnMeta{
Isolation: ts.Proto.Isolation,
},
Name: ts.Proto.Name,
}
// Acts as a minimum priority on restart.
if pErr.GetTxn() != nil {
ts.Proto.Priority = pErr.GetTxn().Priority
}
ts.resetTxnOnAbort(pErr.GetTxn())
} else if pErr.TransactionRestart != roachpb.TransactionRestart_NONE {
ts.Proto.Update(pErr.GetTxn())
}
return nil, pErr
}

// resetTxnOnAbort resets the transaction on abort so we start anew on restart.
func (ts *txnSender) resetTxnOnAbort(txn *roachpb.Transaction) {
ts.Proto = roachpb.Transaction{
TxnMeta: roachpb.TxnMeta{
Isolation: ts.Proto.Isolation,
},
Name: ts.Proto.Name,
}
// Acts as a minimum priority on restart.
if txn != nil {
ts.Proto.Priority = txn.Priority
}
}

// Txn is an in-progress distributed database transaction. A Txn is not safe for
// concurrent use by multiple goroutines.
type Txn struct {
Expand Down
7 changes: 6 additions & 1 deletion kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,12 @@ func (tc *TxnCoordSender) updateState(
case *roachpb.WriteTooOldError:
newTxn.Restart(ba.UserPriority, newTxn.Priority, t.ActualTimestamp)
case nil:
// Nothing to do here, avoid the default case.
if br.Txn != nil && br.Txn.Status == roachpb.ABORTED {
// Same as TransactionAbortedError.
newTxn.Timestamp.Forward(br.Txn.Timestamp)
newTxn.Priority = br.Txn.Priority
defer tc.cleanupTxn(ctx, *newTxn)
}
default:
if pErr.GetTxn() != nil {
if pErr.CanRetry() {
Expand Down
1 change: 1 addition & 0 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1628,6 +1628,7 @@ func (r *Replica) checkIfTxnAborted(b engine.Engine, txn roachpb.Transaction) *r
if entry.Priority > newTxn.Priority {
newTxn.Priority = entry.Priority
}
// TODO(kaneda): Update the txn status to to ABORTED instead of returning an error.
return roachpb.NewErrorWithTxn(roachpb.NewTransactionAbortedError(), &newTxn)
}
return nil
Expand Down
11 changes: 9 additions & 2 deletions storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,12 @@ func (r *Replica) EndTransaction(
// and (b) not able to write on error (see #1989), we can't write
// ABORTED into the master transaction record, which remains
// PENDING, and that's pretty bad.
return reply, roachpb.AsIntents(args.IntentSpans, reply.Txn), roachpb.NewTransactionAbortedError()
//
// TODO(kaneda): We now write ABORTED into the master transaction record. Resolve local intents.
if err := engine.MVCCPutProto(batch, ms, key, roachpb.ZeroTimestamp, nil /* txn */, reply.Txn); err != nil {
return reply, nil, err
}
return reply, roachpb.AsIntents(args.IntentSpans, reply.Txn), nil
}

// Verify that we can either commit it or abort it (according
Expand All @@ -414,7 +419,9 @@ func (r *Replica) EndTransaction(
// that we know them, so we return them all for asynchronous
// resolution (we're currently not able to write on error, but
// see #1989).
return reply, roachpb.AsIntents(args.IntentSpans, reply.Txn), roachpb.NewTransactionAbortedError()
//
// TODO(kaneda): Resolve local intents.
return reply, roachpb.AsIntents(args.IntentSpans, reply.Txn), nil
} else if h.Txn.Epoch < reply.Txn.Epoch {
// TODO(tschottdorf): this leaves the Txn record (and more
// importantly, intents) dangling; we can't currently write on
Expand Down
19 changes: 14 additions & 5 deletions storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1758,7 +1758,7 @@ func TestEndTransactionDeadline(t *testing.T) {

{
txn.Sequence++
_, pErr := tc.SendWrappedWith(etHeader, &etArgs)
resp, pErr := tc.SendWrappedWith(etHeader, &etArgs)
switch i {
case 0:
// No deadline.
Expand All @@ -1767,8 +1767,12 @@ func TestEndTransactionDeadline(t *testing.T) {
}
case 1:
// Past deadline.
if _, ok := pErr.GetDetail().(*roachpb.TransactionAbortedError); !ok {
t.Errorf("expected TransactionAbortedError but got %T: %s", pErr, pErr)
if pErr != nil {
t.Error(pErr)
}
reply := resp.(*roachpb.EndTransactionResponse)
if reply.Txn.Status != roachpb.ABORTED {
t.Errorf("expected aborted transaction but got %s", reply.Txn)
}
case 2:
// Equal deadline.
Expand Down Expand Up @@ -2064,7 +2068,7 @@ func TestEndTransactionWithErrors(t *testing.T) {
}{
{roachpb.Key("a"), doesNotExist, txn.Epoch, txn.Timestamp, "does not exist"},
{roachpb.Key("a"), roachpb.COMMITTED, txn.Epoch, txn.Timestamp, "txn \"test\" id=.*: already committed"},
{roachpb.Key("b"), roachpb.ABORTED, txn.Epoch, txn.Timestamp, "txn aborted \"test\" id=.*"},
{roachpb.Key("b"), roachpb.ABORTED, txn.Epoch, txn.Timestamp, ""},
{roachpb.Key("c"), roachpb.PENDING, txn.Epoch + 1, txn.Timestamp, "txn \"test\" id=.*: epoch regression: 0"},
{roachpb.Key("d"), roachpb.PENDING, txn.Epoch, regressTS, `txn "test" id=.*: timestamp regression: 0.000000001,\d+`},
}
Expand All @@ -2089,7 +2093,12 @@ func TestEndTransactionWithErrors(t *testing.T) {
args, h := endTxnArgs(txn, true)
txn.Sequence++

if _, pErr := tc.SendWrappedWith(h, &args); !testutils.IsPError(pErr, test.expErrRegexp) {
_, pErr := tc.SendWrappedWith(h, &args)
if len(test.expErrRegexp) == 0 {
if pErr != nil {
t.Errorf("%d: unexpected error %ss", i, pErr)
}
} else if !testutils.IsPError(pErr, test.expErrRegexp) {
t.Errorf("%d: expected error:\n%s\nto match:\n%s", i, pErr, test.expErrRegexp)
} else if txn := pErr.GetTxn(); txn != nil && txn.ID == nil {
// Prevent regression of #5591.
Expand Down
1 change: 1 addition & 0 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1736,6 +1736,7 @@ func (s *Store) maybeUpdateTransaction(txn *roachpb.Transaction, now roachpb.Tim
case roachpb.COMMITTED:
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionStatusError("already committed"), updatedTxn)
case roachpb.ABORTED:
// TODO(kaneda): Update the txn status to to ABORTED instead of returning an error.
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionAbortedError(), updatedTxn)
}
return updatedTxn, nil
Expand Down
11 changes: 6 additions & 5 deletions storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1415,12 +1415,13 @@ func TestStoreResolveWriteIntentNoTxn(t *testing.T) {
// been aborted.
etArgs, h := endTxnArgs(pushee, true)
pushee.Sequence++
_, pErr := client.SendWrappedWith(store.testSender(), nil, h, &etArgs)
if pErr == nil {
t.Errorf("unexpected success committing transaction")
resp, pErr := client.SendWrappedWith(store.testSender(), nil, h, &etArgs)
if pErr != nil {
t.Errorf("unexpected error: %s", pErr)
}
if _, ok := pErr.GetDetail().(*roachpb.TransactionAbortedError); !ok {
t.Errorf("expected transaction aborted error; got %s", pErr)
reply := resp.(*roachpb.EndTransactionResponse)
if reply.Txn.Status != roachpb.ABORTED {
t.Errorf("expected aborted transaction but got %s", reply.Txn)
}
}

Expand Down

0 comments on commit d33847d

Please sign in to comment.