From a21a781449508ac272d7d74ffa9f3c61cb6446bb Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Mon, 19 Feb 2024 11:30:26 +0400 Subject: [PATCH] *: Remove multipart object re-slicing Closes #843. Before, multipartComplete read all parts of Big object to the memory, combine them and generate final Big object. These step consume time and memory, eventually any system will fail to load all parts in mem or timeout during the process. After, object slicing process works from the first uploaded part. Calculating each part hash and whole object hash during whole process. Storing object hash state to each part metadata in tree service. Signed-off-by: Evgenii Baidakov --- api/data/tree.go | 7 + api/layer/layer.go | 9 + api/layer/multipart_upload.go | 308 ++++++++++++++++++++++++++++------ api/layer/neofs.go | 27 +++ api/layer/neofs_mock.go | 14 ++ api/layer/object.go | 64 +++++++ api/layer/tree_mock.go | 31 ++++ api/layer/tree_service.go | 4 + go.mod | 2 +- internal/neofs/neofs.go | 68 ++++++++ internal/neofs/tree.go | 57 +++++++ 11 files changed, 540 insertions(+), 51 deletions(-) diff --git a/api/data/tree.go b/api/data/tree.go index 5c1e8cb0e..7796a69ba 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -90,6 +90,13 @@ type PartInfo struct { Created time.Time // Server creation time. ServerCreated time.Time + + // MultipartHash contains internal state of the [hash.Hash] to calculate whole object payload hash. + MultipartHash []byte + // HomoHash contains internal state of the [hash.Hash] to calculate whole object homomorphic payload hash. + HomoHash []byte + // Elements contain [oid.ID] object list for the current part. + Elements []oid.ID } // ToHeaderString form short part representation to use in S3-Completed-Parts header. 
diff --git a/api/layer/layer.go b/api/layer/layer.go index 161490988..1cae32eaf 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -9,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "github.com/nats-io/nats.go" @@ -50,6 +51,7 @@ type ( ncontroller EventListener cache *Cache treeService TreeService + buffers *sync.Pool } Config struct { @@ -266,6 +268,12 @@ func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error // NewLayer creates an instance of a layer. It checks credentials // and establishes gRPC connection with the node. func NewLayer(log *zap.Logger, neoFS NeoFS, config *Config) Client { + buffers := sync.Pool{} + buffers.New = func() any { + b := make([]byte, neoFS.MaxObjectSize()) + return &b + } + return &layer{ neoFS: neoFS, log: log, @@ -273,6 +281,7 @@ func NewLayer(log *zap.Logger, neoFS NeoFS, config *Config) Client { resolver: config.Resolver, cache: NewCache(config.Caches), treeService: config.TreeService, + buffers: &buffers, } } diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 96e05d933..ca53c13bb 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -1,10 +1,14 @@ package layer import ( + "bytes" "context" + "crypto/sha256" + "encoding" "encoding/hex" "errors" "fmt" + "hash" "io" "sort" "strconv" @@ -19,14 +23,13 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/nspcc-dev/tzhash/tz" "go.uber.org/zap" "golang.org/x/exp/slices" ) const ( - UploadIDAttributeName = "S3-Upload-Id" - UploadPartNumberAttributeName = "S3-Upload-Part-Number" - UploadCompletedParts = "S3-Completed-Parts" + UploadCompletedParts = "S3-Completed-Parts" metaPrefix = "meta-" aclPrefix = "acl-" @@ -203,34 +206,121 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf return nil, 
s3errors.GetAPIError(s3errors.ErrInvalidEncryptionParameters) } - bktInfo := p.Info.Bkt - prm := PrmObjectCreate{ - Container: bktInfo.CID, - Creator: bktInfo.Owner, - Attributes: make([][2]string, 2), - Payload: p.Reader, - CreationTime: TimeNow(ctx), - CopiesNumber: multipartInfo.CopiesNumber, - } + var ( + bktInfo = p.Info.Bkt + payloadReader = p.Reader + decSize = p.Size + attributes [][2]string + ) - decSize := p.Size if p.Info.Encryption.Enabled() { r, encSize, err := encryptionReader(p.Reader, uint64(p.Size), p.Info.Encryption.Key()) if err != nil { return nil, fmt.Errorf("failed to create ecnrypted reader: %w", err) } - prm.Attributes = append(prm.Attributes, [2]string{AttributeDecryptedSize, strconv.FormatInt(p.Size, 10)}) - prm.Payload = r + attributes = append(attributes, [2]string{AttributeDecryptedSize, strconv.FormatInt(p.Size, 10)}) + payloadReader = r p.Size = int64(encSize) } - prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID - prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber) + var ( + splitPreviousID oid.ID + isSetSplitPreviousID bool + multipartHash = sha256.New() + tzHash hash.Hash + ) - id, hash, err := n.objectPutAndHash(ctx, prm, bktInfo) + if n.neoFS.IsHomomorphicHashingEnabled() { + tzHash = tz.New() + } + + lastPart, err := n.treeService.GetLastPart(ctx, bktInfo, multipartInfo.ID) if err != nil { - return nil, err + // if ErrPartListIsEmpty, there is the first part of multipart. + if !errors.Is(err, ErrPartListIsEmpty) { + return nil, fmt.Errorf("getLastPart: %w", err) + } + } else { + // try to restore hash state from the last part. + // the required interface is guaranteed according to the docs, so just cast without checks. 
+ binaryUnmarshaler := multipartHash.(encoding.BinaryUnmarshaler) + if err = binaryUnmarshaler.UnmarshalBinary(lastPart.MultipartHash); err != nil { + return nil, fmt.Errorf("unmarshal previous part hash: %w", err) + } + + if tzHash != nil { + binaryUnmarshaler = tzHash.(encoding.BinaryUnmarshaler) + if err = binaryUnmarshaler.UnmarshalBinary(lastPart.HomoHash); err != nil { + return nil, fmt.Errorf("unmarshal previous part homo hash: %w", err) + } + } + + isSetSplitPreviousID = true + splitPreviousID = lastPart.OID + } + + var ( + id oid.ID + elements []oid.ID + creationTime = TimeNow(ctx) + // User may upload a part larger than maxObjectSize in NeoFS. From the user's point of view it is a single object. + // We have to calculate the hash from this object separately. + currentPartHash = sha256.New() + ) + + objHashes := []hash.Hash{multipartHash, currentPartHash} + if tzHash != nil { + objHashes = append(objHashes, tzHash) + } + + prm := PrmObjectCreate{ + Container: bktInfo.CID, + Creator: bktInfo.Owner, + Attributes: attributes, + CreationTime: creationTime, + CopiesNumber: multipartInfo.CopiesNumber, + Multipart: &Multipart{ + SplitID: multipartInfo.SplitID, + MultipartHashes: objHashes, + }, + } + + chunk := n.buffers.Get().(*[]byte) + + // slice part manually. Simultaneously considering the part is a single object for user. + for { + if isSetSplitPreviousID { + prm.Multipart.SplitPreviousID = &splitPreviousID + } + + nBts, readErr := io.ReadAtLeast(payloadReader, *chunk, len(*chunk)) + if nBts > 0 { + prm.Payload = bytes.NewReader((*chunk)[:nBts]) + prm.PayloadSize = uint64(nBts) + + id, _, err = n.objectPutAndHash(ctx, prm, bktInfo) + if err != nil { + return nil, err + } + + isSetSplitPreviousID = true + splitPreviousID = id + elements = append(elements, id) + } + + if readErr == nil { + continue + } + + // If an EOF happens after reading fewer than min bytes, ReadAtLeast returns ErrUnexpectedEOF. + // We have the whole payload.
+ if !errors.Is(readErr, io.EOF) && !errors.Is(readErr, io.ErrUnexpectedEOF) { + return nil, fmt.Errorf("read payload chunk: %w", readErr) + } + + break } + n.buffers.Put(chunk) reqInfo := api.GetReqInfo(ctx) n.log.Debug("upload part", @@ -245,8 +335,26 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf Number: p.PartNumber, OID: id, Size: decSize, - ETag: hex.EncodeToString(hash), + ETag: hex.EncodeToString(currentPartHash.Sum(nil)), Created: prm.CreationTime, + Elements: elements, + } + + // encoding hash.Hash state to save it in tree service. + // the required interface is guaranteed according to the docs, so just cast without checks. + binaryMarshaler := multipartHash.(encoding.BinaryMarshaler) + partInfo.MultipartHash, err = binaryMarshaler.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("marshalBinary: %w", err) + } + + if tzHash != nil { + binaryMarshaler = tzHash.(encoding.BinaryMarshaler) + partInfo.HomoHash, err = binaryMarshaler.MarshalBinary() + + if err != nil { + return nil, fmt.Errorf("marshalBinary: %w", err) + } } oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) @@ -380,8 +488,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar var multipartObjetSize int64 var encMultipartObjectSize uint64 - parts := make([]*data.PartInfo, 0, len(p.Parts)) - + var lastPartID int + var children []oid.ID var completedPartsHeader strings.Builder for i, part := range p.Parts { partInfo := partsInfo[part.PartNumber] @@ -392,7 +500,6 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar if i != len(p.Parts)-1 && partInfo.Size < uploadMinSize { return nil, nil, s3errors.GetAPIError(s3errors.ErrEntityTooSmall) } - parts = append(parts, partInfo) multipartObjetSize += partInfo.Size // even if encryption is enabled size is actual (decrypted) if encInfo.Enabled { @@ -410,6 +517,44 @@ func (n *layer) CompleteMultipartUpload(ctx
context.Context, p *CompleteMultipar if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil { return nil, nil, err } + + if part.PartNumber > lastPartID { + lastPartID = part.PartNumber + } + + children = append(children, partInfo.Elements...) + } + + multipartHash := sha256.New() + var homoHash hash.Hash + var splitPreviousID oid.ID + + if lastPartID > 0 { + lastPart := partsInfo[lastPartID] + + if lastPart != nil { + if len(lastPart.MultipartHash) > 0 { + splitPreviousID = lastPart.OID + + if len(lastPart.MultipartHash) > 0 { + binaryUnmarshaler := multipartHash.(encoding.BinaryUnmarshaler) + if err = binaryUnmarshaler.UnmarshalBinary(lastPart.MultipartHash); err != nil { + return nil, nil, fmt.Errorf("unmarshal last part hash: %w", err) + } + } + } + + if n.neoFS.IsHomomorphicHashingEnabled() && len(lastPart.HomoHash) > 0 { + homoHash = tz.New() + + if len(lastPart.HomoHash) > 0 { + binaryUnmarshaler := homoHash.(encoding.BinaryUnmarshaler) + if err = binaryUnmarshaler.UnmarshalBinary(lastPart.HomoHash); err != nil { + return nil, nil, fmt.Errorf("unmarshal last part homo hash: %w", err) + } + } + } + } } initMetadata := make(map[string]string, len(multipartInfo.Meta)+1) @@ -437,45 +582,108 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar multipartObjetSize = int64(encMultipartObjectSize) } - r := &multiObjectReader{ - ctx: ctx, - layer: n, - parts: parts, - } - - r.prm.bktInfo = p.Info.Bkt - - extObjInfo, err := n.PutObject(ctx, &PutObjectParams{ + // This is our "big object". It doesn't have any payload.
+ prmHeaderObject := &PutObjectParams{ BktInfo: p.Info.Bkt, Object: p.Info.Key, - Reader: r, + Reader: bytes.NewBuffer(nil), Header: initMetadata, Size: multipartObjetSize, Encryption: p.Info.Encryption, CopiesNumber: multipartInfo.CopiesNumber, - }) + } + + header, err := n.prepareMultipartHeadObject(ctx, prmHeaderObject, multipartHash, homoHash, uint64(multipartObjetSize)) if err != nil { - n.log.Error("could not put a completed object (multipart upload)", - zap.String("uploadID", p.Info.UploadID), - zap.String("uploadKey", p.Info.Key), - zap.Error(err)) + return nil, nil, err + } - return nil, nil, s3errors.GetAPIError(s3errors.ErrInternalError) + // last part + prm := PrmObjectCreate{ + Container: p.Info.Bkt.CID, + Creator: p.Info.Bkt.Owner, + Filepath: p.Info.Key, + CreationTime: TimeNow(ctx), + CopiesNumber: multipartInfo.CopiesNumber, + Multipart: &Multipart{ + SplitID: multipartInfo.SplitID, + SplitPreviousID: &splitPreviousID, + HeaderObject: header, + }, + Payload: bytes.NewBuffer(nil), } - var addr oid.Address - addr.SetContainer(p.Info.Bkt.CID) - for _, partInfo := range partsInfo { - if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil { - n.log.Warn("could not delete upload part", - zap.Stringer("object id", &partInfo.OID), - zap.Stringer("bucket id", p.Info.Bkt.CID), - zap.Error(err)) - } - addr.SetObject(partInfo.OID) - n.cache.DeleteObject(addr) + lastPartObjID, _, err := n.objectPutAndHash(ctx, prm, p.Info.Bkt) + if err != nil { + return nil, nil, err } + children = append(children, lastPartObjID) + + // linking object + prm = PrmObjectCreate{ + Container: p.Info.Bkt.CID, + Creator: p.Info.Bkt.Owner, + CreationTime: TimeNow(ctx), + CopiesNumber: multipartInfo.CopiesNumber, + Multipart: &Multipart{ + SplitID: multipartInfo.SplitID, + HeaderObject: header, + Children: children, + }, + Payload: bytes.NewBuffer(nil), + } + + _, _, err = n.objectPutAndHash(ctx, prm, p.Info.Bkt) + if err != nil { + return nil, nil, err + } + + bktSettings, 
err := n.GetBucketSettings(ctx, p.Info.Bkt) + if err != nil { + return nil, nil, fmt.Errorf("couldn't get versioning settings object: %w", err) + } + + headerObjectID, _ := header.ID() + + // the "big object" is not present in the system, but we have to put correct info about it and its version. + + newVersion := &data.NodeVersion{ + BaseNodeVersion: data.BaseNodeVersion{ + FilePath: p.Info.Key, + Size: multipartObjetSize, + OID: headerObjectID, + ETag: hex.EncodeToString(multipartHash.Sum(nil)), + }, + IsUnversioned: !bktSettings.VersioningEnabled(), + } + + if newVersion.ID, err = n.treeService.AddVersion(ctx, p.Info.Bkt, newVersion); err != nil { + return nil, nil, fmt.Errorf("couldn't add multipart new version to tree service: %w", err) + } + + n.cache.CleanListCacheEntriesContainingObject(p.Info.Key, p.Info.Bkt.CID) + + objInfo := &data.ObjectInfo{ + ID: headerObjectID, + CID: p.Info.Bkt.CID, + Owner: p.Info.Bkt.Owner, + Bucket: p.Info.Bkt.Name, + Name: p.Info.Key, + Size: multipartObjetSize, + Created: prm.CreationTime, + Headers: initMetadata, + ContentType: initMetadata[api.ContentType], + HashSum: newVersion.ETag, + } + + extObjInfo := &data.ExtendedObjectInfo{ + ObjectInfo: objInfo, + NodeVersion: newVersion, + } + + n.cache.PutObjectWithName(p.Info.Bkt.Owner, extObjInfo) + return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo.ID) } diff --git a/api/layer/neofs.go b/api/layer/neofs.go index 402f6860a..dd1fee0c7 100644 --- a/api/layer/neofs.go +++ b/api/layer/neofs.go @@ -3,6 +3,7 @@ package layer import ( "context" "errors" + "hash" "io" "time" @@ -114,6 +115,23 @@ type PrmObjectCreate struct { // Number of object copies that is enough to consider put successful. CopiesNumber uint32 + + Multipart *Multipart +} + +// Multipart contains info for local object slicing inside s3-gate during multipart upload operation.
+type Multipart struct { + // MultipartHashes contains hashes for the multipart object payload calculation (optional). + MultipartHashes []hash.Hash + // SplitID contains splitID for multipart object (optional). + SplitID string + // SplitPreviousID contains [oid.ID] of previous object in chain (optional). + SplitPreviousID *oid.ID + // Children contains all objects in multipart chain, for linking object (optional). + Children []oid.ID + // HeaderObject is a virtual representation of complete multipart object (optional). It is used to set Parent in + // linking object. + HeaderObject *object.Object } // PrmObjectDelete groups parameters of NeoFS.DeleteObject operation. @@ -208,6 +226,9 @@ type NeoFS interface { // prevented the container from being created. CreateObject(context.Context, PrmObjectCreate) (oid.ID, error) + // FinalizeObjectWithPayloadChecksums fills and signs header object for complete multipart object. + FinalizeObjectWithPayloadChecksums(context.Context, object.Object, hash.Hash, hash.Hash, uint64) (*object.Object, error) + // DeleteObject marks the object to be removed from the NeoFS container by identifier. // Successful return does not guarantee actual removal. // @@ -223,4 +244,10 @@ type NeoFS interface { // // It returns any error encountered which prevented computing epochs. TimeToEpoch(ctx context.Context, now time.Time, future time.Time) (uint64, uint64, error) + + // MaxObjectSize returns configured payload size limit for object slicing when enabled. + MaxObjectSize() int64 + + // IsHomomorphicHashingEnabled shows if homomorphic hashing is enabled in config. 
+ IsHomomorphicHashingEnabled() bool } diff --git a/api/layer/neofs_mock.go b/api/layer/neofs_mock.go index 868e57177..7c24e1054 100644 --- a/api/layer/neofs_mock.go +++ b/api/layer/neofs_mock.go @@ -8,6 +8,7 @@ import ( "encoding/hex" "errors" "fmt" + "hash" "io" "time" @@ -220,6 +221,10 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID return objID, nil } +func (t *TestNeoFS) FinalizeObjectWithPayloadChecksums(_ context.Context, header object.Object, _ hash.Hash, _ hash.Hash, _ uint64) (*object.Object, error) { + return &header, nil +} + func (t *TestNeoFS) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { var addr oid.Address addr.SetContainer(prm.Container) @@ -241,6 +246,15 @@ func (t *TestNeoFS) TimeToEpoch(_ context.Context, now, futureTime time.Time) (u return t.currentEpoch, t.currentEpoch + uint64(futureTime.Sub(now).Seconds()), nil } +func (t *TestNeoFS) MaxObjectSize() int64 { + // 64 MB + return 67108864 +} + +func (t *TestNeoFS) IsHomomorphicHashingEnabled() bool { + return false +} + func (t *TestNeoFS) AllObjects(cnrID cid.ID) []oid.ID { result := make([]oid.ID, 0, len(t.objects)) diff --git a/api/layer/object.go b/api/layer/object.go index 173d91884..7ec1ecf23 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "hash" "io" "mime" "path/filepath" @@ -13,6 +14,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/minio/sio" "github.com/nspcc-dev/neofs-s3-gw/api" @@ -23,6 +25,7 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -309,6 +312,62 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend return extendedObjInfo, nil } +func (n *layer) prepareMultipartHeadObject(ctx context.Context, p 
*PutObjectParams, payloadHash hash.Hash, homoHash hash.Hash, payloadLength uint64) (*object.Object, error) { + var ( + err error + owner = n.Owner(ctx) + ) + + if p.Encryption.Enabled() { + p.Header[AttributeDecryptedSize] = strconv.FormatInt(p.Size, 10) + if err = addEncryptionHeaders(p.Header, p.Encryption); err != nil { + return nil, fmt.Errorf("add encryption header: %w", err) + } + + var encSize uint64 + if _, encSize, err = encryptionReader(p.Reader, uint64(p.Size), p.Encryption.Key()); err != nil { + return nil, fmt.Errorf("create encrypter: %w", err) + } + p.Size = int64(encSize) + } + + var headerObject object.Object + headerObject.SetContainerID(p.BktInfo.CID) + headerObject.SetType(object.TypeRegular) + headerObject.SetOwnerID(&owner) + + currentVersion := version.Current() + headerObject.SetVersion(¤tVersion) + + attributes := make([]object.Attribute, 0, len(p.Header)) + for k, v := range p.Header { + if v == "" { + return nil, ErrMetaEmptyParameterValue + } + + attributes = append(attributes, *object.NewAttribute(k, v)) + } + + creationTime := TimeNow(ctx) + if creationTime.IsZero() { + creationTime = time.Now() + } + attributes = append(attributes, *object.NewAttribute(object.AttributeTimestamp, strconv.FormatInt(creationTime.Unix(), 10))) + + if p.Object != "" { + attributes = append(attributes, *object.NewAttribute(object.AttributeFilePath, p.Object)) + } + + headerObject.SetAttributes(attributes...) 
+ + multipartHeader, err := n.neoFS.FinalizeObjectWithPayloadChecksums(ctx, headerObject, payloadHash, homoHash, payloadLength) + if err != nil { + return nil, fmt.Errorf("FinalizeObjectWithPayloadChecksums: %w", err) + } + + return multipartHeader, nil +} + func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.BucketInfo, objectName string) (*data.ExtendedObjectInfo, error) { owner := n.Owner(ctx) if extObjInfo := n.cache.GetLastObject(owner, bkt.Name, objectName); extObjInfo != nil { @@ -416,6 +475,11 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn hash := sha256.New() prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) { hash.Write(buf) + if prm.Multipart != nil { + for _, h := range prm.Multipart.MultipartHashes { + h.Write(buf) + } + } }) id, err := n.neoFS.CreateObject(ctx, prm) if err != nil { diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index e871c4152..3700a39ef 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/data" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "golang.org/x/exp/slices" ) type TreeServiceMock struct { @@ -362,6 +363,36 @@ LOOP: return result, nil } +func (t *TreeServiceMock) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) { + parts, err := t.GetParts(ctx, bktInfo, multipartNodeID) + if err != nil { + return nil, fmt.Errorf("get parts: %w", err) + } + + if len(parts) == 0 { + return nil, ErrPartListIsEmpty + } + + // Sort parts by part number, then by server creation time to make actual last uploaded parts with the same number. 
+ slices.SortFunc(parts, func(a, b *data.PartInfo) int { + if a.Number != b.Number { + return a.Number - b.Number + } + + if a.ServerCreated.Before(b.ServerCreated) { + return -1 + } + + if a.ServerCreated.Equal(b.ServerCreated) { + return 0 + } + + return 1 + }) + + return parts[len(parts)-1], nil +} + func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error { cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()] diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index 4a30910dc..a39ac8f3e 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -74,6 +74,7 @@ type TreeService interface { // If object id to remove is not found returns ErrNoNodeToRemove error. AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) + GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) // Compound methods for optimizations @@ -90,4 +91,7 @@ var ( // ErrNoNodeToRemove is returned from Tree service in case of the lack of node with OID to remove. ErrNoNodeToRemove = errors.New("no node to remove") + + // ErrPartListIsEmpty is returned if no parts available for the upload.
+ ErrPartListIsEmpty = errors.New("part list is empty") ) diff --git a/go.mod b/go.mod index def8e9e86..8910d79c6 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240125143754-70b1ffbd8141 github.com/nspcc-dev/neofs-contract v0.19.1 github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240130135633-cb11d035a4ed + github.com/nspcc-dev/tzhash v1.8.0 github.com/panjf2000/ants/v2 v2.5.0 github.com/prometheus/client_golang v1.13.0 github.com/spf13/pflag v1.0.5 @@ -35,7 +36,6 @@ require ( github.com/klauspost/compress v1.17.0 // indirect github.com/nspcc-dev/go-ordered-json v0.0.0-20240112074137-296698a162ae // indirect github.com/nspcc-dev/hrw/v2 v2.0.0-20231115095647-bf62f4ad0a43 // indirect - github.com/nspcc-dev/tzhash v1.8.0 // indirect github.com/pelletier/go-toml/v2 v2.1.1 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect diff --git a/internal/neofs/neofs.go b/internal/neofs/neofs.go index 44a2c20f4..40993933f 100644 --- a/internal/neofs/neofs.go +++ b/internal/neofs/neofs.go @@ -3,9 +3,11 @@ package neofs import ( "bytes" "context" + "crypto/sha256" "encoding/hex" "errors" "fmt" + "hash" "io" "math" "strconv" @@ -16,6 +18,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/layer" "github.com/nspcc-dev/neofs-s3-gw/authmate" "github.com/nspcc-dev/neofs-s3-gw/creds/tokens" + "github.com/nspcc-dev/neofs-sdk-go/checksum" "github.com/nspcc-dev/neofs-sdk-go/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/container" @@ -30,6 +33,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/waiter" + "github.com/nspcc-dev/tzhash/tz" ) // Config allows to configure some [NeoFS] parameters. @@ -270,6 +274,32 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi obj.SetAttributes(attrs...) 
obj.SetPayloadSize(prm.PayloadSize) + if prm.Multipart != nil && prm.Multipart.SplitID != "" { + var split object.SplitID + if err := split.Parse(prm.Multipart.SplitID); err != nil { + return oid.ID{}, fmt.Errorf("parse split ID: %w", err) + } + obj.SetSplitID(&split) + + if prm.Multipart.SplitPreviousID != nil { + obj.SetPreviousID(*prm.Multipart.SplitPreviousID) + } + + if len(prm.Multipart.Children) > 0 { + obj.SetChildren(prm.Multipart.Children...) + } + + if prm.Multipart.HeaderObject != nil { + id, isSet := prm.Multipart.HeaderObject.ID() + if !isSet { + return oid.ID{}, errors.New("HeaderObject id is not set") + } + + obj.SetParentID(id) + obj.SetParent(prm.Multipart.HeaderObject) + } + } + if len(prm.Locks) > 0 { var lock object.Lock lock.WriteMembers(prm.Locks) @@ -345,6 +375,34 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi return writer.GetResult().StoredObjectID(), nil } +// FinalizeObjectWithPayloadChecksums implements neofs.NeoFS interface method. +func (x *NeoFS) FinalizeObjectWithPayloadChecksums(ctx context.Context, header object.Object, metaChecksum hash.Hash, homomorphicChecksum hash.Hash, payloadLength uint64) (*object.Object, error) { + header.SetCreationEpoch(x.epochGetter.CurrentEpoch()) + + var cs checksum.Checksum + + var csBytes [sha256.Size]byte + copy(csBytes[:], metaChecksum.Sum(nil)) + + cs.SetSHA256(csBytes) + header.SetPayloadChecksum(cs) + + if homomorphicChecksum != nil { + var csHomoBytes [tz.Size]byte + copy(csHomoBytes[:], homomorphicChecksum.Sum(nil)) + + cs.SetTillichZemor(csHomoBytes) + header.SetPayloadHomomorphicHash(cs) + } + + header.SetPayloadSize(payloadLength) + if err := header.SetIDWithSignature(x.signer(ctx)); err != nil { + return nil, fmt.Errorf("setIDWithSignature: %w", err) + } + + return &header, nil +} + // wraps io.ReadCloser and transforms Read errors related to access violation // to neofs.ErrAccessDenied. 
type payloadReader struct { @@ -468,6 +526,16 @@ func (x *NeoFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) err return nil } +// MaxObjectSize returns configured payload size limit for object slicing when enabled. +func (x *NeoFS) MaxObjectSize() int64 { + return x.cfg.MaxObjectSize +} + +// IsHomomorphicHashingEnabled shows if homomorphic hashing is enabled in config. +func (x *NeoFS) IsHomomorphicHashingEnabled() bool { + return x.cfg.IsHomomorphicEnabled +} + func isErrAccessDenied(err error) (string, bool) { unwrappedErr := errors.Unwrap(err) for unwrappedErr != nil { diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 80d7ee494..14eac6c6d 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -17,6 +17,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/internal/neofs/services/tree" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" + "golang.org/x/exp/slices" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -59,6 +60,9 @@ const ( partNumberKV = "Number" sizeKV = "Size" etagKV = "ETag" + multipartHashKV = "MultipartHashes" + homoHashKV = "HomoHash" + elementsKV = "Elements" // keys for lock. 
isLockKV = "IsLock" @@ -269,6 +273,21 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) { return nil, fmt.Errorf("invalid server created timestamp: %w", err) } partInfo.ServerCreated = time.UnixMilli(utcMilli) + case multipartHashKV: + partInfo.MultipartHash = []byte(value) + case homoHashKV: + partInfo.HomoHash = []byte(value) + case elementsKV: + elements := strings.Split(value, ",") + partInfo.Elements = make([]oid.ID, len(elements)) + for i, e := range elements { + var id oid.ID + if err = id.DecodeString(e); err != nil { + return nil, fmt.Errorf("invalid oid: %w", err) + } + + partInfo.Elements[i] = id + } } } @@ -912,6 +931,11 @@ func (c *TreeClient) AddPart(ctx context.Context, bktInfo *data.BucketInfo, mult return oid.ID{}, err } + elements := make([]string, len(info.Elements)) + for i, e := range info.Elements { + elements[i] = e.String() + } + meta := map[string]string{ partNumberKV: strconv.Itoa(info.Number), oidKV: info.OID.EncodeToString(), @@ -919,6 +943,9 @@ func (c *TreeClient) AddPart(ctx context.Context, bktInfo *data.BucketInfo, mult createdKV: strconv.FormatInt(info.Created.UTC().UnixMilli(), 10), serverCreatedKV: strconv.FormatInt(time.Now().UTC().UnixMilli(), 10), etagKV: info.ETag, + multipartHashKV: string(info.MultipartHash), + homoHashKV: string(info.HomoHash), + elementsKV: strings.Join(elements, ","), } var foundPartID uint64 @@ -968,6 +995,36 @@ func (c *TreeClient) GetParts(ctx context.Context, bktInfo *data.BucketInfo, mul return result, nil } +func (c *TreeClient) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) { + parts, err := c.GetParts(ctx, bktInfo, multipartNodeID) + if err != nil { + return nil, fmt.Errorf("get parts: %w", err) + } + + if len(parts) == 0 { + return nil, layer.ErrPartListIsEmpty + } + + // Sort parts by part number, then by server creation time to make actual last uploaded parts with the same number. 
+ slices.SortFunc(parts, func(a, b *data.PartInfo) int { + if a.Number != b.Number { + return a.Number - b.Number + } + + if a.ServerCreated.Before(b.ServerCreated) { + return -1 + } + + if a.ServerCreated.Equal(b.ServerCreated) { + return 0 + } + + return 1 + }) + + return parts[len(parts)-1], nil +} + func (c *TreeClient) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error { return c.removeNode(ctx, bktInfo, systemTree, multipartNodeID) }