Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Resolve intents when encountering an aborted txn #5946

Merged
merged 1 commit into from
Apr 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 94 additions & 82 deletions storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,11 @@ func (r *Replica) EndTransaction(
// The transaction has already been aborted by other.
// Do not return TransactionAbortedError since the client anyway
// wanted to abort the transaction.
// TODO(kaneda): Resolve local intents here and add external intents to txn proto?
return reply, roachpb.AsIntents(args.IntentSpans, reply.Txn), nil
externalIntents, err := r.resolveExplicitIntents(batch, ms, args, reply.Txn)
if err != nil {
return reply, nil, err
}
return reply, externalIntents, nil
}
// If the transaction was previously aborted by a concurrent
// writer's push, any intents written are still open. It's only now
Expand Down Expand Up @@ -466,86 +469,9 @@ func (r *Replica) EndTransaction(
reply.Txn.Status = roachpb.ABORTED
}

// Resolve any explicit intents. All that are local to this range get
// resolved synchronously in the same batch. The remainder are collected
// and handed off to asynchronous processing.
desc := r.Desc()
var preMergeDesc *roachpb.RangeDescriptor
if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil {
// If this is a merge, then use the post-merge descriptor to determine
// which intents are local (note that for a split, we want to use the
// pre-split one instead because it's larger).
preMergeDesc = desc
desc = &mergeTrigger.UpdatedDesc
}

iterAndBuf := engine.GetIterAndBuf(batch)
defer iterAndBuf.Cleanup()

var externalIntents []roachpb.Intent
for _, span := range args.IntentSpans {
if err := func() error {
intent := roachpb.Intent{Span: span, Txn: reply.Txn.TxnMeta, Status: reply.Txn.Status}
if len(span.EndKey) == 0 {
// For single-key intents, do a KeyAddress-aware check of
// whether it's contained in our Range.
if !containsKey(*desc, span.Key) {
externalIntents = append(externalIntents, intent)
return nil
}
resolveMs := ms
if preMergeDesc != nil && !containsKey(*preMergeDesc, span.Key) {
// If this transaction included a merge and the intents
// are from the subsumed range, ignore the intent resolution
// stats, as they will already be accounted for during the
// merge trigger.
resolveMs = nil
}
return engine.MVCCResolveWriteIntentUsingIter(batch, iterAndBuf, resolveMs, intent)
}
// For intent ranges, cut into parts inside and outside our key
// range. Resolve locally inside, delegate the rest. In particular,
// an intent range for range-local data is correctly considered local.
inSpan, outSpans := intersectSpan(span, *desc)
for _, span := range outSpans {
outIntent := intent
outIntent.Span = span
externalIntents = append(externalIntents, outIntent)
}
if inSpan != nil {
intent.Span = *inSpan
_, err := engine.MVCCResolveWriteIntentRangeUsingIter(batch, iterAndBuf, ms, intent, 0)
return err
}
return nil
}(); err != nil {
// TODO(tschottdorf): any legitimate reason for this to happen?
// Figure that out and if not, should still be ReplicaCorruption
// and not a panic.
panic(fmt.Sprintf("error resolving intent at %s on end transaction [%s]: %s", span, reply.Txn.Status, err))
}
}

