From d12a42ef565911f59ffcead7881f9cb2e9ecd201 Mon Sep 17 00:00:00 2001 From: 0xff-dev Date: Tue, 19 Dec 2023 15:41:40 +0800 Subject: [PATCH] fix: remove minio uploadid cache --- .../examples/upload-download-file/main.go | 23 +++++-- apiserver/pkg/common/file_chunk.go | 7 --- apiserver/service/minio_server.go | 63 +++++-------------- pkg/datasource/datasource.go | 5 ++ pkg/datasource/oss.go | 22 +++++++ 5 files changed, 62 insertions(+), 58 deletions(-) diff --git a/apiserver/examples/upload-download-file/main.go b/apiserver/examples/upload-download-file/main.go index e2609dd92..b59c02dcf 100644 --- a/apiserver/examples/upload-download-file/main.go +++ b/apiserver/examples/upload-download-file/main.go @@ -73,8 +73,8 @@ type ( MD5 string `json:"md5"` // The file is eventually stored in bucketPath/filtName in the bucket. - FileName string `json:"fileName"` Bucket string `json:"bucket"` + FileName string `json:"fileName"` BucketPath string `json:"bucketPath"` } @@ -84,6 +84,7 @@ type ( MD5 string `json:"md5"` UploadID string `json:"uploadID"` Bucket string `json:"bucket"` + FileName string `json:"fileName"` BucketPath string `json:"bucketPath"` } GenChunkURLResult struct { @@ -91,12 +92,23 @@ type ( URL string `json:"url"` } + DelteFileBody struct { + Files []string `json:"files"` + Bucket string `json:"bucket"` + BucketPath string `json:"bucketPath"` + } + CompleteBody struct { MD5 string `json:"md5"` - BucketPath string `json:"bucketPath"` + UploadID string `json:"uploadID"` Bucket string `json:"bucket"` FileName string `json:"fileName"` - UploadID string `json:"uploadID"` + BucketPath string `json:"bucketPath"` + } + + ReadCSVResp struct { + Rows [][]string `json:"rows"` + Total int64 `json:"total"` } ) @@ -188,7 +200,7 @@ func newMultipart( func genURL( md5, bucket, bucketPath, uploadID string, - partNumer int, + partNumer int, fileName string, transport http.RoundTripper) (GenChunkURLResult, error) { klog.Infof("[DEBUG] request upload url by uploadid: %s...", uploadID) @@ -199,6 +211,7 @@ func genURL( BucketPath: bucketPath, UploadID: uploadID, Size: bufSize, + FileName: fileName, } bodyBytes, _ := json.Marshal(body) @@ -295,7 +308,7 @@ func do( md5, uploadID, bucket, bucketPath, fileName string, transport http.RoundTripper) error { defer wg.Done() - urlResult, err := genURL(md5, bucket, bucketPath, uploadID, partNumber, transport) + urlResult, err := genURL(md5, bucket, bucketPath, uploadID, partNumber, fileName, transport) if err != nil { klog.Error("[do Error] failed to gen url error %s", err) return err diff --git a/apiserver/pkg/common/file_chunk.go b/apiserver/pkg/common/file_chunk.go index 0391cebdd..a8dc00e7e 100644 --- a/apiserver/pkg/common/file_chunk.go +++ b/apiserver/pkg/common/file_chunk.go @@ -40,10 +40,3 @@ const ( // putObject behaves internally as multipart. MinPartSize = 1024 * 1024 * 64 ) - -type FileChunk struct { - Md5 string - UploadID string - FileName string - Bucket, BucketPath string -} diff --git a/apiserver/service/minio_server.go b/apiserver/service/minio_server.go index 6ec4a59df..a58fb27e1 100644 --- a/apiserver/service/minio_server.go +++ b/apiserver/service/minio_server.go @@ -34,7 +34,6 @@ import ( "github.com/kubeagi/arcadia/apiserver/pkg/client" "github.com/kubeagi/arcadia/apiserver/pkg/common" "github.com/kubeagi/arcadia/apiserver/pkg/oidc" - "github.com/kubeagi/arcadia/pkg/cache" "github.com/kubeagi/arcadia/pkg/datasource" ) @@ -42,7 +41,6 @@ type ( minioAPI struct { conf gqlconfig.ServerConfig client dynamic.Interface - store cache.Cache } Chunk struct { @@ -65,8 +63,8 @@ type ( MD5 string `json:"md5"` // The file is eventually stored in bucketPath/filtName in the bucket. - FileName string `json:"fileName"` Bucket string `json:"bucket"` + FileName string `json:"fileName"` BucketPath string `json:"bucketPath"` } @@ -76,6 +74,7 @@ type ( MD5 string `json:"md5"` UploadID string `json:"uploadID"` Bucket string `json:"bucket"` + FileName string `json:"fileName"` BucketPath string `json:"bucketPath"` } GenChunkURLResult struct { @@ -91,10 +90,10 @@ type ( CompleteBody struct { MD5 string `json:"md5"` - BucketPath string `json:"bucketPath"` + UploadID string `json:"uploadID"` Bucket string `json:"bucket"` FileName string `json:"fileName"` - UploadID string `json:"uploadID"` + BucketPath string `json:"bucketPath"` } ReadCSVResp struct { @@ -180,30 +179,19 @@ func (m *minioAPI) GetSuccessChunks(ctx *gin.Context) { } // If the file does not exist, you can check if there are any relevant upload records locally. - key := [3]string{bucketName, bucketPath, fildMD5} - fc, ok := m.store.Get(key) - if !ok { - // If the file exists in MinIO but there is no corresponding record locally (for example, if the user directly uploaded the file), - // then the user will need to re-upload the file. - klog.Infof("not found file chunkc %s/%s/%s, ", bucketName, bucketPath, fildMD5) - ctx.JSON(http.StatusOK, r) - return - } - - fileChunk := fc.(*common.FileChunk) - - // If uploadid is empty, you need to re-upload. - if fileChunk.UploadID == "" { + uploadID, _ := source.IncompleteUpload(ctx.Request.Context(), datasource.WithBucket(bucketName), + datasource.WithBucketPath(bucketPath), datasource.WithFileName(fileName)) + if uploadID == "" { ctx.JSON(http.StatusOK, r) return } // Checking already uploaded chunks - r.UploadID = fileChunk.UploadID + r.UploadID = uploadID r.Chunks = make([]Chunk, 0) result, err := source.CompletedChunks(ctx.Request.Context(), datasource.WithBucket(bucketName), - datasource.WithBucketPath(bucketPath), datasource.WithFileName(fileChunk.FileName), - datasource.WithUploadID(fileChunk.UploadID)) + datasource.WithBucketPath(bucketPath), datasource.WithFileName(fileName), + datasource.WithUploadID(uploadID)) if err != nil { klog.Errorf("ListObjectParts failed: %s", err) ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ @@ -284,19 +272,6 @@ func (m *minioAPI) NewMultipart(ctx *gin.Context) { }) return } - if err := m.store.Set([3]string{body.Bucket, body.BucketPath, body.MD5}, &common.FileChunk{ - UploadID: uploadID, - Md5: body.MD5, - FileName: body.FileName, - Bucket: body.Bucket, - BucketPath: body.BucketPath, - }); err != nil { - klog.Errorf("failed to insert new file chunk error %s", err) - ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ - "message": "failed to store new file chunk", - }) - return - } ctx.JSON(http.StatusOK, gin.H{ "uploadID": uploadID, @@ -343,16 +318,13 @@ func (m *minioAPI) GetMultipartUploadURL(ctx *gin.Context) { }) return } - - fc, ok := m.store.Get([3]string{body.Bucket, body.BucketPath, body.MD5}) - if !ok { - klog.Errorf("failed to get file chunk by md5 %s", body.MD5) - ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ - "message": "failed to get file chunk by md5", + if body.FileName == "" { + klog.Errorf("fileName is empty") + ctx.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ + "mesage": "fileName is required", }) return } - fileChunk := fc.(*common.FileChunk) source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client) if err != nil { @@ -363,7 +335,7 @@ func (m *minioAPI) GetMultipartUploadURL(ctx *gin.Context) { return } result, err := source.CompletedChunks(ctx.Request.Context(), datasource.WithBucket(body.Bucket), - datasource.WithBucketPath(body.BucketPath), datasource.WithFileName(fileChunk.FileName), + datasource.WithBucketPath(body.BucketPath), datasource.WithFileName(body.FileName), datasource.WithUploadID(body.UploadID)) if err != nil { klog.Errorf("ListObjectParts failed: %s", err) @@ -387,7 +359,7 @@ func (m *minioAPI) GetMultipartUploadURL(ctx *gin.Context) { datasource.WithBucketPath(body.BucketPath), datasource.WithUploadID(body.UploadID), datasource.WithPartNumber(fmt.Sprintf("%d", body.PartNumber)), - datasource.WithFileName(fileChunk.FileName)) + datasource.WithFileName(body.FileName)) if err != nil { klog.Errorf("genMultiPartSignedURL failed: %s", err) klog.Errorf("failed to get multipart signed url error %s, md5 %s", err, body.MD5) @@ -433,7 +405,6 @@ func (m *minioAPI) CompleteMultipart(ctx *gin.Context) { return } - _ = m.store.Delete([3]string{body.Bucket, body.BucketPath, body.MD5}) ctx.JSON(http.StatusOK, "success") } @@ -645,7 +616,7 @@ func RegisterMinIOAPI(group *gin.RouterGroup, conf gqlconfig.ServerConfig) { panic(err) } - api := minioAPI{conf: conf, store: cache.NewMemCache(), client: c} + api := minioAPI{conf: conf, client: c} { // model apis diff --git a/pkg/datasource/datasource.go b/pkg/datasource/datasource.go index 8533e3b2e..a00e4101f 100644 --- a/pkg/datasource/datasource.go +++ b/pkg/datasource/datasource.go @@ -115,6 +115,11 @@ type ChunkUploader interface { // To stop the upload, the user needs to destroy the chunked data. Abort(context.Context, ...ChunkUploaderOption) error + + // IncompleteUpload returns the number of times an object does not have a complete upload. Ideally, + // it is guaranteed that there is only one uploadid per object. + // If there is more than one uploadid, return the latest one. + IncompleteUpload(context.Context, ...ChunkUploaderOption) (string, error) } type Datasource interface { diff --git a/pkg/datasource/oss.go b/pkg/datasource/oss.go index b6224f6f1..6648dada6 100644 --- a/pkg/datasource/oss.go +++ b/pkg/datasource/oss.go @@ -320,3 +320,25 @@ func (oss *OSS) preCheck(info any) (*v1alpha1.OSS, error) { } return ossInfo, nil } + +func (oss *OSS) IncompleteUpload(ctx context.Context, options ...ChunkUploaderOption) (string, error) { + s := ChunkUploaderConf{} + for _, opt := range options { + opt(&s) + } + + objectName := fmt.Sprintf("%s/%s", s.relativeDir, s.fileName) + var ( + uploadID string + cur time.Time + ) + first := true + for id := range oss.Client.ListIncompleteUploads(ctx, s.bucket, objectName, true) { + if first || id.Initiated.After(cur) { + uploadID = id.UploadID + cur = id.Initiated + first = false + } + } + return uploadID, nil +}