Skip to content
This repository has been archived by the owner on Mar 26, 2020. It is now read-only.

Commit

Permalink
Add back the old transaction/lock functions
Browse files Browse the repository at this point in the history
... so that PRs already present can be merged.
  • Loading branch information
kshlm committed May 28, 2018
1 parent da2a61c commit d7bfb63
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 3 deletions.
126 changes: 126 additions & 0 deletions glusterd2/transaction/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"errors"
"time"

"github.com/gluster/glusterd2/glusterd2/gdctx"
"github.com/gluster/glusterd2/glusterd2/store"

"github.com/coreos/etcd/clientv3/concurrency"
"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
)

const (
Expand All @@ -23,6 +26,129 @@ var (
ErrLockExists = errors.New("existing lock found for given lock ID")
)

// createLockStepFunc returns the registry IDs of StepFuncs which lock/unlock the given key.
// If existing StepFuncs are not found, new funcs are created and registered.
func createLockStepFunc(key string) (string, string, error) {
lockFuncID := key + ".Lock"
unlockFuncID := key + ".Unlock"

_, lockFuncFound := getStepFunc(lockFuncID)
_, unlockFuncFound := getStepFunc(unlockFuncID)

if lockFuncFound && unlockFuncFound {
return lockFuncID, unlockFuncID, nil
}

key = lockPrefix + key
locker := concurrency.NewMutex(store.Store.Session, key)

lockFunc := func(c TxnCtx) error {

ctx, cancel := context.WithTimeout(context.Background(), lockObtainTimeout)
defer cancel()

c.Logger().WithField("key", key).Debug("attempting to lock")
err := locker.Lock(ctx)
switch err {
case nil:
c.Logger().WithField("key", key).Debug("lock obtained")
case context.DeadlineExceeded:
// Propagate this all the way back to the client as a HTTP 409 response
c.Logger().WithField("key", key).Debug("timeout: failed to obtain lock")
err = ErrLockTimeout
}

return err
}
RegisterStepFunc(lockFunc, lockFuncID)

unlockFunc := func(c TxnCtx) error {

c.Logger().WithField("key", key).Debug("attempting to unlock")
err := locker.Unlock(context.Background())
if err == nil {
c.Logger().WithField("key", key).Debug("lock unlocked")
}

return err
}
RegisterStepFunc(unlockFunc, unlockFuncID)

return lockFuncID, unlockFuncID, nil
}

// CreateLockSteps returns a lock and an unlock Step which lock/unlock the given key
// TODO: Remove this function
func CreateLockSteps(key string) (*Step, *Step, error) {
lockFunc, unlockFunc, err := createLockStepFunc(key)
if err != nil {
return nil, nil, err
}

lockStep := &Step{lockFunc, unlockFunc, []uuid.UUID{gdctx.MyUUID}, false}
unlockStep := &Step{unlockFunc, "", []uuid.UUID{gdctx.MyUUID}, false}

return lockStep, unlockStep, nil
}

// LockUnlockFunc is signature of functions used for distributed locking
// and unlocking.
type LockUnlockFunc func(ctx context.Context) error

// CreateLockFuncs creates and returns functions for distributed lock and
// unlock. This is similar to CreateLockSteps() but returns normal functions.
// TODO: Remove this function
func CreateLockFuncs(key string) (LockUnlockFunc, LockUnlockFunc) {

key = lockPrefix + key
locker := concurrency.NewMutex(store.Store.Session, key)

// TODO: There is an opportunity for refactor here to re-use code
// between CreateLockFunc and CreateLockSteps. This variant doesn't
// have registry either.

lockFunc := func(ctx context.Context) error {
logger := gdctx.GetReqLogger(ctx)
if logger == nil {
logger = log.StandardLogger()
}

ctx, cancel := context.WithTimeout(ctx, lockObtainTimeout)
defer cancel()

logger.WithField("key", key).Debug("attempting to lock")
err := locker.Lock(ctx)
switch err {
case nil:
logger.WithField("key", key).Debug("lock obtained")
case context.DeadlineExceeded:
// Propagate this all the way back to the client as a HTTP 409 response
logger.WithField("key", key).Debug("timeout: failed to obtain lock")
err = ErrLockTimeout
}

return err
}

unlockFunc := func(ctx context.Context) error {
logger := gdctx.GetReqLogger(ctx)
if logger == nil {
logger = log.StandardLogger()
}

logger.WithField("key", key).Debug("attempting to unlock")
if err := locker.Unlock(context.Background()); err != nil {
logger.WithField("key", key).WithError(err).Error("unlock failed")
return err
}

logger.WithField("key", key).Debug("lock unlocked")
return nil
}

return lockFunc, unlockFunc
}

func (t *Txn) lock(lockID string) error {
// Ensure that no prior lock exists for the given lockID in this transaction
if _, ok := t.locks[lockID]; ok {
Expand Down
12 changes: 9 additions & 3 deletions glusterd2/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func NewTxn(ctx context.Context) *Txn {
}).WithPrefix(prefix)

t.Ctx.Logger().Debug("new transaction created")
expTxn.Add("initiated_txn_in_progress", 1)
return t
}

Expand All @@ -72,15 +71,21 @@ func NewTxnWithLocks(ctx context.Context, lockIDs ...string) (*Txn, error) {
return t, nil
}

// Cleanup cleans the leftovers after a transaction ends
// TODO: Remove this function
func (t *Txn) Cleanup() {
store.Store.Delete(context.TODO(), t.Ctx.Prefix(), clientv3.WithPrefix())
expTxn.Add("initiated_txn_in_progress", -1)
}

// Done releases any obtained locks and cleans up the transaction namespace
// Done must be called after a transaction ends
func (t *Txn) Done() {
// Release obtained locks
for _, locker := range t.locks {
locker.Unlock(context.Background())
}
store.Store.Delete(context.TODO(), t.Ctx.Prefix(), clientv3.WithPrefix())
expTxn.Add("initiated_txn_in_progress", -1)
t.Cleanup()
}

func (t *Txn) checkAlive() error {
Expand Down Expand Up @@ -111,6 +116,7 @@ func (t *Txn) Do() error {
}

t.Ctx.Logger().Debug("Starting transaction")
expTxn.Add("initiated_txn_in_progress", 1)

for i, s := range t.Steps {
if s.Skip {
Expand Down

0 comments on commit d7bfb63

Please sign in to comment.