// Persist the transaction record with updated status (& possibly timestamp).
// If we've already resolved all intents locally, we actually delete the
// record right away - no use in keeping it around.
{
var err error
if txnAutoGC && len(externalIntents) == 0 {
if log.V(1) {
log.Infof("auto-gc'ed %s (%d intents)", h.Txn.ID.Short(), len(args.IntentSpans))
}
err = engine.MVCCDelete(batch, ms, key, roachpb.ZeroTimestamp, nil /* txn */)
} else {
reply.Txn.Intents = make([]roachpb.Span, len(externalIntents))
for i := range externalIntents {
reply.Txn.Intents[i] = externalIntents[i].Span
}
err = engine.MVCCPutProto(batch, ms, key, roachpb.ZeroTimestamp, nil /* txn */, reply.Txn)
}
if err != nil {
return reply, nil, err
}
externalIntents, err := r.resolveExplicitIntents(batch, ms, args, reply.Txn)
if err != nil {
return reply, nil, err
}

// Run triggers if successfully committed.
Expand Down Expand Up @@ -614,6 +540,92 @@ func (r *Replica) EndTransaction(
return reply, externalIntents, nil
}

// resolveExplicitIntents resolve any explicit intents and persists e
// transaction record with updated status. All that are local to this
// range get resolved synchronously in the same batch. The remainder
// are collected and returned so that they can be handed off to
// asynchronous processing.
func (r *Replica) resolveExplicitIntents(batch engine.Engine, ms *engine.MVCCStats, args roachpb.EndTransactionRequest, txn *roachpb.Transaction) ([]roachpb.Intent, error) {
desc := r.Desc()
var preMergeDesc *roachpb.RangeDescriptor
if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil {
// If this is a merge, then use the post-merge descriptor to determine
// which intents are local (note that for a split, we want to use the
// pre-split one instead because it's larger).
preMergeDesc = desc
desc = &mergeTrigger.UpdatedDesc
}

iterAndBuf := engine.GetIterAndBuf(batch)
defer iterAndBuf.Cleanup()

var externalIntents []roachpb.Intent
for _, span := range args.IntentSpans {
if err := func() error {
intent := roachpb.Intent{Span: span, Txn: txn.TxnMeta, Status: txn.Status}
if len(span.EndKey) == 0 {
// For single-key intents, do a KeyAddress-aware check of
// whether it's contained in our Range.
if !containsKey(*desc, span.Key) {
externalIntents = append(externalIntents, intent)
return nil
}
resolveMs := ms
if preMergeDesc != nil && !containsKey(*preMergeDesc, span.Key) {
// If this transaction included a merge and the intents
// are from the subsumed range, ignore the intent resolution
// stats, as they will already be accounted for during the
// merge trigger.
resolveMs = nil
}
return engine.MVCCResolveWriteIntentUsingIter(batch, iterAndBuf, resolveMs, intent)
}
// For intent ranges, cut into parts inside and outside our key
// range. Resolve locally inside, delegate the rest. In particular,
// an intent range for range-local data is correctly considered local.
inSpan, outSpans := intersectSpan(span, *desc)
for _, span := range outSpans {
outIntent := intent
outIntent.Span = span
externalIntents = append(externalIntents, outIntent)
}
if inSpan != nil {
intent.Span = *inSpan
_, err := engine.MVCCResolveWriteIntentRangeUsingIter(batch, iterAndBuf, ms, intent, 0)
return err
}
return nil
}(); err != nil {
// TODO(tschottdorf): any legitimate reason for this to happen?
// Figure that out and if not, should still be ReplicaCorruption
// and not a panic.
panic(fmt.Sprintf("error resolving intent at %s on end transaction [%s]: %s", span, txn.Status, err))
}
}

// Persist the transaction record with updated status (& possibly timestamp).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's weird that everything that happens from here on down happens inside of this helper method. Auto-gc'ing can easily move outside. Writing an updated txn entry can maybe stay, but the method name needs to indicate that this is happening (and nobody can write another entry in EndTransaction after this one unless it reads the entry from disk first).

// If we've already resolved all intents locally, we actually delete the
// record right away - no use in keeping it around.
key := keys.TransactionKey(txn.Key, txn.ID)
var err error
if txnAutoGC && len(externalIntents) == 0 {
if log.V(1) {
log.Infof("auto-gc'ed %s (%d intents)", txn.ID.Short(), len(args.IntentSpans))
}
err = engine.MVCCDelete(batch, ms, key, roachpb.ZeroTimestamp, nil /* txn */)
} else {
txn.Intents = make([]roachpb.Span, len(externalIntents))
for i := range externalIntents {
txn.Intents[i] = externalIntents[i].Span
}
err = engine.MVCCPutProto(batch, ms, key, roachpb.ZeroTimestamp, nil /* txn */, txn)
}
if err != nil {
return nil, err
}
return externalIntents, nil
}

// intersectSpan takes an intent and a descriptor. It then splits the
// intent's range into up to three pieces: A first piece which is contained in
// the Range, and a slice of up to two further intents which are outside of the
Expand Down
46 changes: 35 additions & 11 deletions storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2116,17 +2116,41 @@ func TestEndTransactionRollbackAbortedTransaction(t *testing.T) {
t.Fatal(pErr)
}

// Abort the transaction twice. No error is returned.
for range []int{0, 1} {
args, h := endTxnArgs(txn, false)
resp, pErr := tc.SendWrappedWith(h, &args)
if pErr != nil {
t.Error(pErr)
}
reply := resp.(*roachpb.EndTransactionResponse)
if reply.Txn.Status != roachpb.ABORTED {
t.Errorf("expected transaction status to be ABORTED; got %s", reply.Txn.Status)
}
// Abort the transaction by pushing it with a higher priority.
pusher := newTransaction("test", key, 1, roachpb.SERIALIZABLE, tc.clock)
pusher.Priority = txn.Priority + 1 // will push successfully
pushArgs := pushTxnArgs(pusher, btH.Txn, roachpb.PUSH_ABORT)
if _, pErr := tc.SendWrapped(&pushArgs); pErr != nil {
t.Fatal(pErr)
}

// Check if the intent has not yet been resolved.
var ba roachpb.BatchRequest
gArgs := getArgs(key)
ba.Add(&gArgs)
if err := ba.SetActiveTimestamp(tc.clock.Now); err != nil {
t.Fatal(err)
}
_, pErr := tc.Sender().Send(tc.rng.context(context.Background()), ba)
if _, ok := pErr.GetDetail().(*roachpb.WriteIntentError); !ok {
t.Errorf("expected write intent error, but got %s", pErr)
}

// Abort the transaction again. No error is returned.
args, h := endTxnArgs(txn, false)
args.IntentSpans = []roachpb.Span{{Key: key}}
resp, pErr := tc.SendWrappedWith(h, &args)
if pErr != nil {
t.Error(pErr)
}
reply := resp.(*roachpb.EndTransactionResponse)
if reply.Txn.Status != roachpb.ABORTED {
t.Errorf("expected transaction status to be ABORTED; got %s", reply.Txn.Status)
}

// Verify that the intent has been resolved.
if _, pErr := tc.Sender().Send(tc.rng.context(context.Background()), ba); pErr != nil {
t.Errorf("expected resolved intent, but got %s", pErr)
}
}

Expand Down