fix: remove minio uploadid cache

0xff-dev committed Dec 19, 2023
1 parent 874e2b1 commit d12a42e

Showing 5 changed files with 62 additions and 58 deletions.
23 changes: 18 additions & 5 deletions apiserver/examples/upload-download-file/main.go
@@ -73,8 +73,8 @@ type (
MD5 string `json:"md5"`

// The file is eventually stored in bucketPath/fileName in the bucket.
FileName string `json:"fileName"`
Bucket string `json:"bucket"`
FileName string `json:"fileName"`
BucketPath string `json:"bucketPath"`
}

@@ -84,19 +84,31 @@ type (
MD5 string `json:"md5"`
UploadID string `json:"uploadID"`
Bucket string `json:"bucket"`
FileName string `json:"fileName"`
BucketPath string `json:"bucketPath"`
}
GenChunkURLResult struct {
Completed bool `json:"completed"`
URL string `json:"url"`
}

DelteFileBody struct {
Files []string `json:"files"`
Bucket string `json:"bucket"`
BucketPath string `json:"bucketPath"`
}

CompleteBody struct {
MD5 string `json:"md5"`
BucketPath string `json:"bucketPath"`
UploadID string `json:"uploadID"`
Bucket string `json:"bucket"`
FileName string `json:"fileName"`
UploadID string `json:"uploadID"`
BucketPath string `json:"bucketPath"`
}

ReadCSVResp struct {
Rows [][]string `json:"rows"`
Total int64 `json:"total"`
}
)

@@ -188,7 +200,7 @@ func newMultipart(

func genURL(
md5, bucket, bucketPath, uploadID string,
partNumer int,
partNumer int, fileName string,
transport http.RoundTripper) (GenChunkURLResult, error) {

klog.Infof("[DEBUG] request upload url by uploadid: %s...", uploadID)
@@ -199,6 +211,7 @@ func genURL(
BucketPath: bucketPath,
UploadID: uploadID,
Size: bufSize,
FileName: fileName,
}

bodyBytes, _ := json.Marshal(body)
@@ -295,7 +308,7 @@ func do(
md5, uploadID, bucket, bucketPath, fileName string,
transport http.RoundTripper) error {
defer wg.Done()
urlResult, err := genURL(md5, bucket, bucketPath, uploadID, partNumber, transport)
urlResult, err := genURL(md5, bucket, bucketPath, uploadID, partNumber, fileName, transport)
if err != nil {
klog.Error("[do Error] failed to gen url error %s", err)
return err
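With the upload-ID cache gone, the server can no longer recover the file name from an upload ID, so the example client now passes fileName on every chunk-URL request. A minimal calling sketch of the updated genURL (not part of the commit; it assumes the imports already present in main.go, and all argument values are illustrative):

func uploadOnePart(md5Sum, bucket, bucketPath, uploadID, fileName string, partNumber int) error {
	// fileName is the new, required argument added in this commit.
	urlResult, err := genURL(md5Sum, bucket, bucketPath, uploadID, partNumber, fileName, http.DefaultTransport)
	if err != nil {
		klog.Errorf("failed to generate chunk URL: %s", err)
		return err
	}
	if urlResult.Completed {
		// This part was already uploaded in an earlier run, so it can be skipped.
		return nil
	}
	// Otherwise, PUT the chunk bytes to urlResult.URL.
	return nil
}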
7 changes: 0 additions & 7 deletions apiserver/pkg/common/file_chunk.go
@@ -40,10 +40,3 @@ const (
// putObject behaves internally as multipart.
MinPartSize = 1024 * 1024 * 64
)

type FileChunk struct {
Md5 string
UploadID string
FileName string
Bucket, BucketPath string
}
63 changes: 17 additions & 46 deletions apiserver/service/minio_server.go
@@ -34,15 +34,13 @@ import (
"github.com/kubeagi/arcadia/apiserver/pkg/client"
"github.com/kubeagi/arcadia/apiserver/pkg/common"
"github.com/kubeagi/arcadia/apiserver/pkg/oidc"
"github.com/kubeagi/arcadia/pkg/cache"
"github.com/kubeagi/arcadia/pkg/datasource"
)

type (
minioAPI struct {
conf gqlconfig.ServerConfig
client dynamic.Interface
store cache.Cache
}

Chunk struct {
@@ -65,8 +63,8 @@ type (
MD5 string `json:"md5"`

// The file is eventually stored in bucketPath/fileName in the bucket.
FileName string `json:"fileName"`
Bucket string `json:"bucket"`
FileName string `json:"fileName"`
BucketPath string `json:"bucketPath"`
}

@@ -76,6 +74,7 @@ type (
MD5 string `json:"md5"`
UploadID string `json:"uploadID"`
Bucket string `json:"bucket"`
FileName string `json:"fileName"`
BucketPath string `json:"bucketPath"`
}
GenChunkURLResult struct {
@@ -91,10 +90,10 @@ type (

CompleteBody struct {
MD5 string `json:"md5"`
BucketPath string `json:"bucketPath"`
UploadID string `json:"uploadID"`
Bucket string `json:"bucket"`
FileName string `json:"fileName"`
UploadID string `json:"uploadID"`
BucketPath string `json:"bucketPath"`
}

ReadCSVResp struct {
@@ -180,30 +179,19 @@ func (m *minioAPI) GetSuccessChunks(ctx *gin.Context) {
}

// If the file does not exist yet, check whether MinIO has an incomplete multipart upload for it.
key := [3]string{bucketName, bucketPath, fildMD5}
fc, ok := m.store.Get(key)
if !ok {
// If the file exists in MinIO but there is no corresponding record locally (for example, if the user directly uploaded the file),
// then the user will need to re-upload the file.
klog.Infof("not found file chunkc %s/%s/%s, ", bucketName, bucketPath, fildMD5)
ctx.JSON(http.StatusOK, r)
return
}

fileChunk := fc.(*common.FileChunk)

// If uploadid is empty, you need to re-upload.
if fileChunk.UploadID == "" {
uploadID, _ := source.IncompleteUpload(ctx.Request.Context(), datasource.WithBucket(bucketName),
datasource.WithBucketPath(bucketPath), datasource.WithFileName(fileName))
if uploadID == "" {
ctx.JSON(http.StatusOK, r)
return
}

// Checking already uploaded chunks
r.UploadID = fileChunk.UploadID
r.UploadID = uploadID
r.Chunks = make([]Chunk, 0)
result, err := source.CompletedChunks(ctx.Request.Context(), datasource.WithBucket(bucketName),
datasource.WithBucketPath(bucketPath), datasource.WithFileName(fileChunk.FileName),
datasource.WithUploadID(fileChunk.UploadID))
datasource.WithBucketPath(bucketPath), datasource.WithFileName(fileName),
datasource.WithUploadID(uploadID))
if err != nil {
klog.Errorf("ListObjectParts failed: %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
@@ -284,19 +272,6 @@ func (m *minioAPI) NewMultipart(ctx *gin.Context) {
})
return
}
if err := m.store.Set([3]string{body.Bucket, body.BucketPath, body.MD5}, &common.FileChunk{
UploadID: uploadID,
Md5: body.MD5,
FileName: body.FileName,
Bucket: body.Bucket,
BucketPath: body.BucketPath,
}); err != nil {
klog.Errorf("failed to insert new file chunk error %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": "failed to store new file chunk",
})
return
}

ctx.JSON(http.StatusOK, gin.H{
"uploadID": uploadID,
@@ -343,16 +318,13 @@ func (m *minioAPI) GetMultipartUploadURL(ctx *gin.Context) {
})
return
}

fc, ok := m.store.Get([3]string{body.Bucket, body.BucketPath, body.MD5})
if !ok {
klog.Errorf("failed to get file chunk by md5 %s", body.MD5)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": "failed to get file chunk by md5",
if body.FileName == "" {
klog.Errorf("fileName is empty")
ctx.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
"mesage": "fileName is required",
})
return
}
fileChunk := fc.(*common.FileChunk)

source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client)
if err != nil {
@@ -363,7 +335,7 @@ func (m *minioAPI) GetMultipartUploadURL(ctx *gin.Context) {
return
}
result, err := source.CompletedChunks(ctx.Request.Context(), datasource.WithBucket(body.Bucket),
datasource.WithBucketPath(body.BucketPath), datasource.WithFileName(fileChunk.FileName),
datasource.WithBucketPath(body.BucketPath), datasource.WithFileName(body.FileName),
datasource.WithUploadID(body.UploadID))
if err != nil {
klog.Errorf("ListObjectParts failed: %s", err)
@@ -387,7 +359,7 @@ func (m *minioAPI) GetMultipartUploadURL(ctx *gin.Context) {
datasource.WithBucketPath(body.BucketPath),
datasource.WithUploadID(body.UploadID),
datasource.WithPartNumber(fmt.Sprintf("%d", body.PartNumber)),
datasource.WithFileName(fileChunk.FileName))
datasource.WithFileName(body.FileName))
if err != nil {
klog.Errorf("genMultiPartSignedURL failed: %s", err)
klog.Errorf("failed to get multipart signed url error %s, md5 %s", err, body.MD5)
@@ -433,7 +405,6 @@ func (m *minioAPI) CompleteMultipart(ctx *gin.Context) {
return
}

_ = m.store.Delete([3]string{body.Bucket, body.BucketPath, body.MD5})
ctx.JSON(http.StatusOK, "success")
}

@@ -645,7 +616,7 @@ func RegisterMinIOAPI(group *gin.RouterGroup, conf gqlconfig.ServerConfig) {
panic(err)
}

api := minioAPI{conf: conf, store: cache.NewMemCache(), client: c}
api := minioAPI{conf: conf, client: c}

{
// model apis
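Taken together, the resume path in GetSuccessChunks no longer reads an in-memory FileChunk record: it asks the datasource whether MinIO holds an incomplete multipart upload for the object and, only if one exists, lists its completed parts. A condensed sketch of the new flow (variable names follow the handler above; error handling trimmed):

// Condensed sketch of GetSuccessChunks after this commit.
uploadID, _ := source.IncompleteUpload(ctx.Request.Context(),
	datasource.WithBucket(bucketName),
	datasource.WithBucketPath(bucketPath),
	datasource.WithFileName(fileName))
if uploadID == "" {
	// Nothing to resume: the client must start a new multipart upload.
	ctx.JSON(http.StatusOK, r)
	return
}
r.UploadID = uploadID
// Report the chunks already uploaded under this uploadID so the client can skip them.
result, err := source.CompletedChunks(ctx.Request.Context(),
	datasource.WithBucket(bucketName),
	datasource.WithBucketPath(bucketPath),
	datasource.WithFileName(fileName),
	datasource.WithUploadID(uploadID))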
5 changes: 5 additions & 0 deletions pkg/datasource/datasource.go
@@ -115,6 +115,11 @@ type ChunkUploader interface {

// To stop the upload, the user needs to destroy the chunked data.
Abort(context.Context, ...ChunkUploaderOption) error

// IncompleteUpload returns the uploadID of an object's incomplete multipart upload, if any. Ideally,
// it is guaranteed that there is only one uploadID per object.
// If there is more than one uploadID, the latest one is returned.
IncompleteUpload(context.Context, ...ChunkUploaderOption) (string, error)
}

type Datasource interface {
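From a caller's point of view, this new interface method is what replaces the cache lookup: any ChunkUploader can be asked directly whether MinIO still holds an unfinished upload for a given object. A small usage sketch (not part of the commit; uploader is any ChunkUploader implementation, such as the OSS datasource below, and the bucket, path, and file name values are illustrative):

// Sketch: probing for a resumable upload through the new interface method.
uploadID, err := uploader.IncompleteUpload(ctx,
	datasource.WithBucket("my-bucket"),
	datasource.WithBucketPath("dataset/v1"),
	datasource.WithFileName("data.csv"))
if err != nil {
	return err
}
if uploadID == "" {
	// No unfinished upload: create a new multipart upload instead.
}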
22 changes: 22 additions & 0 deletions pkg/datasource/oss.go
@@ -320,3 +320,25 @@ func (oss *OSS) preCheck(info any) (*v1alpha1.OSS, error) {
}
return ossInfo, nil
}

func (oss *OSS) IncompleteUpload(ctx context.Context, options ...ChunkUploaderOption) (string, error) {
s := ChunkUploaderConf{}
for _, opt := range options {
opt(&s)
}

objectName := fmt.Sprintf("%s/%s", s.relativeDir, s.fileName)
var (
uploadID string
cur time.Time
)
// Walk every incomplete multipart upload for this object and keep the most
// recently initiated one; uploadID stays empty if there are none.
first := true
for id := range oss.Client.ListIncompleteUploads(ctx, s.bucket, objectName, true) {
if first || id.Initiated.After(cur) {
uploadID = id.UploadID
cur = id.Initiated
first = false
}
}
return uploadID, nil
}
