Skip to content
This repository has been archived by the owner on Mar 26, 2020. It is now read-only.

Commit

Permalink
transaction: Add new transaction locking methods
Browse files Browse the repository at this point in the history
  • Loading branch information
kshlm committed May 28, 2018
1 parent 0fac260 commit 2ff4f66
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 3 deletions.
58 changes: 55 additions & 3 deletions glusterd2/transaction/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ const (
lockObtainTimeout = 5 * time.Second
)

// ErrLockTimeout is the error returned when lock could not be obtained
// and the request timed out
var ErrLockTimeout = errors.New("could not obtain lock: another conflicting transaction may be in progress")
var (
// ErrLockTimeout is the error returned when lock could not be obtained
// and the request timed out
ErrLockTimeout = errors.New("could not obtain lock: another conflicting transaction may be in progress")
// ErrLockExists is returned when a lock already exists within the transaction
ErrLockExists = errors.New("existing lock found for given lock ID")
)

// createLockStepFunc returns the registry IDs of StepFuncs which lock/unlock the given key.
// If existing StepFuncs are not found, new funcs are created and registered.
Expand Down Expand Up @@ -142,3 +146,51 @@ func CreateLockFuncs(key string) (LockUnlockFunc, LockUnlockFunc) {

return lockFunc, unlockFunc
}

func (t *Txn) lock(lockID string) error {
// Ensure that no prior lock exists for the given lockID in this transaction
if _, ok := t.locks[lockID]; ok {
return ErrLockExists
}

logger := t.Ctx.Logger().WithField("lockID", lockID)
logger.Debug("attempting to obtain lock")

key := lockPrefix + lockID
locker := concurrency.NewMutex(store.Store.Session, key)

ctx, cancel := context.WithTimeout(store.Store.Ctx(), lockObtainTimeout)
defer cancel()

err := locker.Lock(ctx)
switch err {
case nil:
logger.Debug("lock obtained")
// Attach lock to the transaction
t.locks[lockID] = locker

case context.DeadlineExceeded:
// Propagate this all the way back to the client as a HTTP 409 response
logger.Debug("timeout: failed to obtain lock")
err = ErrLockTimeout

default:
logger.WithError(err).Error("failed to obtain lock")
}

return err
}

// Lock obtains a cluster wide transaction lock on the given lockID/lockIDs,
// and attaches the obtained locks to the transaction
func (t *Txn) Lock(lockID string, lockIDs ...string) error {
if err := t.lock(lockID); err != nil {
return err
}
for _, id := range lockIDs {
if err := t.lock(id); err != nil {
return err
}
}
return nil
}
31 changes: 31 additions & 0 deletions glusterd2/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/gluster/glusterd2/glusterd2/store"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
)
Expand All @@ -24,6 +25,8 @@ var expTxn = expvar.NewMap("txn")
type Txn struct {
id uuid.UUID
reqID uuid.UUID
locks map[string]*concurrency.Mutex

Ctx TxnCtx
Steps []*Step

Expand All @@ -39,14 +42,32 @@ type Txn struct {
// NewTxn returns an initialized Txn without any steps
func NewTxn(ctx context.Context) *Txn {
t := new(Txn)

t.id = uuid.NewRandom()
t.reqID = gdctx.GetReqID(ctx)
t.locks = make(map[string]*concurrency.Mutex)

prefix := txnPrefix + t.id.String()
t.Ctx = NewCtxWithLogFields(log.Fields{
"txnid": t.id.String(),
"reqid": t.reqID.String(),
}).WithPrefix(prefix)

t.Ctx.Logger().Debug("new transaction created")
return t
}

// NewTxnWithLocks returns an empty Txn with locks obtained on given lockIDs
func NewTxnWithLocks(ctx context.Context, lockIDs ...string) *Txn {
t := NewTxn(ctx)

for _, id := range lockIDs {
if err := t.Lock(id); err != nil {
t.Done()
return nil
}
}

return t
}

Expand All @@ -56,6 +77,16 @@ func (t *Txn) Cleanup() {
expTxn.Add("initiated_txn_in_progress", -1)
}

// Done releases any obtained locks and cleans up the transaction namespace
// Done must be called after a transaction ends
func (t *Txn) Done() {
// Release obtained locks
for _, locker := range t.locks {
locker.Unlock(context.Background())
}
t.Cleanup()
}

func (t *Txn) checkAlive() error {

if len(t.Nodes) == 0 {
Expand Down

0 comments on commit 2ff4f66

Please sign in to comment.