diff --git a/api/v1alpha1/dataset.go b/api/v1alpha1/dataset.go
index f073fba11..abe51d825 100644
--- a/api/v1alpha1/dataset.go
+++ b/api/v1alpha1/dataset.go
@@ -29,5 +29,6 @@ var (
 	// LabelDatasetScene defines the content type of this dataset
 	LabelDatasetContentType = Group + "/content-type"
 	// LabelDatasetBestCase defines the best case to use this dataset
-	LabelDatasetBestCase = Group + "/best-case"
+	LabelDatasetBestCase  = Group + "/best-case"
+	LabelDatasetFinalizer = Group + "/finalizers"
 )
diff --git a/api/v1alpha1/versioneddataset.go b/api/v1alpha1/versioneddataset.go
index 7520ecbb9..92ffeb58c 100644
--- a/api/v1alpha1/versioneddataset.go
+++ b/api/v1alpha1/versioneddataset.go
@@ -16,6 +16,147 @@ limitations under the License.
 
 package v1alpha1
 
+import (
+	"context"
+	"fmt"
+	"sort"
+)
+
 var (
-	LabelVersionedDatasetVersion = Group + "/version"
+	LabelVersionedDatasetVersion      = Group + "/version"
+	LabelVersionedDatasetVersionOwner = Group + "/owner"
 )
+
+// CopyedFileGroup2Status reports whether any new files were added, and returns the list of files that should be deleted.
+func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileStatus) {
+	if instance.DeletionTimestamp != nil {
+		source := instance.Status.DatasourceFiles
+		instance.Status.DatasourceFiles = nil
+		return true, source
+	}
+
+	// 1. First, index the file details that are already recorded in the current status.
+	oldDatasourceFiles := make(map[string]map[string]FileDetails)
+	for _, fileStatus := range instance.Status.DatasourceFiles {
+		key := fmt.Sprintf("%s %s", fileStatus.DatasourceNamespace, fileStatus.DatasourceName)
+		if _, ok := oldDatasourceFiles[key]; !ok {
+			oldDatasourceFiles[key] = make(map[string]FileDetails)
+		}
+		for _, item := range fileStatus.Status {
+			oldDatasourceFiles[key][item.Path] = item
+		}
+	}
+
+	// 2. Organize the contents of the fileGroup into this format: {"datasourceNamespace datasourceName": ["file1", "file2"]}
+	fileGroup := make(map[string][]string)
+	for _, fg := range instance.Spec.FileGroups {
+		namespace := fg.Datasource.GetNamespace()
+		key := fmt.Sprintf("%s %s", namespace, fg.Datasource.Name)
+		if _, ok := fileGroup[key]; !ok {
+			fileGroup[key] = make([]string, 0)
+		}
+		fileGroup[key] = append(fileGroup[key], fg.Paths...)
+	}
+
+	// 3. Convert fileGroup to []DatasourceFileStatus format
+	targetDatasourceFileStatus := make([]DatasourceFileStatus, 0)
+	var namespace, name string
+	for datasource, filePaths := range fileGroup {
+		_, _ = fmt.Sscanf(datasource, "%s %s", &namespace, &name)
+		item := DatasourceFileStatus{
+			DatasourceName:      name,
+			DatasourceNamespace: namespace,
+			Status:              []FileDetails{},
+		}
+		for _, fp := range filePaths {
+			item.Status = append(item.Status, FileDetails{
+				Path:  fp,
+				Phase: FileProcessPhaseProcessing,
+			})
+		}
+		sort.Slice(item.Status, func(i, j int) bool {
+			return item.Status[i].Path < item.Status[j].Path
+		})
+
+		targetDatasourceFileStatus = append(targetDatasourceFileStatus, item)
+	}
+
+	// 4. If a file from a datasource already exists in oldDatasourceFiles,
+	// reuse the details recorded in oldDatasourceFiles.
+	// Otherwise leave the file marked as being processed.
+	update := false
+	deletedFiles := make([]DatasourceFileStatus, 0)
+	for idx := range targetDatasourceFileStatus {
+		item := targetDatasourceFileStatus[idx]
+		key := fmt.Sprintf("%s %s", item.DatasourceNamespace, item.DatasourceName)
+
+		// If the datasource itself is not in the status yet, all of its files are newly added.
+		datasourceFiles, ok := oldDatasourceFiles[key]
+		if !ok {
+			update = true
+			continue
+		}
+
+		// Check whether each file in the spec already exists in the status: if it does, reuse its recorded status; otherwise it is a new file.
+		for i, status := range item.Status {
+			oldFileStatus, ok := datasourceFiles[status.Path]
+			if !ok {
+				update = true
+				continue
+			}
+			item.Status[i] = oldFileStatus
+
+			// Delete matched entries here; whatever still remains in the map afterwards is exactly the set of files that needs to be deleted.
+			delete(datasourceFiles, status.Path)
+		}
+		if len(datasourceFiles) > 0 {
+			ds := DatasourceFileStatus{
+				DatasourceName:      item.DatasourceName,
+				DatasourceNamespace: item.DatasourceNamespace,
+				Status:              make([]FileDetails, 0),
+			}
+			for _, r := range datasourceFiles {
+				ds.Status = append(ds.Status, r)
+			}
+			deletedFiles = append(deletedFiles, ds)
+		}
+		targetDatasourceFileStatus[idx] = item
+	}
+
+	sort.Slice(targetDatasourceFileStatus, func(i, j int) bool {
+		return targetDatasourceFileStatus[i].DatasourceName < targetDatasourceFileStatus[j].DatasourceName
+	})
+
+	instance.Status.DatasourceFiles = targetDatasourceFileStatus
+	return update, deletedFiles
+}
+
+func UpdateFileStatus(ctx context.Context, instance *VersionedDataset, datasource, srcPath string, syncStatus FileProcessPhase, errMsg string) error {
+	datasourceFileLen := len(instance.Status.DatasourceFiles)
+	datasourceIndex := sort.Search(datasourceFileLen, func(i int) bool {
+		return instance.Status.DatasourceFiles[i].DatasourceName >= datasource
+	})
+	if datasourceIndex == datasourceFileLen {
+		return fmt.Errorf("not found datasource %s in %s/%s.status", datasource, instance.Namespace, instance.Name)
+	}
+
+	filePathLen := len(instance.Status.DatasourceFiles[datasourceIndex].Status)
+	fileIndex := sort.Search(filePathLen, func(i int) bool {
+		return instance.Status.DatasourceFiles[datasourceIndex].Status[i].Path >= srcPath
+	})
+	if fileIndex == filePathLen {
+		return fmt.Errorf("not found srcPath %s in datasource %s", srcPath, datasource)
+	}
+
+	// Only this state transition is allowed
+	curPhase := instance.Status.DatasourceFiles[datasourceIndex].Status[fileIndex].Phase
+	if curPhase == FileProcessPhaseProcessing && (syncStatus == FileProcessPhaseSucceeded || syncStatus == FileProcessPhaseFailed) {
+		instance.Status.DatasourceFiles[datasourceIndex].Status[fileIndex].Phase = syncStatus
+		if syncStatus == FileProcessPhaseFailed {
+			instance.Status.DatasourceFiles[datasourceIndex].Status[fileIndex].ErrMessage = errMsg
+		}
+		return nil
+	}
+
+	return fmt.Errorf("wrong state. from %s to %s", curPhase, syncStatus)
+}
diff --git a/api/v1alpha1/versioneddataset_types.go b/api/v1alpha1/versioneddataset_types.go
index 834233b5a..78c12d7b4 100644
--- a/api/v1alpha1/versioneddataset_types.go
+++ b/api/v1alpha1/versioneddataset_types.go
@@ -41,12 +41,25 @@ type VersionedDatasetSpec struct {
 	// FileGroups included in this `VersionedDataset`
 	// Grouped by Datasource
 	FileGroups []FileGroup `json:"fileGroups,omitempty"`
+
+	// +kubebuilder:validation:Enum=0;1
+	// +kubebuilder:default=0
+	Released uint8 `json:"released"`
+}
+
+type DatasourceFileStatus struct {
+	DatasourceName      string        `json:"datasourceName"`
+	DatasourceNamespace string        `json:"datasourceNamespace"`
+	Status              []FileDetails `json:"status,omitempty"`
 }
 
 // VersionedDatasetStatus defines the observed state of VersionedDataset
 type VersionedDatasetStatus struct {
 	// ConditionedStatus is the current status
 	ConditionedStatus `json:",inline"`
+
+	// DatasourceFiles records the progress and results of file processing for each data source
+	DatasourceFiles []DatasourceFileStatus `json:"datasourceFiles,omitempty"`
 }
 
 //+kubebuilder:object:root=true
diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go
index 25580d365..f59aa6f0c 100644
--- a/api/v1alpha1/zz_generated.deepcopy.go
+++ b/api/v1alpha1/zz_generated.deepcopy.go
@@ -214,6 +214,28 @@ func (in *Datasource) DeepCopyObject() runtime.Object {
 	return nil
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *DatasourceFileStatus) DeepCopyInto(out *DatasourceFileStatus) {
+	*out = *in
+	if in.Status != nil {
+		in, out := &in.Status, &out.Status
+		*out = make([]FileDetails, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatasourceFileStatus.
+func (in *DatasourceFileStatus) DeepCopy() *DatasourceFileStatus {
+	if in == nil {
+		return nil
+	}
+	out := new(DatasourceFileStatus)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *DatasourceList) DeepCopyInto(out *DatasourceList) {
 	*out = *in
@@ -1193,6 +1215,13 @@ func (in *VersionedDatasetSpec) DeepCopy() *VersionedDatasetSpec {
 func (in *VersionedDatasetStatus) DeepCopyInto(out *VersionedDatasetStatus) {
 	*out = *in
 	in.ConditionedStatus.DeepCopyInto(&out.ConditionedStatus)
+	if in.DatasourceFiles != nil {
+		in, out := &in.DatasourceFiles, &out.DatasourceFiles
+		*out = make([]DatasourceFileStatus, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VersionedDatasetStatus.
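Note on the two API helpers above: `CopyedFileGroup2Status` sorts `status.datasourceFiles` by datasource name and each file list by path, and `UpdateFileStatus` depends on exactly that ordering for its `sort.Search` lookups; it also rejects every transition except Processing → Succeeded/Failed. A minimal sketch of the intended call pattern — the datasource and path values here are placeholders, not part of this change:

```go
// Sketch: after a copy job for one file finishes, flip its phase in the status.
// "oss-datasource" and "dataset/a.txt" are illustrative values only.
if err := v1alpha1.UpdateFileStatus(ctx, instance, "oss-datasource", "dataset/a.txt",
	v1alpha1.FileProcessPhaseSucceeded, ""); err != nil {
	klog.Errorf("update file status: %s", err) // e.g. the file was not in the Processing phase
}
```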
diff --git a/charts/arcadia/Chart.yaml b/charts/arcadia/Chart.yaml
index 6a91a0eae..e856b3798 100644
--- a/charts/arcadia/Chart.yaml
+++ b/charts/arcadia/Chart.yaml
@@ -2,7 +2,7 @@ apiVersion: v2
 name: arcadia
 description: A Helm chart(KubeBB Component) for KubeAGI Arcadia
 type: application
-version: 0.1.22
+version: 0.1.23
 appVersion: "0.0.1"
 
 keywords:
diff --git a/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml b/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml
index 9f06a7ea6..32c7df54d 100644
--- a/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml
+++ b/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml
@@ -104,11 +104,18 @@ spec:
                   - paths
                   type: object
                 type: array
+              released:
+                default: 0
+                enum:
+                - 0
+                - 1
+                type: integer
               version:
                 description: Version
                 type: string
             required:
             - dataset
+            - released
             - version
             type: object
           status:
@@ -152,6 +159,42 @@ spec:
                   - type
                   type: object
                 type: array
+              datasourceFiles:
+                description: DatasourceFiles records the progress and results of
+                  file processing for each data source
+                items:
+                  properties:
+                    datasourceName:
+                      type: string
+                    datasourceNamespace:
+                      type: string
+                    status:
+                      items:
+                        properties:
+                          checksum:
+                            description: Checksum defines the checksum of the file
+                            type: string
+                          errMessage:
+                            description: ErrMessage defines the error message
+                            type: string
+                          lastUpdateTime:
+                            description: The last time this condition was updated.
+                            format: date-time
+                            type: string
+                          path:
+                            description: Path defines the detail path to get objects
+                              from above datasource
+                            type: string
+                          phase:
+                            description: Phase defines the process phase
+                            type: string
+                        type: object
+                      type: array
+                  required:
+                  - datasourceName
+                  - datasourceNamespace
+                  type: object
+                type: array
             type: object
         type: object
     served: true
diff --git a/config/crd/bases/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml b/config/crd/bases/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml
index 9f06a7ea6..32c7df54d 100644
--- a/config/crd/bases/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml
+++ b/config/crd/bases/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml
@@ -104,11 +104,18 @@ spec:
                   - paths
                   type: object
                 type: array
+              released:
+                default: 0
+                enum:
+                - 0
+                - 1
+                type: integer
               version:
                 description: Version
                 type: string
             required:
             - dataset
+            - released
             - version
             type: object
           status:
@@ -152,6 +159,42 @@ spec:
                   - type
                   type: object
                 type: array
+              datasourceFiles:
+                description: DatasourceFiles records the progress and results of
+                  file processing for each data source
+                items:
+                  properties:
+                    datasourceName:
+                      type: string
+                    datasourceNamespace:
+                      type: string
+                    status:
+                      items:
+                        properties:
+                          checksum:
+                            description: Checksum defines the checksum of the file
+                            type: string
+                          errMessage:
+                            description: ErrMessage defines the error message
+                            type: string
+                          lastUpdateTime:
+                            description: The last time this condition was updated.
+                            format: date-time
+                            type: string
+                          path:
+                            description: Path defines the detail path to get objects
+                              from above datasource
+                            type: string
+                          phase:
+                            description: Phase defines the process phase
+                            type: string
+                        type: object
+                      type: array
+                  required:
+                  - datasourceName
+                  - datasourceNamespace
+                  type: object
+                type: array
             type: object
         type: object
     served: true
diff --git a/controllers/dataset_controller.go b/controllers/dataset_controller.go
index e477eb459..9ab0fae9c 100644
--- a/controllers/dataset_controller.go
+++ b/controllers/dataset_controller.go
@@ -19,12 +19,15 @@ package controllers
 
 import (
 	"context"
 
+	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 
 	arcadiav1alpha1 "github.com/kubeagi/arcadia/api/v1alpha1"
+	"github.com/kubeagi/arcadia/pkg/utils"
 )
 
@@ -49,9 +52,48 @@ type DatasetReconciler struct {
 func (r *DatasetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	_ = log.FromContext(ctx)
 
-	// TODO(user): your logic here
+	var err error
+	instance := &arcadiav1alpha1.Dataset{}
+	if err = r.Client.Get(ctx, req.NamespacedName, instance); err != nil {
+		if errors.IsNotFound(err) {
+			return reconcile.Result{}, nil
+		}
+		return reconcile.Result{}, err
+	}
+	if instance.DeletionTimestamp != nil {
+		if err := r.Client.DeleteAllOf(ctx, &arcadiav1alpha1.VersionedDataset{}, client.InNamespace(instance.Namespace), client.MatchingLabels{
+			arcadiav1alpha1.LabelVersionedDatasetVersionOwner: instance.Name,
+		}); err != nil {
+			return reconcile.Result{}, err
+		}
+		instance.Finalizers = utils.RemoveString(instance.Finalizers, arcadiav1alpha1.Finalizer)
+		err = r.Client.Update(ctx, instance)
+		return reconcile.Result{}, err
+	}
 
-	return ctrl.Result{}, nil
+	if instance.Labels == nil {
+		instance.Labels = make(map[string]string)
+	}
+
+	update := false
+	if v, ok := instance.Labels[arcadiav1alpha1.LabelDatasetContentType]; !ok || v != instance.Spec.ContentType {
+		instance.Labels[arcadiav1alpha1.LabelDatasetContentType] = instance.Spec.ContentType
+		update = true
+	}
+	if v, ok := instance.Labels[arcadiav1alpha1.LabelDatasetBestCase]; !ok || v != instance.Spec.BestCase {
+		instance.Labels[arcadiav1alpha1.LabelDatasetBestCase] = instance.Spec.BestCase
+		update = true
+	}
+	if !utils.ContainString(instance.Finalizers, arcadiav1alpha1.Finalizer) {
+		instance.Finalizers = utils.AddString(instance.Finalizers, arcadiav1alpha1.Finalizer)
+		update = true
+	}
+	if update {
+		err = r.Client.Update(ctx, instance)
+		return reconcile.Result{Requeue: true}, err
+	}
+
+	return ctrl.Result{}, err
 }
 
 // SetupWithManager sets up the controller with the Manager.
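The label and finalizer bookkeeping above relies on `utils.ContainString`, `utils.AddString`, and `utils.RemoveString` from `pkg/utils`, which are not part of this diff. Presumably they are plain string-slice helpers along these lines (a sketch of the assumed shape, not the actual implementation):

```go
// Assumed pkg/utils helpers — shapes inferred from the call sites in this PR.
func ContainString(slice []string, s string) bool {
	for _, item := range slice {
		if item == s {
			return true
		}
	}
	return false
}

// AddString appends s only if it is not already present.
func AddString(slice []string, s string) []string {
	if ContainString(slice, s) {
		return slice
	}
	return append(slice, s)
}

// RemoveString returns a copy of slice with every occurrence of s removed.
func RemoveString(slice []string, s string) []string {
	result := make([]string, 0, len(slice))
	for _, item := range slice {
		if item != s {
			result = append(result, item)
		}
	}
	return result
}
```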
diff --git a/controllers/versioneddataset_controller.go b/controllers/versioneddataset_controller.go
index ffeebc2d8..f7653b952 100644
--- a/controllers/versioneddataset_controller.go
+++ b/controllers/versioneddataset_controller.go
@@ -18,19 +18,30 @@ package controllers
 
 import (
 	"context"
+	"fmt"
+	"sync"
 
+	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 
-	arcadiav1alpha1 "github.com/kubeagi/arcadia/api/v1alpha1"
+	"github.com/kubeagi/arcadia/api/v1alpha1"
+	"github.com/kubeagi/arcadia/pkg/scheduler"
+	"github.com/kubeagi/arcadia/pkg/utils"
 )
 
 // VersionedDatasetReconciler reconciles a VersionedDataset object
 type VersionedDatasetReconciler struct {
 	client.Client
 	Scheme *runtime.Scheme
+
+	cache sync.Map
 }
 
 //+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=versioneddatasets,verbs=get;list;watch;create;update;patch;delete
@@ -49,7 +60,40 @@ type VersionedDatasetReconciler struct {
 func (r *VersionedDatasetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	_ = log.FromContext(ctx)
 
-	// TODO(user): your logic here
+	var err error
+
+	instance := &v1alpha1.VersionedDataset{}
+	if err = r.Client.Get(ctx, req.NamespacedName, instance); err != nil {
+		if errors.IsNotFound(err) {
+			return reconcile.Result{}, nil
+		}
+		klog.Errorf("reconcile: failed to get versionDataset with req: %v", req.NamespacedName)
+		return reconcile.Result{}, err
+	}
+	updatedObj, err := r.preUpdate(ctx, instance)
+	if err != nil {
+		return reconcile.Result{}, err
+	}
+
+	if updatedObj {
+		return reconcile.Result{Requeue: true}, r.Client.Update(ctx, instance)
+	}
+
+	key := fmt.Sprintf("%s/%s", instance.Namespace, instance.Name)
+	v, ok := r.cache.Load(key)
+	if ok {
+		v.(*scheduler.Scheduler).Stop()
+	}
+	s, err := scheduler.NewScheduler(ctx, r.Client, instance)
+	if err != nil {
+		return reconcile.Result{}, err
+	}
+	r.cache.Store(key, s)
+
+	klog.V(4).Infof("[Debug] start to sync files for %s/%s", instance.Namespace, instance.Name)
+	go func() {
+		_ = s.Start()
+	}()
 
 	return ctrl.Result{}, nil
 }
 
@@ -57,6 +101,52 @@ func (r *VersionedDatasetReconciler) Reconcile(ctx context.Context, req ctrl.Req
 // SetupWithManager sets up the controller with the Manager.
 func (r *VersionedDatasetReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
-		For(&arcadiav1alpha1.VersionedDataset{}).
+		For(&v1alpha1.VersionedDataset{}).
 		Complete(r)
 }
+
+func (r *VersionedDatasetReconciler) preUpdate(ctx context.Context, instance *v1alpha1.VersionedDataset) (bool, error) {
+	var err error
+	update := false
+	if instance.Labels == nil {
+		instance.Labels = make(map[string]string)
+	}
+	if v, ok := instance.Labels[v1alpha1.LabelVersionedDatasetVersion]; !ok || v != instance.Spec.Version {
+		instance.Labels[v1alpha1.LabelVersionedDatasetVersion] = instance.Spec.Version
+		update = true
+	}
+	if v, ok := instance.Labels[v1alpha1.LabelVersionedDatasetVersionOwner]; !ok || v != instance.Spec.Dataset.Name {
+		instance.Labels[v1alpha1.LabelVersionedDatasetVersionOwner] = instance.Spec.Dataset.Name
+		update = true
+	}
+
+	if !utils.ContainString(instance.Finalizers, v1alpha1.Finalizer) {
+		update = true
+		instance.Finalizers = utils.AddString(instance.Finalizers, v1alpha1.Finalizer)
+	}
+
+	dataset := &v1alpha1.Dataset{}
+	if err = r.Client.Get(ctx, types.NamespacedName{
+		Namespace: instance.Spec.Dataset.GetNamespace(),
+		Name:      instance.Spec.Dataset.Name}, dataset); err != nil {
+		klog.Errorf("preUpdate: failed to get dataset %s/%s, error %s", instance.Spec.Dataset.GetNamespace(), instance.Spec.Dataset.Name, err)
+		return false, err
+	}
+
+	hasOwner := false
+	for _, ref := range instance.OwnerReferences {
+		if ref.UID == dataset.UID {
+			hasOwner = true
+		}
+	}
+	if !hasOwner {
+		if err = controllerutil.SetControllerReference(dataset, instance, r.Scheme); err != nil {
+			klog.Errorf("preUpdate: failed to set versionDataset %s/%s's ownerReference", instance.Namespace, instance.Name)
+			return false, err
+		}
+
+		update = true
+	}
+
+	return update, err
+}
diff --git a/go.mod b/go.mod
index 5c179fb59..3232f99a8 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.20
 
 require (
 	github.com/99designs/gqlgen v0.17.40
+	github.com/KawashiroNitori/butcher/v2 v2.0.1
 	github.com/amikos-tech/chroma-go v0.0.0-20230901221218-d0087270239e
 	github.com/coreos/go-oidc/v3 v3.7.0
 	github.com/go-logr/logr v1.2.0
diff --git a/go.sum b/go.sum
index 2e7c4a427..150815a89 100644
--- a/go.sum
+++ b/go.sum
@@ -60,6 +60,8 @@ github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUM
 github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
+github.com/KawashiroNitori/butcher/v2 v2.0.1 h1:yJJyf9WO5BUvJxnxWAOAXQcY9+VqwnYcLV9MAgdrbtg=
+github.com/KawashiroNitori/butcher/v2 v2.0.1/go.mod h1:weH8qSjiTj6yGC956511noOaW4W6W9IW08Qhg78aVas=
 github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI=
 github.com/Masterminds/semver/v3 v3.2.0 h1:3MEsd0SM6jqZojhjLWWeBY+Kcjy9i6MQAeY7YgDP83g=
 github.com/Masterminds/sprig/v3 v3.2.3 h1:eL2fZNezLomi0uOLqjQoN6BfsDD+fyLtgbJMAj9n6YA=
diff --git a/main.go b/main.go
index 8a83353f7..68da8fff3 100644
--- a/main.go
+++ b/main.go
@@ -27,6 +27,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/healthz"
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"
@@ -68,6 +69,7 @@ func main() {
 	opts := zap.Options{
 		Development: true,
 	}
+	klog.InitFlags(nil)
 	opts.BindFlags(flag.CommandLine)
 	flag.Parse()
 
diff --git a/pkg/scheduler/executor.go b/pkg/scheduler/executor.go
new file mode 100644
index 000000000..dee8d829e
--- /dev/null
+++ b/pkg/scheduler/executor.go
@@ -0,0 +1,179 @@
+/*
+Copyright 2023 KubeAGI.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package scheduler
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"github.com/KawashiroNitori/butcher/v2"
+	"github.com/minio/minio-go/v7"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/klog/v2"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/kubeagi/arcadia/api/v1alpha1"
+	"github.com/kubeagi/arcadia/pkg/datasource"
+)
+
+type executor struct {
+	instance *v1alpha1.VersionedDataset
+	client   client.Client
+
+	deleteFileGroups []v1alpha1.DatasourceFileStatus
+}
+
+const (
+	maxWorkers = 5
+	bufSize    = 5
+)
+
+func newExecutor(ctx context.Context, c client.Client, instance *v1alpha1.VersionedDataset) butcher.Executor[JobPayload] {
+	_, deleteFileGroups := v1alpha1.CopyedFileGroup2Status(instance)
+	klog.V(4).Infof("[Debug] status is: %+v\ndelete filegroups: %+v\n", instance.Status.DatasourceFiles, deleteFileGroups)
+	klog.V(4).Infof("[Debug] client is nil: %v\n", c == nil)
+	return &executor{instance: instance, deleteFileGroups: deleteFileGroups, client: c}
+}
+
+func (e *executor) generateJob(ctx context.Context, jobCh chan<- JobPayload, datasourceFiles []v1alpha1.DatasourceFileStatus, removeAction bool) error {
+	for _, fs := range datasourceFiles {
+		select {
+		case <-ctx.Done():
+			return nil
+		default:
+		}
+
+		ds := &v1alpha1.Datasource{}
+		datasourceNamespace := fs.DatasourceNamespace
+		datasetNamespace := e.instance.Spec.Dataset.GetNamespace()
+		srcBucket := "bucket-" + datasourceNamespace
+		dstBucket := "bucket-" + datasetNamespace
+		klog.V(4).Infof("[Debug] datasourceNamespace: %s, datasetNamespace: %s, srcBucket: %s, dstBucket: %s",
+			datasourceNamespace, datasetNamespace, srcBucket, dstBucket)
+
+		if err := e.client.Get(ctx, types.NamespacedName{Namespace: fs.DatasourceNamespace, Name: fs.DatasourceName}, ds); err != nil {
+			klog.Errorf("generateJob: failed to get datasource %s", err)
+			return err
+		}
+
+		klog.V(5).Infof("[Debug] get datasource %+v\n", *ds)
+
+		oss, err := datasource.NewOSS(ctx, e.client, ds.Spec.Enpoint)
+
+		if err != nil {
+			klog.Errorf("generateJob: get oss client error %s", err)
+			return err
+		}
+		klog.V(4).Infof("[Debug] oss client is nil: %v", oss.Client == nil)
+
+		bucketExists, err := oss.Client.BucketExists(ctx, dstBucket)
+		if err != nil {
+			klog.Errorf("generateJob: failed to check whether bucket %s exists: %s", dstBucket, err)
+			return err
+		}
+		if !bucketExists {
+			if err = oss.Client.MakeBucket(ctx, dstBucket, minio.MakeBucketOptions{}); err != nil {
+				klog.Errorf("generateJob: failed to create bucket %s.", dstBucket)
+				return err
+			}
+		}
+
+		dst := fmt.Sprintf("%s/dataset/%s/%s", datasetNamespace, e.instance.Spec.Dataset.Name, e.instance.Spec.Version)
+
+		for _, fp := range fs.Status {
+			select {
+			case <-ctx.Done():
+				return nil
+			default:
+			}
+
+			if !removeAction && fp.Phase != v1alpha1.FileProcessPhaseProcessing {
+				klog.V(4).Infof("[Debug] copy object: %v, curPhase: %s, skip", removeAction, fp.Phase)
+				continue
+			}
+
+			if strings.HasSuffix(fp.Path, "/") {
+				klog.Warningf("skip %s/%s because it ends with /, which is not a legal object name in oss.", fs.DatasourceName, fp.Path)
+				continue
+			}
+
+			dstPath := fp.Path
+			if !strings.HasPrefix(fp.Path, "/") {
+				dstPath = "/" + fp.Path
+			}
+
+			payload := JobPayload{
+				Src:            fp.Path,
+				Dst:            dst + dstPath,
+				DatasourceName: fs.DatasourceName,
+				SrcBucket:      srcBucket,
+				DstBucket:      dstBucket,
+				Client:         oss.Client,
+				Remove:         removeAction,
+			}
+
+			klog.V(4).Infof("[Debug] send %+v to jobch", payload)
+			jobCh <- payload
+		}
+	}
+	return nil
+}
+func (e *executor) GenerateJob(ctx context.Context, jobCh chan<- JobPayload) error {
+	if err := e.generateJob(ctx, jobCh, e.instance.Status.DatasourceFiles, false); err != nil {
+		klog.Errorf("GenerateJob: error %s", err)
+		return err
+	}
+	return e.generateJob(ctx, jobCh, e.deleteFileGroups, true)
+}
+
+func (e *executor) Task(ctx context.Context, job JobPayload) error {
+	if !job.Remove {
+		klog.V(4).Infof("[Debug] copyObject task from %s/%s to %s/%s", job.SrcBucket, job.Src, job.DstBucket, job.Dst)
+		_, err := job.Client.CopyObject(ctx, minio.CopyDestOptions{
+			Bucket: job.DstBucket,
+			Object: job.Dst,
+		}, minio.CopySrcOptions{
+			Bucket: job.SrcBucket,
+			Object: job.Src,
+		})
+		klog.V(4).Infof("[Debug] copy object from %s to %s, result: %s", job.Src, job.Dst, err)
+		return err
+	}
+
+	err := job.Client.RemoveObject(ctx, job.DstBucket, job.Dst, minio.RemoveObjectOptions{})
+	klog.V(4).Infof("[Debug] removeObject %s/%s result %s", job.DstBucket, job.Dst, err)
+
+	return err
+}
+
+func (e *executor) OnFinish(ctx context.Context, job JobPayload, err error) {
+	if job.Remove {
+		klog.V(4).Infof("[Debug] OnFinish, removeObject done, no need to update the file status, result %s", err)
+		return
+	}
+
+	syncStatus := v1alpha1.FileProcessPhaseSucceeded
+	errMsg := ""
+	if err != nil {
+		syncStatus = v1alpha1.FileProcessPhaseFailed
+		errMsg = err.Error()
+	}
+	klog.V(4).Infof("[Debug] change the status of file %s/%s to %s", job.DatasourceName, job.Src, syncStatus)
+	if err = v1alpha1.UpdateFileStatus(ctx, e.instance, job.DatasourceName, job.Src, syncStatus, errMsg); err != nil {
+		klog.Errorf("the job with payload %v completed, but updating the cr status failed %s.", job, err)
+	}
+}
diff --git a/pkg/scheduler/job.go b/pkg/scheduler/job.go
new file mode 100644
index 000000000..9511c6b83
--- /dev/null
+++ b/pkg/scheduler/job.go
@@ -0,0 +1,27 @@
+/*
+Copyright 2023 KubeAGI.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package scheduler
+
+import "github.com/minio/minio-go/v7"
+
+type JobPayload struct {
+	Src, Dst             string
+	DatasourceName       string
+	SrcBucket, DstBucket string
+
+	Client *minio.Client
+	Remove bool
+}
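`JobPayload` deliberately carries the `*minio.Client`, so `Task` never has to touch the Kubernetes API from a worker goroutine; a copy job and its cleanup counterpart differ only in the `Remove` flag. An illustrative pair of payloads — the bucket names follow the executor's `"bucket-" + namespace` convention, and `ossClient` stands in for the client returned by `datasource.NewOSS` (all values here are placeholders):

```go
// Sketch with placeholder values; ossClient is the *minio.Client obtained
// via datasource.NewOSS in generateJob.
copyJob := JobPayload{
	Src:            "dataset/a.txt",
	Dst:            "team-a/dataset/ds-demo/v1/dataset/a.txt",
	DatasourceName: "oss-datasource",
	SrcBucket:      "bucket-team-a",
	DstBucket:      "bucket-team-a",
	Client:         ossClient,
	Remove:         false, // Task runs CopyObject; OnFinish then updates the file phase
}

removeJob := copyJob
removeJob.Remove = true // Task runs RemoveObject on DstBucket/Dst; OnFinish skips the status update
```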
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
new file mode 100644
index 000000000..b0562c51f
--- /dev/null
+++ b/pkg/scheduler/scheduler.go
@@ -0,0 +1,83 @@
+/*
+Copyright 2023 KubeAGI.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package scheduler
+
+import (
+	"context"
+
+	"github.com/KawashiroNitori/butcher/v2"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/klog/v2"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/kubeagi/arcadia/api/v1alpha1"
+)
+
+type Scheduler struct {
+	ctx    context.Context
+	cancel context.CancelFunc
+	runner butcher.Butcher
+	client client.Client
+
+	ds *v1alpha1.VersionedDataset
+}
+
+func NewScheduler(ctx context.Context, c client.Client, instance *v1alpha1.VersionedDataset) (*Scheduler, error) {
+	if ctx == nil {
+		ctx = context.Background()
+	}
+	ctx1, cancel := context.WithCancel(ctx)
+	s := &Scheduler{ctx: ctx1, cancel: cancel, ds: instance, client: c}
+	executor, err := butcher.NewButcher[JobPayload](newExecutor(ctx1, c, instance), butcher.BufferSize(bufSize), butcher.MaxWorker(maxWorkers))
+	if err != nil {
+		return nil, err
+	}
+	s.runner = executor
+	return s, nil
+}
+
+func (s *Scheduler) Start() error {
+	if err := s.runner.Run(s.ctx); err != nil {
+		klog.Errorf("versionDataset %s/%s run failed err %s.", s.ds.Namespace, s.ds.Name, err)
+		return err
+	}
+
+	// Only when there are no errors do we fetch the latest CR to check whether the resource has changed.
+	ds := &v1alpha1.VersionedDataset{}
+	if err := s.client.Get(s.ctx, types.NamespacedName{Namespace: s.ds.Namespace, Name: s.ds.Name}, ds); err != nil {
+		klog.Errorf("versionDataset %s/%s get failed. err %s", s.ds.Namespace, s.ds.Name, err)
+		return err
+	}
+
+	if ds.DeletionTimestamp != nil {
+		ds.Finalizers = nil
+		klog.Infof("versionDataset %s/%s is being deleted, so we need to update its finalizers to allow the deletion to complete smoothly", ds.Namespace, ds.Name)
+		return s.client.Update(s.ctx, ds)
+	}
+
+	if ds.ResourceVersion == s.ds.ResourceVersion {
+		deepCopy := ds.DeepCopy()
+		deepCopy.Status.DatasourceFiles = s.ds.Status.DatasourceFiles
+		return s.client.Status().Patch(s.ctx, deepCopy, client.MergeFrom(ds))
+	}
+
+	klog.Infof("current resourceversion: %s, previous resourceversion: %s", ds.ResourceVersion, s.ds.ResourceVersion)
+	return nil
+}
+
+func (s *Scheduler) Stop() {
+	s.cancel()
+}
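For readers unfamiliar with `KawashiroNitori/butcher`: the runner drains `GenerateJob` into a buffered channel, runs `Task` on up to `MaxWorker` goroutines, and calls `OnFinish` once per job. A toy executor showing the same contract the scheduler relies on — the interface shape is inferred from the usage above, not from butcher's documentation:

```go
package main

import (
	"context"
	"fmt"

	"github.com/KawashiroNitori/butcher/v2"
)

// demoExecutor mirrors the three hooks the scheduler's executor implements,
// with int payloads instead of JobPayload. Shape assumed from this PR's code.
type demoExecutor struct{}

func (demoExecutor) GenerateJob(ctx context.Context, jobCh chan<- int) error {
	for i := 0; i < 10; i++ {
		select {
		case <-ctx.Done():
			return nil // a cancelled context (Scheduler.Stop) ends job generation early
		case jobCh <- i:
		}
	}
	return nil
}

func (demoExecutor) Task(ctx context.Context, job int) error {
	fmt.Println("processing job", job)
	return nil
}

func (demoExecutor) OnFinish(ctx context.Context, job int, err error) {
	fmt.Println("finished job", job, "err:", err)
}

func main() {
	b, err := butcher.NewButcher[int](demoExecutor{}, butcher.BufferSize(5), butcher.MaxWorker(5))
	if err != nil {
		panic(err)
	}
	// Run blocks until GenerateJob returns and all queued jobs are processed,
	// which is why the controller calls Scheduler.Start in its own goroutine.
	_ = b.Run(context.Background())
}
```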