Skip to content

Commit

Permalink
Support providing chunk size in multi part upload
Browse files Browse the repository at this point in the history
  • Loading branch information
RobiNino committed Apr 11, 2024
1 parent 4e96d77 commit 4b30ba5
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 40 deletions.
7 changes: 5 additions & 2 deletions artifactory/services/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,8 @@ func (us *UploadService) doUpload(artifact UploadData, targetUrlWithProps, logMs
return
}
if shouldTryMultipart {
if err = us.MultipartUpload.UploadFileConcurrently(artifact.Artifact.LocalPath, artifact.Artifact.TargetPath, fileInfo.Size(), details.Checksum.Sha1, us.Progress, uploadParams.SplitCount); err != nil {
if err = us.MultipartUpload.UploadFileConcurrently(artifact.Artifact.LocalPath, artifact.Artifact.TargetPath,
fileInfo.Size(), details.Checksum.Sha1, us.Progress, uploadParams.SplitCount, uploadParams.ChunkSize); err != nil {
return
}
// Once the file is uploaded to the storage, we finalize the multipart upload by performing a checksum deployment to save the file in Artifactory.
Expand Down Expand Up @@ -709,14 +710,16 @@ type UploadParams struct {
MinChecksumDeploy int64
MinSplitSize int64
SplitCount int
ChunkSize int64
ChecksumsCalcEnabled bool
Archive string
// When using the 'archive' option for upload, we can control the target path inside the uploaded archive using placeholders. This operation determines the TargetPathInArchive value.
TargetPathInArchive string
}

func NewUploadParams() UploadParams {
return UploadParams{CommonParams: &utils.CommonParams{}, MinChecksumDeploy: DefaultMinChecksumDeploy, ChecksumsCalcEnabled: true, MinSplitSize: defaultUploadMinSplit, SplitCount: defaultUploadSplitCount}
return UploadParams{CommonParams: &utils.CommonParams{}, MinChecksumDeploy: DefaultMinChecksumDeploy,
ChecksumsCalcEnabled: true, MinSplitSize: defaultUploadMinSplit, SplitCount: defaultUploadSplitCount, ChunkSize: utils.DefaultUploadChunkSize}
}

func DeepCopyUploadParams(params *UploadParams) UploadParams {
Expand Down
46 changes: 24 additions & 22 deletions artifactory/services/utils/multipartupload.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const (

// Sizes and limits constants
MaxMultipartUploadFileSize = SizeTiB * 5
uploadPartSize int64 = SizeMiB * 20
DefaultUploadChunkSize int64 = SizeMiB * 20

// Retries and polling constants
retriesInterval = time.Second * 5
Expand Down Expand Up @@ -122,13 +122,14 @@ type getConfigResponse struct {
Supported bool `json:"supported,omitempty"`
}

func (mu *MultipartUpload) UploadFileConcurrently(localPath, targetPath string, fileSize int64, sha1 string, progress ioutils.ProgressMgr, splitCount int) (err error) {
func (mu *MultipartUpload) UploadFileConcurrently(localPath, targetPath string, fileSize int64, sha1 string,
progress ioutils.ProgressMgr, splitCount int, chunkSize int64) (err error) {
repoAndPath := strings.SplitN(targetPath, "/", 2)
repoKey := repoAndPath[0]
repoPath := repoAndPath[1]
logMsgPrefix := fmt.Sprintf("[Multipart upload %s] ", repoPath)

token, err := mu.createMultipartUpload(repoKey, repoPath, calculatePartSize(fileSize, 0))
token, err := mu.createMultipartUpload(repoKey, repoPath, calculatePartSize(fileSize, 0, chunkSize))
if err != nil {
return
}
Expand All @@ -154,7 +155,7 @@ func (mu *MultipartUpload) UploadFileConcurrently(localPath, targetPath string,
}
}()

if err = mu.uploadPartsConcurrently(logMsgPrefix, fileSize, splitCount, localPath, progressReader, multipartUploadClient); err != nil {
if err = mu.uploadPartsConcurrently(logMsgPrefix, fileSize, chunkSize, splitCount, localPath, progressReader, multipartUploadClient); err != nil {
return
}

Expand All @@ -175,9 +176,9 @@ func (mu *MultipartUpload) UploadFileConcurrently(localPath, targetPath string,
return mu.completeAndPollForStatus(logMsgPrefix, uint(mu.client.GetHttpClient().GetRetries())+1, sha1, multipartUploadClient, progressReader)
}

func (mu *MultipartUpload) uploadPartsConcurrently(logMsgPrefix string, fileSize int64, splitCount int, localPath string, progressReader ioutils.Progress, multipartUploadClient *httputils.HttpClientDetails) (err error) {
numberOfParts := calculateNumberOfParts(fileSize)
log.Info(fmt.Sprintf("%sSplitting file to %d parts, using %d working threads for uploading...", logMsgPrefix, numberOfParts, splitCount))
func (mu *MultipartUpload) uploadPartsConcurrently(logMsgPrefix string, fileSize, chunkSize int64, splitCount int, localPath string, progressReader ioutils.Progress, multipartUploadClient *httputils.HttpClientDetails) (err error) {
numberOfParts := calculateNumberOfParts(fileSize, chunkSize)
log.Info(fmt.Sprintf("%sSplitting file to %d parts of %s each, using %d working threads for uploading...", logMsgPrefix, numberOfParts, ConvertIntToStorageSizeString(chunkSize), splitCount))
producerConsumer := parallel.NewRunner(splitCount, uint(numberOfParts), false)

wg := new(sync.WaitGroup)
Expand All @@ -186,7 +187,7 @@ func (mu *MultipartUpload) uploadPartsConcurrently(logMsgPrefix string, fileSize
attemptsAllowed.Add(uint64(numberOfParts) * uint64(mu.client.GetHttpClient().GetRetries()))
go func() {
for i := 0; i < int(numberOfParts); i++ {
if err = mu.produceUploadTask(producerConsumer, logMsgPrefix, localPath, fileSize, numberOfParts, int64(i), progressReader, multipartUploadClient, attemptsAllowed, wg); err != nil {
if err = mu.produceUploadTask(producerConsumer, logMsgPrefix, localPath, fileSize, numberOfParts, int64(i), chunkSize, progressReader, multipartUploadClient, attemptsAllowed, wg); err != nil {
return
}
}
Expand All @@ -202,9 +203,9 @@ func (mu *MultipartUpload) uploadPartsConcurrently(logMsgPrefix string, fileSize
return
}

func (mu *MultipartUpload) produceUploadTask(producerConsumer parallel.Runner, logMsgPrefix, localPath string, fileSize, numberOfParts, partId int64, progressReader ioutils.Progress, multipartUploadClient *httputils.HttpClientDetails, attemptsAllowed *atomic.Uint64, wg *sync.WaitGroup) (retErr error) {
func (mu *MultipartUpload) produceUploadTask(producerConsumer parallel.Runner, logMsgPrefix, localPath string, fileSize, numberOfParts, partId, chunkSize int64, progressReader ioutils.Progress, multipartUploadClient *httputils.HttpClientDetails, attemptsAllowed *atomic.Uint64, wg *sync.WaitGroup) (retErr error) {
_, retErr = producerConsumer.AddTaskWithError(func(int) error {
uploadErr := mu.uploadPart(logMsgPrefix, localPath, fileSize, partId, progressReader, multipartUploadClient)
uploadErr := mu.uploadPart(logMsgPrefix, localPath, fileSize, partId, chunkSize, progressReader, multipartUploadClient)
if uploadErr == nil {
log.Info(fmt.Sprintf("%sCompleted uploading part %d/%d", logMsgPrefix, partId+1, numberOfParts))
wg.Done()
Expand All @@ -220,25 +221,25 @@ func (mu *MultipartUpload) produceUploadTask(producerConsumer parallel.Runner, l

// Sleep before trying again
time.Sleep(retriesInterval)
if err := mu.produceUploadTask(producerConsumer, logMsgPrefix, localPath, fileSize, numberOfParts, partId, progressReader, multipartUploadClient, attemptsAllowed, wg); err != nil {
if err := mu.produceUploadTask(producerConsumer, logMsgPrefix, localPath, fileSize, numberOfParts, partId, chunkSize, progressReader, multipartUploadClient, attemptsAllowed, wg); err != nil {
retErr = err
}
})
return
}

func (mu *MultipartUpload) uploadPart(logMsgPrefix, localPath string, fileSize, partId int64, progressReader ioutils.Progress, multipartUploadClient *httputils.HttpClientDetails) (err error) {
func (mu *MultipartUpload) uploadPart(logMsgPrefix, localPath string, fileSize, partId, chunkSize int64, progressReader ioutils.Progress, multipartUploadClient *httputils.HttpClientDetails) (err error) {
file, err := os.Open(localPath)
if err != nil {
return errorutils.CheckError(err)
}
defer func() {
err = errors.Join(err, errorutils.CheckError(file.Close()))
}()
if _, err = file.Seek(partId*uploadPartSize, io.SeekStart); err != nil {
if _, err = file.Seek(partId*chunkSize, io.SeekStart); err != nil {
return errorutils.CheckError(err)
}
partSize := calculatePartSize(fileSize, partId)
partSize := calculatePartSize(fileSize, partId, chunkSize)

limitReader := io.LimitReader(file, partSize)
limitReader = bufio.NewReader(limitReader)
Expand Down Expand Up @@ -402,21 +403,22 @@ func (mu *MultipartUpload) abort(logMsgPrefix string, multipartUploadClient *htt
return errorutils.CheckResponseStatusWithBody(resp, body, http.StatusNoContent)
}

// Calculates the part size based on the file size and the part number.
// Calculates the part size based on the file size, the part number and the requested chunk size.
// fileSize - the file size
// partNumber - the current part number
func calculatePartSize(fileSize int64, partNumber int64) int64 {
partOffset := partNumber * uploadPartSize
if partOffset+uploadPartSize > fileSize {
// requestedChunkSize - chunk size requested by the user, or default.
func calculatePartSize(fileSize, partNumber, requestedChunkSize int64) int64 {
partOffset := partNumber * requestedChunkSize
if partOffset+requestedChunkSize > fileSize {
return fileSize - partOffset
}
return uploadPartSize
return requestedChunkSize
}

// Calculates the number of parts based on the file size and the default part size.
// Calculates the number of parts based on the file size and the requested chunk size.
// fileSize - the file size
func calculateNumberOfParts(fileSize int64) int64 {
return (fileSize + uploadPartSize - 1) / uploadPartSize
// calculateNumberOfParts returns how many parts of chunkSize bytes are needed
// to cover fileSize bytes, counting a trailing partial chunk as a full part.
// chunkSize is used as a divisor and therefore must be non-zero.
func calculateNumberOfParts(fileSize, chunkSize int64) int64 {
	// Adding chunkSize-1 before dividing turns the truncating integer
	// division into a ceiling division.
	roundedUp := fileSize + (chunkSize - 1)
	return roundedUp / chunkSize
}

func parseMultipartUploadStatus(status statusResponse) (shouldKeepPolling, shouldRerunComplete bool, err error) {
Expand Down
32 changes: 16 additions & 16 deletions artifactory/services/utils/multipartupload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestUploadPartsConcurrentlyTooManyAttempts(t *testing.T) {
defer cleanUp()

// Write something to the file
buf := make([]byte, uploadPartSize*3)
buf := make([]byte, DefaultUploadChunkSize*3)
_, err := rand.Read(buf)
assert.NoError(t, err)
_, err = tempFile.Write(buf)
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestUploadPartsConcurrentlyTooManyAttempts(t *testing.T) {

// Execute uploadPartsConcurrently
fileSize := int64(len(buf))
err = multipartUpload.uploadPartsConcurrently("", fileSize, splitCount, tempFile.Name(), nil, &httputils.HttpClientDetails{})
err = multipartUpload.uploadPartsConcurrently("", fileSize, DefaultUploadChunkSize, splitCount, tempFile.Name(), nil, &httputils.HttpClientDetails{})
assert.ErrorIs(t, err, errTooManyAttempts)
}

Expand Down Expand Up @@ -285,19 +285,19 @@ var calculatePartSizeProvider = []struct {
partNumber int64
expectedPartSize int64
}{
{uploadPartSize - 1, 0, uploadPartSize - 1},
{uploadPartSize, 0, uploadPartSize},
{uploadPartSize + 1, 0, uploadPartSize},
{DefaultUploadChunkSize - 1, 0, DefaultUploadChunkSize - 1},
{DefaultUploadChunkSize, 0, DefaultUploadChunkSize},
{DefaultUploadChunkSize + 1, 0, DefaultUploadChunkSize},

{uploadPartSize*2 - 1, 1, uploadPartSize - 1},
{uploadPartSize * 2, 1, uploadPartSize},
{uploadPartSize*2 + 1, 1, uploadPartSize},
{DefaultUploadChunkSize*2 - 1, 1, DefaultUploadChunkSize - 1},
{DefaultUploadChunkSize * 2, 1, DefaultUploadChunkSize},
{DefaultUploadChunkSize*2 + 1, 1, DefaultUploadChunkSize},
}

func TestCalculatePartSize(t *testing.T) {
for _, testCase := range calculatePartSizeProvider {
t.Run(fmt.Sprintf("fileSize: %d partNumber: %d", testCase.fileSize, testCase.partNumber), func(t *testing.T) {
assert.Equal(t, testCase.expectedPartSize, calculatePartSize(testCase.fileSize, testCase.partNumber))
assert.Equal(t, testCase.expectedPartSize, calculatePartSize(testCase.fileSize, testCase.partNumber, DefaultUploadChunkSize))
})
}
}
Expand All @@ -308,19 +308,19 @@ var calculateNumberOfPartsProvider = []struct {
}{
{0, 0},
{1, 1},
{uploadPartSize - 1, 1},
{uploadPartSize, 1},
{uploadPartSize + 1, 2},
{DefaultUploadChunkSize - 1, 1},
{DefaultUploadChunkSize, 1},
{DefaultUploadChunkSize + 1, 2},

{uploadPartSize*2 - 1, 2},
{uploadPartSize * 2, 2},
{uploadPartSize*2 + 1, 3},
{DefaultUploadChunkSize*2 - 1, 2},
{DefaultUploadChunkSize * 2, 2},
{DefaultUploadChunkSize*2 + 1, 3},
}

func TestCalculateNumberOfParts(t *testing.T) {
for _, testCase := range calculateNumberOfPartsProvider {
t.Run(fmt.Sprintf("fileSize: %d", testCase.fileSize), func(t *testing.T) {
assert.Equal(t, testCase.expectedNumberOfParts, calculateNumberOfParts(testCase.fileSize))
assert.Equal(t, testCase.expectedNumberOfParts, calculateNumberOfParts(testCase.fileSize, DefaultUploadChunkSize))
})
}
}
Expand Down
22 changes: 22 additions & 0 deletions artifactory/services/utils/storageutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package utils
import (
"encoding/json"
"errors"
"fmt"
)

const (
Expand Down Expand Up @@ -126,3 +127,24 @@ type FileStoreSummary struct {
UsedSpace string `json:"usedSpace,omitempty"`
FreeSpace string `json:"freeSpace,omitempty"`
}

func ConvertIntToStorageSizeString(num int64) string {
if num > SizeTiB {
newNum := float64(num) / float64(SizeTiB)
stringNum := fmt.Sprintf("%.1f", newNum)
return stringNum + "TB"
}
if num > SizeGiB {
newNum := float64(num) / float64(SizeGiB)
stringNum := fmt.Sprintf("%.1f", newNum)
return stringNum + "GB"
}
if num > SizeMiB {
newNum := float64(num) / float64(SizeMiB)
stringNum := fmt.Sprintf("%.1f", newNum)
return stringNum + "MB"
}
newNum := float64(num) / float64(SizeKib)
stringNum := fmt.Sprintf("%.1f", newNum)
return stringNum + "KB"
}
18 changes: 18 additions & 0 deletions artifactory/services/utils/storageutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,21 @@ func buildFakeStorageInfo() StorageInfo {
FileStoreSummary: FileStoreSummary{},
}
}

// TestConvertIntToStorageSizeString checks the formatted output of
// ConvertIntToStorageSizeString for sizes in the KB, MB, GB and TB ranges.
func TestConvertIntToStorageSizeString(t *testing.T) {
	tests := []struct {
		// int64 rather than int: the largest value below does not fit in a
		// 32-bit int, so an int field would not compile on 32-bit targets.
		num    int64
		output string
	}{
		{12546, "12.3KB"},
		{148576, "145.1KB"},
		{2587985, "2.5MB"},
		{12896547, "12.3MB"},
		{12896547785, "12.0GB"},
		{5248965785422365, "4773.9TB"},
	}

	for _, test := range tests {
		assert.Equal(t, test.output, ConvertIntToStorageSizeString(test.num))
	}
}

0 comments on commit 4b30ba5

Please sign in to comment.