Skip to content

Commit

Permalink
reconstruct the logic of updating dataset mountpoint (fluid-cloudnati…
Browse files Browse the repository at this point in the history
…ve#954)

* reconstruct the logic of updating dataset mountpoint

Signed-off-by: yangyuliufeng <qlw705706@gmail.com>

* reconstruct the logic of ufs map

Signed-off-by: yangyuliufeng <qlw705706@gmail.com>

* delete import

Signed-off-by: yangyuliufeng <qlw705706@gmail.com>

* Change logic of ufsToUpdate to object-oriented

Signed-off-by: yangyuliufeng <qlw705706@gmail.com>
  • Loading branch information
yangyuliufeng authored Aug 6, 2021
1 parent 38797ea commit 78e363d
Show file tree
Hide file tree
Showing 19 changed files with 564 additions and 442 deletions.
3 changes: 0 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@

# Image URL to use all building/pushing image targets
IMG ?= registry.aliyuncs.com/fluid/runtime-controller
# Produce CRDs that work back to Kubernetes 1.11 (no version conversion)
CRD_OPTIONS ?= "crd:trivialVersions=true"

Expand Down
6 changes: 2 additions & 4 deletions api/v1alpha1/dataset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,17 @@ import (
type DatasetPhase string

const (
// Bound to runtime, can't be deleted
// TODO: add the Pending phase to Dataset
PendingDatasetPhase DatasetPhase = "Pending"
// Bound to dataset, can't be released
BoundDatasetPhase DatasetPhase = "Bound"
// Failed, can't be deleted
FailedDatasetPhase DatasetPhase = "Failed"
// Not bound to runtime, can be deleted
NotBoundDatasetPhase DatasetPhase = "NotBound"
// Updated dataset, can't be released
UpdatedDatasetPhase DatasetPhase = "Updated"
// updating dataset, can't be released
UpdatingDatasetPhase DatasetPhase = "Updating"

// the dataset have no phase and need to be judged
NoneDatasetPhase DatasetPhase = ""
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,8 @@ github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07 h1:UyzmZLoiDWMRywV4DUY
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
github.com/thecodeteam/goscaleio v0.1.0 h1:SB5tO98lawC+UK8ds/U2jyfOCH7GTcFztcF5x9gbut4=
github.com/thecodeteam/goscaleio v0.1.0/go.mod h1:68sdkZAsK8bvEwBlbQnlLS+xU+hvLYM/iQ8KXej1AwM=
github.com/thoas/go-funk v0.9.0 h1:Yzu8aTjTb1sqHZzSZLBt4qaZrFfjNizhA7IfnefjEzo=
github.com/thoas/go-funk v0.9.0/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/timakin/bodyclose v0.0.0-20190721030226-87058b9bfcec h1:AmoEvWAO3nDx1MEcMzPh+GzOOIA5Znpv6++c7bePPY0=
Expand Down
73 changes: 4 additions & 69 deletions pkg/ddc/alluxio/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ func (e *AlluxioEngine) UpdateDatasetStatus(phase datav1alpha1.DatasetPhase) (er

switch phase {
case datav1alpha1.BoundDatasetPhase:
if len(datasetToUpdate.Status.Mounts) == 0 {
datasetToUpdate.Status.Mounts = datasetToUpdate.Spec.Mounts
}
cond = utils.NewDatasetCondition(datav1alpha1.DatasetReady, datav1alpha1.DatasetReadyReason,
"The ddc runtime is ready.",
corev1.ConditionTrue)
Expand Down Expand Up @@ -147,74 +150,6 @@ func (e *AlluxioEngine) UpdateDatasetStatus(phase datav1alpha1.DatasetPhase) (er
return
}

// // Check if it's bound to the dataset
// func (e *AlluxioEngine) IsBoundToDataset() (bound bool, err error) {
// return
// }
func (e *AlluxioEngine) UpdateMountStatus(phase datav1alpha1.DatasetPhase) (err error) {
// update the dataset status
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
if err != nil {
return err
}
datasetToUpdate := dataset.DeepCopy()
var cond datav1alpha1.DatasetCondition

switch phase {
case datav1alpha1.UpdatingDatasetPhase:
cond = utils.NewDatasetCondition(datav1alpha1.DatasetUpdating, datav1alpha1.DatasetUpdatingReason,
"The ddc runtime is updating.",
corev1.ConditionTrue)
default:
cond = utils.NewDatasetCondition(datav1alpha1.DatasetReady, datav1alpha1.DatasetReadyReason,
"The ddc runtime is unknown.",
corev1.ConditionFalse)
}

datasetToUpdate.Status.Phase = phase
datasetToUpdate.Status.Conditions = utils.UpdateDatasetCondition(datasetToUpdate.Status.Conditions,
cond)

// set Status.Mounts
if phase == datav1alpha1.BoundDatasetPhase || phase == datav1alpha1.UpdatedDatasetPhase {
datasetToUpdate.Status.Mounts = datasetToUpdate.Spec.Mounts
}

e.Log.Info("the dataset status", "status", datasetToUpdate.Status)

if !reflect.DeepEqual(dataset.Status, datasetToUpdate.Status) {
err = e.Client.Status().Update(context.TODO(), datasetToUpdate)
if err != nil {
e.Log.Error(err, "Update mount status")
return err
}
}

return nil
})

if err != nil {
e.Log.Error(err, "Update mount status")
return err
}

return
}

func (e *AlluxioEngine) BindToDataset() (err error) {
err = e.UpdateDatasetStatus(datav1alpha1.BoundDatasetPhase)
if err != nil {
e.Log.Error(err, "UpdateDatasetStatus to Bound")
return err
}
return e.UpdateMountStatus(datav1alpha1.BoundDatasetPhase)
}

func (e *AlluxioEngine) SetUFSUpdated() (err error) {
return e.UpdateMountStatus(datav1alpha1.UpdatedDatasetPhase)
}

func (e *AlluxioEngine) SetUFSUpdating() (err error) {
return e.UpdateDatasetStatus(datav1alpha1.UpdatingDatasetPhase)
return e.UpdateDatasetStatus(datav1alpha1.BoundDatasetPhase)
}
91 changes: 32 additions & 59 deletions pkg/ddc/alluxio/ufs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ limitations under the License.

package alluxio

import (
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/utils"
)

// UsedStorageBytes returns used storage size of Alluxio in bytes
func (e *AlluxioEngine) UsedStorageBytes() (value int64, err error) {
// return e.usedStorageBytesInternal()
Expand Down Expand Up @@ -46,26 +51,6 @@ func (e *AlluxioEngine) ShouldCheckUFS() (should bool, err error) {
return
}

func (e *AlluxioEngine) GetUpdateUFSMap() (map[string][]string, error) {
updateUFSMap := make(map[string][]string)
// For Alluxio Engine, always attempt to prepare UFS
resultInCtx, resultHaveMounted, err := e.getMounts()

// 2. get mount point need to be added and removed
//var added, removed []string
added, removed := e.calculateMountPointsChanges(resultHaveMounted, resultInCtx)

if len(added) > 0 {
updateUFSMap["added"] = added
}

if len(removed) > 0 {
updateUFSMap["removed"] = removed
}

return updateUFSMap, err
}

// PrepareUFS does all the UFS preparations
func (e *AlluxioEngine) PrepareUFS() (err error) {
// 1. Mount UFS (Synchronous Operation)
Expand Down Expand Up @@ -93,55 +78,43 @@ func (e *AlluxioEngine) PrepareUFS() (err error) {
return
}

func (e *AlluxioEngine) UpdateUFS(updatedUFSMap map[string][]string) (err error) {
//1. set update status to updating
errUpdating := e.SetUFSUpdating()
if errUpdating != nil {
e.Log.Error(err, "Failed to update dataset status to updating")
return err
}
//2. process added and removed
err = e.processUpdatingUFS(updatedUFSMap)
func (e *AlluxioEngine) ShouldUpdateUFS() (ufsToUpdate *utils.UFSToUpdate) {
// 1. get the dataset
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
if err != nil {
e.Log.Error(err, "Failed to add or remove mount points")
return err
e.Log.Error(err, "Failed to get the dataset")
return
}

//3. update dataset status to updated
err = e.SetUFSUpdated()
if err != nil {
e.Log.Error(err, "Failed to update dataset status to updated")
return err
}
// 2.get the ufs to update
ufsToUpdate = utils.NewUFSToUpdate(dataset)
ufsToUpdate.AnalyzePathsDelta()

return err
return
}

func (e *AlluxioEngine) UpdateOnUFSChange() (updateReady bool, err error) {
// 1.get the updated ufs map
// updatedUFSMap, err
updatedUFSMap, err := e.GetUpdateUFSMap()
func (e *AlluxioEngine) UpdateOnUFSChange(ufsToUpdate *utils.UFSToUpdate) (updateReady bool, err error) {
// 1. check if need to update ufs
if !ufsToUpdate.ShouldUpdate() {
e.Log.Info("no need to update ufs",
"namespace", e.namespace,
"name", e.name)
return
}

// 2. set update status to updating
err = utils.UpdateMountStatus(e.Client, e.name, e.namespace, datav1alpha1.UpdatingDatasetPhase)
if err != nil {
e.Log.Error(err, "Failed to check mount points changes")
e.Log.Error(err, "Failed to update dataset status to updating")
return
}

if len(updatedUFSMap) > 0 {
//2. update the ufs
err := e.UpdateUFS(updatedUFSMap)
if err != nil {
e.Log.Error(err, "Failed to add or remove mount points")
}
// 3. process added and removed
err = e.processUpdatingUFS(ufsToUpdate)
if err != nil {
e.Log.Error(err, "Failed to add or remove mount points")
return
}

updateReady = true

return updateReady, err
return
}

////du the ufs
//func (e *AlluxioEngine) du() (ufs int64, cached int64, cachedPercentage string, err error) {
// podName, containerName := e.getMasterPodInfo()
// fileUitls := operations.NewAlluxioFileUtils(podName, containerName, e.namespace, e.Log)
// return fileUitls.Du("/")
//}
Loading

0 comments on commit 78e363d

Please sign in to comment.