Skip to content

Commit

Permalink
Merge pull request #425 from yati1998/DFBUGS-906
Browse files Browse the repository at this point in the history
DFBUGS-906: Prevent data loss due to concurrent RPC calls (occurrence is very low)
  • Loading branch information
openshift-merge-bot[bot] authored Nov 26, 2024
2 parents e444741 + ac91de7 commit 1245510
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 13 deletions.
23 changes: 16 additions & 7 deletions internal/cephfs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,13 @@ func (ns *NodeServer) NodePublishVolume(
targetPath := req.GetTargetPath()
volID := fsutil.VolumeID(req.GetVolumeId())

if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

volOptions := &store.VolumeOptions{}
defer volOptions.Destroy()

Expand All @@ -506,9 +513,6 @@ func (ns *NodeServer) NodePublishVolume(
return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error())
}

// Considering kubelet make sure the stage and publish operations
// are serialized, we dont need any extra locking in nodePublish

if err = util.CreateMountPoint(targetPath); err != nil {
log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err)

Expand Down Expand Up @@ -599,12 +603,17 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, err
}

// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.
targetPath := req.GetTargetPath()
volID := req.GetVolumeId()
if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

// stop the health-checker that may have been started in NodeGetVolumeStats()
ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath)
ns.healthChecker.StopChecker(volID, targetPath)

isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
if err != nil {
Expand All @@ -627,7 +636,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
isMnt = true
}
if !isMnt {
if err = os.RemoveAll(targetPath); err != nil {
if err = os.Remove(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand Down
22 changes: 16 additions & 6 deletions internal/rbd/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,8 +709,12 @@ func (ns *NodeServer) NodePublishVolume(
volID := req.GetVolumeId()
stagingPath += "/" + volID

// Considering kubelet make sure the stage and publish operations
// are serialized, we dont need any extra locking in nodePublish
if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

// Check if that target path exists properly
notMnt, err := ns.createTargetMountPath(ctx, targetPath, isBlock)
Expand Down Expand Up @@ -914,8 +918,14 @@ func (ns *NodeServer) NodeUnpublishVolume(
}

targetPath := req.GetTargetPath()
// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.

if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

isMnt, err := ns.Mounter.IsMountPoint(targetPath)
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -928,7 +938,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, status.Error(codes.NotFound, err.Error())
}
if !isMnt {
if err = os.RemoveAll(targetPath); err != nil {
if err = os.Remove(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand All @@ -939,7 +949,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, status.Error(codes.Internal, err.Error())
}

if err = os.RemoveAll(targetPath); err != nil {
if err = os.Remove(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand Down
3 changes: 3 additions & 0 deletions internal/util/idlocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (

// SnapshotOperationAlreadyExistsFmt string format to return for concurrent operation.
SnapshotOperationAlreadyExistsFmt = "an operation with the given Snapshot ID %s already exists"

// TargetPathOperationAlreadyExistsFmt string format to return for concurrent operation on target path.
TargetPathOperationAlreadyExistsFmt = "an operation with the given target path %s already exists"
)

// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs
Expand Down

0 comments on commit 1245510

Please sign in to comment.