Skip to content

Commit

Permalink
feat: dataset and versioneddataset impl
Browse files Browse the repository at this point in the history
  • Loading branch information
0xff-dev committed Nov 16, 2023
1 parent b3b4673 commit 6bc9314
Show file tree
Hide file tree
Showing 15 changed files with 704 additions and 8 deletions.
3 changes: 2 additions & 1 deletion api/v1alpha1/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ var (
// LabelDatasetContentType defines the content type of this dataset
LabelDatasetContentType = Group + "/content-type"
// LabelDatasetBestCase defines the best case to use this dataset
LabelDatasetBestCase = Group + "/best-case"
LabelDatasetBestCase = Group + "/best-case"
LabelDatasetFinalizer = Group + "/finalizers"
)
143 changes: 142 additions & 1 deletion api/v1alpha1/versioneddataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,147 @@ limitations under the License.

package v1alpha1

import (
	"context"
	"fmt"
	"sort"
	"strings"
)

var (
LabelVersionedDatasetVersion = Group + "/version"
LabelVersionedDatasetVersion = Group + "/version"
LabelVersionedDatasetVersionOwner = Group + "/owner"
)

// CopyedFileGroup2Status reconciles spec.FileGroups into status.DatasourceFiles.
// It returns whether any new files were added (so status needs syncing) and the
// list of file statuses that no longer appear in the spec and must be deleted.
// When the instance is being deleted, every recorded file is returned as deleted.
func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileStatus) {
	// On deletion, every file recorded in status is to be removed.
	if instance.DeletionTimestamp != nil {
		source := instance.Status.DatasourceFiles
		instance.Status.DatasourceFiles = nil
		return true, source
	}

	// 1. Index the file details already saved in the current status by
	// "<namespace> <name>" key and file path, for O(1) lookups below.
	oldDatasourceFiles := make(map[string]map[string]FileDetails)
	for _, fileStatus := range instance.Status.DatasourceFiles {
		key := fmt.Sprintf("%s %s", fileStatus.DatasourceNamespace, fileStatus.DatasourceName)
		if _, ok := oldDatasourceFiles[key]; !ok {
			oldDatasourceFiles[key] = make(map[string]FileDetails)
		}
		for _, item := range fileStatus.Status {
			oldDatasourceFiles[key][item.Path] = item
		}
	}

	// 2. Group the spec's fileGroups into this format:
	// {"datasourceNamespace datasourceName": ["file1", "file2"]}.
	fileGroup := make(map[string][]string)
	for _, fg := range instance.Spec.FileGroups {
		key := fmt.Sprintf("%s %s", fg.Datasource.GetNamespace(), fg.Datasource.Name)
		fileGroup[key] = append(fileGroup[key], fg.Paths...)
	}

	// 3. Convert fileGroup to []DatasourceFileStatus, every file initially in
	// the Processing phase. Paths are sorted so UpdateFileStatus can binary-search.
	targetDatasourceFileStatus := make([]DatasourceFileStatus, 0, len(fileGroup))
	for datasource, filePaths := range fileGroup {
		// The key was joined with a single space above; strings.Cut splits it
		// back reliably even when the namespace is empty — fmt.Sscanf("%s %s")
		// would shift the name into the namespace slot in that case and its
		// error was being discarded.
		namespace, name, _ := strings.Cut(datasource, " ")
		item := DatasourceFileStatus{
			DatasourceName:      name,
			DatasourceNamespace: namespace,
			Status:              make([]FileDetails, 0, len(filePaths)),
		}
		for _, fp := range filePaths {
			item.Status = append(item.Status, FileDetails{
				Path:  fp,
				Phase: FileProcessPhaseProcessing,
			})
		}
		sort.Slice(item.Status, func(i, j int) bool {
			return item.Status[i].Path < item.Status[j].Path
		})

		targetDatasourceFileStatus = append(targetDatasourceFileStatus, item)
	}

	// 4. If a file from a data source is found to exist in oldDatasourceFiles,
	// keep the details recorded there; otherwise the file stays in Processing
	// and the status is flagged as updated. Matched paths are deleted from the
	// map so whatever remains afterwards is exactly the set of files to delete.
	update := false
	deletedFiles := make([]DatasourceFileStatus, 0)
	for idx := range targetDatasourceFileStatus {
		item := targetDatasourceFileStatus[idx]
		key := fmt.Sprintf("%s %s", item.DatasourceNamespace, item.DatasourceName)

		// If the datasource itself is not in status, then it is a new series
		// of files being added.
		datasourceFiles, ok := oldDatasourceFiles[key]
		if !ok {
			update = true
			continue
		}

		// Check whether each file under spec already exists in status; if so,
		// carry over its recorded state, otherwise it is a new file.
		for i, status := range item.Status {
			oldFileStatus, ok := datasourceFiles[status.Path]
			if !ok {
				update = true
				continue
			}
			item.Status[i] = oldFileStatus

			// Do the deletion here; the data still left in the map at the end
			// is the set of files that needs to be deleted.
			delete(datasourceFiles, status.Path)
		}
		if len(datasourceFiles) > 0 {
			ds := DatasourceFileStatus{
				DatasourceName:      item.DatasourceName,
				DatasourceNamespace: item.DatasourceNamespace,
				Status:              make([]FileDetails, 0, len(datasourceFiles)),
			}
			for _, r := range datasourceFiles {
				ds.Status = append(ds.Status, r)
			}
			deletedFiles = append(deletedFiles, ds)
		}
		targetDatasourceFileStatus[idx] = item
		// Mark this datasource as handled so the leftover pass below only
		// sees datasources removed from the spec entirely.
		delete(oldDatasourceFiles, key)
	}

	// Datasources that were removed from the spec entirely previously vanished
	// from status without their files ever being reported for deletion; report
	// them here so callers can clean up.
	for key, files := range oldDatasourceFiles {
		if len(files) == 0 {
			continue
		}
		namespace, name, _ := strings.Cut(key, " ")
		ds := DatasourceFileStatus{
			DatasourceName:      name,
			DatasourceNamespace: namespace,
			Status:              make([]FileDetails, 0, len(files)),
		}
		for _, r := range files {
			ds.Status = append(ds.Status, r)
		}
		deletedFiles = append(deletedFiles, ds)
	}

	// Keep datasources sorted by name so UpdateFileStatus can binary-search.
	sort.Slice(targetDatasourceFileStatus, func(i, j int) bool {
		return targetDatasourceFileStatus[i].DatasourceName < targetDatasourceFileStatus[j].DatasourceName
	})

	instance.Status.DatasourceFiles = targetDatasourceFileStatus
	return update, deletedFiles
}

