*: Search object and meta in one request
It works well for the basic case, when we don't need to handle encryption or range requests.

Closes #1084.

Signed-off-by: Evgenii Baidakov <evgenii@nspcc.io>
smallhive committed Feb 27, 2025
1 parent 27c5cc1 commit 7720148
Showing 11 changed files with 403 additions and 36 deletions.
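
For review context, a minimal sketch of the combined flow this commit introduces, written against the layer.Client methods added below. The package name, the import paths, the helper name getOnce and the boolean stand-ins for the range/SSE-C checks are assumptions; error handling, precondition checks and header writing are omitted.

package example // hypothetical package name

import (
	"context"
	"io"

	"github.com/nspcc-dev/neofs-s3-gw/api/layer" // assumed module path
)

// getOnce resolves the object and its metadata with a single metasearch,
// then streams the payload from a single object request.
func getOnce(ctx context.Context, client layer.Client, p *layer.HeadObjectParams, w io.Writer, rangeRequested, encryptionEnabled bool) error {
	// One metasearch request: object ID, tag set and lock info together.
	ci, err := client.ComprehensiveObjectInfo(ctx, p)
	if err != nil {
		return err
	}

	// One object request: header plus payload stream.
	obj, err := client.GetObjectWithPayloadReader(ctx, &layer.GetObjectWithPayloadReaderParams{
		Owner:   p.BktInfo.Owner,
		BktInfo: p.BktInfo,
		Object:  ci.ID,
	})
	if err != nil {
		return err
	}
	defer obj.Payload.Close()

	// Range and SSE-C requests still fall back to the old path, which re-reads
	// the object (the real handler passes the parsed range and encryption
	// parameters here).
	if rangeRequested || encryptionEnabled {
		return client.GetObject(ctx, &layer.GetObjectParams{
			ObjectInfo: obj.ObjectInfo,
			Writer:     w,
			BucketInfo: p.BktInfo,
		})
	}

	// Basic case: stream the payload that was already fetched.
	_, err = io.Copy(w, obj.Payload)
	return err
}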
1 change: 1 addition & 0 deletions api/data/info.go
@@ -63,6 +63,7 @@ type (
Owner user.ID
OwnerPublicKey keys.PublicKey
Headers map[string]string
Version string
}

// ObjectListResponseContent holds response data for object listing.
7 changes: 7 additions & 0 deletions api/data/tree.go
@@ -41,6 +41,13 @@ type ExtendedObjectInfo struct {
IsLatest bool
}

// ComprehensiveObjectInfo represents the metasearch result for an object, including its tags and lock data.
type ComprehensiveObjectInfo struct {
ID oid.ID
TagSet map[string]string
LockInfo *LockInfo
}

func (e ExtendedObjectInfo) Version() string {
if e.NodeVersion.IsUnversioned {
return UnversionedObjectVersionID
78 changes: 48 additions & 30 deletions api/handler/get.go
@@ -2,6 +2,7 @@ package handler

import (
"fmt"
"io"
"net/http"
"net/url"
"strconv"
@@ -77,8 +78,7 @@ func addSSECHeaders(responseHeader http.Header, requestHeader http.Header) {
responseHeader.Set(api.AmzServerSideEncryptionCustomerKeyMD5, requestHeader.Get(api.AmzServerSideEncryptionCustomerKeyMD5))
}

func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.ExtendedObjectInfo, tagSetLength int, isBucketUnversioned bool) {
info := extendedInfo.ObjectInfo
func writeHeaders(h http.Header, requestHeader http.Header, info *data.ObjectInfo, tagSetLength int, isBucketUnversioned bool) {
if len(info.ContentType) > 0 && h.Get(api.ContentType) == "" {
h.Set(api.ContentType, info.ContentType)
}
@@ -95,7 +95,7 @@ func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.E
h.Set(api.AmzTaggingCount, strconv.Itoa(tagSetLength))

if !isBucketUnversioned {
h.Set(api.AmzVersionID, extendedInfo.Version())
h.Set(api.AmzVersionID, info.Version)
}

if cacheControl := info.Headers[api.CacheControl]; cacheControl != "" {
@@ -138,12 +138,24 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
}

extendedInfo, err := h.obj.GetExtendedObjectInfo(r.Context(), p)
comprehensiveObjectInfo, err := h.obj.ComprehensiveObjectInfo(r.Context(), p)
if err != nil {
h.logAndSendError(w, "could not find object", reqInfo, err)
return
}
info := extendedInfo.ObjectInfo

objectWithPayloadReader, err := h.obj.GetObjectWithPayloadReader(r.Context(), &layer.GetObjectWithPayloadReaderParams{
Owner: bktInfo.Owner,
BktInfo: bktInfo,
Object: comprehensiveObjectInfo.ID,
})

if err != nil {
h.logAndSendError(w, "could not get object meta", reqInfo, err)
return
}

info := objectWithPayloadReader.ObjectInfo

if err = checkPreconditions(info, conditional); err != nil {
h.logAndSendError(w, "precondition failed", reqInfo, err)
@@ -180,46 +192,52 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}

t := &layer.ObjectVersion{
BktInfo: bktInfo,
ObjectName: info.Name,
}

if bktSettings.VersioningEnabled() {
t.VersionID = info.VersionID()
}

tagSet, lockInfo, err := h.obj.GetObjectTaggingAndLock(r.Context(), t)
if err != nil && !s3errors.IsS3Error(err, s3errors.ErrNoSuchKey) {
h.logAndSendError(w, "could not get object meta data", reqInfo, err)
return
}

if layer.IsAuthenticatedRequest(r.Context()) {
overrideResponseHeaders(w.Header(), reqInfo.URL.Query())
}

if err = h.setLockingHeaders(bktInfo, lockInfo, w.Header()); err != nil {
if err = h.setLockingHeaders(bktInfo, comprehensiveObjectInfo.LockInfo, w.Header()); err != nil {
h.logAndSendError(w, "could not get locking info", reqInfo, err)
return
}

writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
writeHeaders(w.Header(), r.Header, info, len(comprehensiveObjectInfo.TagSet), bktSettings.Unversioned())
if params != nil {
writeRangeHeaders(w, params, info.Size)
} else {
w.WriteHeader(http.StatusOK)
}

getParams := &layer.GetObjectParams{
ObjectInfo: info,
Writer: w,
Range: params,
BucketInfo: bktInfo,
Encryption: encryptionParams,
if params != nil || encryptionParams.Enabled() {
// Unfortunately, this reader is useless for us in this case; we have to re-read the object through the regular GetObject path.
_ = objectWithPayloadReader.Payload.Close()

getParams := &layer.GetObjectParams{
ObjectInfo: info,
Writer: w,
Range: params,
BucketInfo: bktInfo,
Encryption: encryptionParams,
}
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
h.logAndSendError(w, "could not get object", reqInfo, err)
}

return
}
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
h.logAndSendError(w, "could not get object", reqInfo, err)

var bufferSize = min(info.Size, 1024*1024)
if bufferSize == 0 {
bufferSize = 1024 * 1024
}

buf := make([]byte, bufferSize)
if _, err = io.CopyBuffer(w, objectWithPayloadReader.Payload, buf); err != nil {
h.logAndSendError(w, "could not write object output", reqInfo, err)
}

if err = objectWithPayloadReader.Payload.Close(); err != nil {
h.logAndSendError(w, "close output", reqInfo, err)
}
}

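A note on the bounded copy buffer in the streaming path above: io.CopyBuffer panics when handed a non-nil buffer of zero length, which is why the size check precedes the copy. Below is a self-contained sketch of the same pattern; the payload and size here are synthetic stand-ins for objectWithPayloadReader.Payload and info.Size.

package main

import (
	"bytes"
	"io"
)

func main() {
	// Synthetic payload of 3 MiB; in the handler this is the object's payload reader.
	size := int64(3 << 20)
	payload := io.NopCloser(bytes.NewReader(make([]byte, size)))

	// Cap the buffer at 1 MiB, but never let it be empty:
	// io.CopyBuffer panics on a non-nil, zero-length buffer.
	bufSize := min(size, 1<<20) // the min builtin requires Go 1.21+
	if bufSize == 0 {
		bufSize = 1 << 20
	}

	buf := make([]byte, bufSize)
	if _, err := io.CopyBuffer(io.Discard, payload, buf); err != nil {
		panic(err)
	}
	_ = payload.Close()
}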
2 changes: 1 addition & 1 deletion api/handler/head.go
@@ -113,7 +113,7 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}

writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
writeHeaders(w.Header(), r.Header, extendedInfo.ObjectInfo, len(tagSet), bktSettings.Unversioned())
w.WriteHeader(http.StatusOK)
}

168 changes: 168 additions & 0 deletions api/layer/layer.go
@@ -198,6 +198,20 @@ type (
Error error
}

// GetObjectWithPayloadReaderParams describes params for Client.GetObjectWithPayloadReader.
GetObjectWithPayloadReaderParams struct {
Owner user.ID
BktInfo *data.BucketInfo
Object oid.ID
}

// ObjectWithPayloadReader is a response for Client.GetObjectWithPayloadReader.
ObjectWithPayloadReader struct {
Head *object.Object
Payload io.ReadCloser
ObjectInfo *data.ObjectInfo
}

// Client provides S3 API client interface.
Client interface {
Initialize(ctx context.Context, c EventListener) error
@@ -217,8 +231,10 @@ type (
DeleteBucket(ctx context.Context, p *DeleteBucketParams) error

GetObject(ctx context.Context, p *GetObjectParams) error
GetObjectWithPayloadReader(ctx context.Context, p *GetObjectWithPayloadReaderParams) (*ObjectWithPayloadReader, error)
GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error)
GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error)
ComprehensiveObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ComprehensiveObjectInfo, error)
GetIDForVersioningContainer(ctx context.Context, p *ShortInfoParams) (oid.ID, error)

GetLockInfo(ctx context.Context, obj *ObjectVersion) (*data.LockInfo, error)
@@ -425,6 +441,27 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) {
return n.containerList(ctx)
}

// GetObjectWithPayloadReader returns the object head and a payload reader.
func (n *layer) GetObjectWithPayloadReader(ctx context.Context, p *GetObjectWithPayloadReaderParams) (*ObjectWithPayloadReader, error) {
var prm = GetObject{
Container: p.BktInfo.CID,
Object: p.Object,
}

n.prepareAuthParameters(ctx, &prm.PrmAuth, p.Owner)

op, err := n.neoFS.GetObject(ctx, prm)
if err != nil {
return nil, fmt.Errorf("get object: %w", err)
}

return &ObjectWithPayloadReader{
Head: op.Head,
Payload: op.Payload,
ObjectInfo: objectInfoFromMeta(p.BktInfo, op.Head),
}, nil
}

// GetObject from storage.
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
var params getParams
@@ -618,6 +655,137 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
return extObjInfo, nil
}

func (n *layer) ComprehensiveObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ComprehensiveObjectInfo, error) {
var (
id oid.ID
err error
settings *data.BucketSettings
owner = n.Owner(ctx)
versions []allVersionsSearchResult
tags, locks bool

tagSet map[string]string
lockInfo *data.LockInfo
)

if len(p.VersionID) == 0 {
versions, tags, locks, err = n.comprehensiveSearchAllVersionsInNeoFS(ctx, p.BktInfo, owner, p.Object, false)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, s3errors.GetAPIError(s3errors.ErrNoSuchKey)
}

return nil, err
}

if versions[0].IsDeleteMarker {
return nil, s3errors.GetAPIError(s3errors.ErrNoSuchKey)
}

id = versions[0].ID
} else if p.VersionID == data.UnversionedObjectVersionID {
versions, tags, locks, err = n.comprehensiveSearchAllVersionsInNeoFS(ctx, p.BktInfo, owner, p.Object, true)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, s3errors.GetAPIError(s3errors.ErrNoSuchVersion)
}

return nil, err
}

id = versions[0].ID
} else {
settings, err = n.GetBucketSettings(ctx, p.BktInfo)
if err != nil {
return nil, fmt.Errorf("get bucket settings: %w", err)
}

versions, tags, locks, err = n.comprehensiveSearchAllVersionsInNeoFS(ctx, p.BktInfo, owner, p.Object, false)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, s3errors.GetAPIError(s3errors.ErrNoSuchVersion)
}
return nil, err
}

var foundVersion *allVersionsSearchResult

if settings.VersioningEnabled() {
for _, version := range versions {
if version.ID.EncodeToString() == p.VersionID {
foundVersion = &version
break
}
}
} else {
// If versioning is not enabled, the user should see only the last version of the uploaded object.
if versions[0].ID.EncodeToString() == p.VersionID {
foundVersion = &versions[0]
}
}

if foundVersion == nil {
return nil, s3errors.GetAPIError(s3errors.ErrNoSuchVersion)
}

id = foundVersion.ID
}

if tags {
tagPrm := &GetObjectTaggingParams{
ObjectVersion: &ObjectVersion{
BktInfo: p.BktInfo,
ObjectName: p.Object,
VersionID: p.VersionID,
},
}

_, tagSet, err = n.GetObjectTagging(ctx, tagPrm)
if err != nil {
if !errors.Is(err, ErrNodeNotFound) {
return nil, fmt.Errorf("get tags: %w", err)
}
}
}

if locks {
lockInfo, err = n.getLockDataFromObjects(ctx, p.BktInfo, p.Object, p.VersionID)
if err != nil {
if !errors.Is(err, ErrNodeNotFound) {
return nil, fmt.Errorf("get locks: %w", err)
}
}
}

// n.GetObject()

// meta, err := n.objectHead(ctx, p.BktInfo, id) // latest version.
// if err != nil {
// return nil, nil, nil, fmt.Errorf("get head failed: %w", err)
// }
//
// objInfo := objectInfoFromMeta(p.BktInfo, meta)
//
// extObjInfo := &data.ExtendedObjectInfo{
// ObjectInfo: objInfo,
// NodeVersion: &data.NodeVersion{},
// }

reqInfo := api.GetReqInfo(ctx)
n.log.Debug("get object",
zap.String("reqId", reqInfo.RequestID),
zap.String("bucket", p.BktInfo.Name),
zap.Stringer("cid", p.BktInfo.CID),
zap.String("object", p.Object),
zap.Stringer("oid", id))

return &data.ComprehensiveObjectInfo{
ID: id,
TagSet: tagSet,
LockInfo: lockInfo,
}, nil
}

// GetIDForVersioningContainer returns actual oid.ID for object in versioned container.
func (n *layer) GetIDForVersioningContainer(ctx context.Context, p *ShortInfoParams) (oid.ID, error) {
var (
15 changes: 15 additions & 0 deletions api/layer/neofs.go
@@ -75,6 +75,18 @@ type PrmObjectRead struct {
PayloadRange [2]uint64
}

// GetObject groups parameters of the NeoFS.GetObject operation.
type GetObject struct {
// Authentication parameters.
PrmAuth

// Container to read the object from.
Container cid.ID

// ID of the object to read.
Object oid.ID
}

// ObjectPart represents partially read NeoFS object.
type ObjectPart struct {
// Object header with optional in-memory payload part.
@@ -237,6 +249,9 @@ type NeoFS interface {
// prevented the object header from being read.
ReadObject(context.Context, PrmObjectRead) (*ObjectPart, error)

// GetObject returns the object head and a payload reader.
GetObject(ctx context.Context, prm GetObject) (*ObjectPart, error)

// CreateObject creates and saves a parameterized object in the NeoFS container.
// It sets 'Timestamp' attribute to the current time.
// It returns the ID of the saved object.
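To illustrate the contract of the new NeoFS.GetObject method, here is a minimal sketch of a test double. It assumes the module path github.com/nspcc-dev/neofs-s3-gw and that layer.ObjectPart exposes the Head and Payload fields used by GetObjectWithPayloadReader above; everything prefixed with fake is hypothetical, and only GetObject is shown, not the full NeoFS interface.

package fakes // hypothetical package name

import (
	"bytes"
	"context"
	"io"

	"github.com/nspcc-dev/neofs-s3-gw/api/layer" // assumed module path
	"github.com/nspcc-dev/neofs-sdk-go/object"
)

// fakeNeoFS serves object heads and payloads from memory, keyed by "<cid>/<oid>".
type fakeNeoFS struct {
	heads    map[string]*object.Object
	payloads map[string][]byte
}

// GetObject returns the stored head and an in-memory payload reader, mirroring
// the head-plus-stream shape the layer expects.
func (f *fakeNeoFS) GetObject(_ context.Context, prm layer.GetObject) (*layer.ObjectPart, error) {
	key := prm.Container.EncodeToString() + "/" + prm.Object.EncodeToString()
	return &layer.ObjectPart{
		Head:    f.heads[key],
		Payload: io.NopCloser(bytes.NewReader(f.payloads[key])),
	}, nil
}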