// UpdateFileStatus transitions the phase of the file srcPath belonging to the
// named datasource in instance.Status. Only the transition
// Processing -> Succeeded/Failed is allowed; any other transition returns an
// error. errMsg is recorded on the file only when syncStatus is Failed.
// Relies on status.DatasourceFiles (and each Status slice) being sorted, as
// produced by CopyedFileGroup2Status.
// NOTE(review): ctx is currently unused; kept for interface stability.
func UpdateFileStatus(ctx context.Context, instance *VersionedDataset, datasource, srcPath string, syncStatus FileProcessPhase, errMsg string) error {
	files := instance.Status.DatasourceFiles
	datasourceIndex := sort.Search(len(files), func(i int) bool {
		return files[i].DatasourceName >= datasource
	})
	// sort.Search returns the insertion point, not a guaranteed match: without
	// the equality check, a missing datasource would silently match (and later
	// mutate) the next datasource in sort order.
	if datasourceIndex == len(files) || files[datasourceIndex].DatasourceName != datasource {
		return fmt.Errorf("not found datasource %s in %s/%s.status", datasource, instance.Namespace, instance.Name)
	}

	fileStatuses := files[datasourceIndex].Status
	fileIndex := sort.Search(len(fileStatuses), func(i int) bool {
		return fileStatuses[i].Path >= srcPath
	})
	// Same insertion-point pitfall as above: require an exact path match.
	if fileIndex == len(fileStatuses) || fileStatuses[fileIndex].Path != srcPath {
		return fmt.Errorf("not found srcPath %s in datasource %s", srcPath, datasource)
	}

	// Only this state transfer is allowed.
	curPhase := fileStatuses[fileIndex].Phase
	if curPhase == FileProcessPhaseProcessing && (syncStatus == FileProcessPhaseSucceeded || syncStatus == FileProcessPhaseFailed) {
		// fileStatuses shares its backing array with instance.Status, so these
		// writes update the instance in place.
		fileStatuses[fileIndex].Phase = syncStatus
		if syncStatus == FileProcessPhaseFailed {
			fileStatuses[fileIndex].ErrMessage = errMsg
		}
		return nil
	}

	return fmt.Errorf("wrong state. from %s to %s", curPhase, syncStatus)
}
13 changes: 13 additions & 0 deletions api/v1alpha1/versioneddataset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,25 @@ type VersionedDatasetSpec struct {
// FileGroups included in this `VersionedDataset`
// Grouped by Datasource
FileGroups []FileGroup `json:"fileGroups,omitempty"`

// +kubebuilder:validation:Enum=0;1
// +kubebuilder:default=0
Released uint8 `json:"released"`
}

// DatasourceFileStatus records the processing state of the files that a single
// datasource contributes to this VersionedDataset.
type DatasourceFileStatus struct {
	// DatasourceName is the name of the datasource the files come from.
	DatasourceName string `json:"datasourceName"`
	// DatasourceNamespace is the namespace of that datasource.
	DatasourceNamespace string `json:"datasourceNamespace"`
	// Status holds the per-file processing details (path, phase, error message).
	Status []FileDetails `json:"status,omitempty"`
}

// VersionedDatasetStatus defines the observed state of VersionedDataset
type VersionedDatasetStatus struct {
	// ConditionedStatus is the current status
	ConditionedStatus `json:",inline"`

	// DatasourceFiles record the process and results of file processing
	// for each data source
	DatasourceFiles []DatasourceFileStatus `json:"datasourceFiles,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
29 changes: 29 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion charts/arcadia/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: v2
name: arcadia
description: A Helm chart(KubeBB Component) for KubeAGI Arcadia
type: application
version: 0.1.22
version: 0.1.23
appVersion: "0.0.1"

keywords:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,18 @@ spec:
- paths
type: object
type: array
released:
default: 0
enum:
- 0
- 1
type: integer
version:
description: Version
type: string
required:
- dataset
- released
- version
type: object
status:
Expand Down Expand Up @@ -152,6 +159,42 @@ spec:
- type
type: object
type: array
datasourceFiles:
description: DatasourceFiles record the process and results of file
processing for each data source
items:
properties:
datasourceName:
type: string
datasourceNamespace:
type: string
status:
items:
properties:
checksum:
description: Checksum defines the checksum of the file
type: string
errMessage:
description: ErrMessage defines the error message
type: string
lastUpdateTime:
description: The last time this condition was updated.
format: date-time
type: string
path:
description: Path defines the detail path to get objects
from above datasource
type: string
phase:
description: Phase defines the process phase
type: string
type: object
type: array
required:
- datasourceName
- datasourceNamespace
type: object
type: array
type: object
type: object
served: true
Expand Down
43 changes: 43 additions & 0 deletions config/crd/bases/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,18 @@ spec:
- paths
type: object
type: array
released:
default: 0
enum:
- 0
- 1
type: integer
version:
description: Version
type: string
required:
- dataset
- released
- version
type: object
status:
Expand Down Expand Up @@ -152,6 +159,42 @@ spec:
- type
type: object
type: array
datasourceFiles:
description: DatasourceFiles record the process and results of file
processing for each data source
items:
properties:
datasourceName:
type: string
datasourceNamespace:
type: string
status:
items:
properties:
checksum:
description: Checksum defines the checksum of the file
type: string
errMessage:
description: ErrMessage defines the error message
type: string
lastUpdateTime:
description: The last time this condition was updated.
format: date-time
type: string
path:
description: Path defines the detail path to get objects
from above datasource
type: string
phase:
description: Phase defines the process phase
type: string
type: object
type: array
required:
- datasourceName
- datasourceNamespace
type: object
type: array
type: object
type: object
served: true
Expand Down
Loading

0 comments on commit 6bc9314

Please sign in to comment.