Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streamline getobject #1085

Merged
merged 5 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/data/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type (
Owner user.ID
OwnerPublicKey keys.PublicKey
Headers map[string]string
Version string
}

// ObjectListResponseContent holds response data for object listing.
Expand Down
7 changes: 7 additions & 0 deletions api/data/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ type ExtendedObjectInfo struct {
IsLatest bool
}

// ComprehensiveObjectInfo represents metasearch result for object, with tags and lock data.
type ComprehensiveObjectInfo struct {
ID oid.ID
TagSet map[string]string
LockInfo *LockInfo
}

func (e ExtendedObjectInfo) Version() string {
if e.NodeVersion.IsUnversioned {
return UnversionedObjectVersionID
Expand Down
82 changes: 52 additions & 30 deletions api/handler/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handler

import (
"fmt"
"io"
"net/http"
"net/url"
"strconv"
Expand All @@ -22,6 +23,10 @@ type conditionalArgs struct {
IfNoneMatch string
}

const (
defaultBufferSize = 128 * 1024
)

func fetchRangeHeader(headers http.Header, fullSize uint64) (*layer.RangeParams, error) {
const prefix = "bytes="
rangeHeader := headers.Get("Range")
Expand Down Expand Up @@ -77,8 +82,7 @@ func addSSECHeaders(responseHeader http.Header, requestHeader http.Header) {
responseHeader.Set(api.AmzServerSideEncryptionCustomerKeyMD5, requestHeader.Get(api.AmzServerSideEncryptionCustomerKeyMD5))
}

func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.ExtendedObjectInfo, tagSetLength int, isBucketUnversioned bool) {
info := extendedInfo.ObjectInfo
func writeHeaders(h http.Header, requestHeader http.Header, info *data.ObjectInfo, tagSetLength int, isBucketUnversioned bool) {
if len(info.ContentType) > 0 && h.Get(api.ContentType) == "" {
h.Set(api.ContentType, info.ContentType)
}
Expand All @@ -95,7 +99,7 @@ func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.E
h.Set(api.AmzTaggingCount, strconv.Itoa(tagSetLength))

if !isBucketUnversioned {
h.Set(api.AmzVersionID, extendedInfo.Version())
h.Set(api.AmzVersionID, info.Version)
}

if cacheControl := info.Headers[api.CacheControl]; cacheControl != "" {
Expand Down Expand Up @@ -138,12 +142,24 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
}

extendedInfo, err := h.obj.GetExtendedObjectInfo(r.Context(), p)
comprehensiveObjectInfo, err := h.obj.ComprehensiveObjectInfo(r.Context(), p)
if err != nil {
h.logAndSendError(w, "could not find object", reqInfo, err)
return
}
info := extendedInfo.ObjectInfo

objectWithPayloadReader, err := h.obj.GetObjectWithPayloadReader(r.Context(), &layer.GetObjectWithPayloadReaderParams{
Owner: bktInfo.Owner,
BktInfo: bktInfo,
Object: comprehensiveObjectInfo.ID,
})

if err != nil {
h.logAndSendError(w, "could not get object meta", reqInfo, err)
return
}

info := objectWithPayloadReader.ObjectInfo

if err = checkPreconditions(info, conditional); err != nil {
h.logAndSendError(w, "precondition failed", reqInfo, err)
Expand Down Expand Up @@ -180,46 +196,52 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}

t := &layer.ObjectVersion{
BktInfo: bktInfo,
ObjectName: info.Name,
}

if bktSettings.VersioningEnabled() {
t.VersionID = info.VersionID()
}

tagSet, lockInfo, err := h.obj.GetObjectTaggingAndLock(r.Context(), t, extendedInfo.NodeVersion)
if err != nil && !s3errors.IsS3Error(err, s3errors.ErrNoSuchKey) {
h.logAndSendError(w, "could not get object meta data", reqInfo, err)
return
}

if layer.IsAuthenticatedRequest(r.Context()) {
overrideResponseHeaders(w.Header(), reqInfo.URL.Query())
}

if err = h.setLockingHeaders(bktInfo, lockInfo, w.Header()); err != nil {
if err = h.setLockingHeaders(bktInfo, comprehensiveObjectInfo.LockInfo, w.Header()); err != nil {
h.logAndSendError(w, "could not get locking info", reqInfo, err)
return
}

writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
writeHeaders(w.Header(), r.Header, info, len(comprehensiveObjectInfo.TagSet), bktSettings.Unversioned())
if params != nil {
writeRangeHeaders(w, params, info.Size)
} else {
w.WriteHeader(http.StatusOK)
}

getParams := &layer.GetObjectParams{
ObjectInfo: info,
Writer: w,
Range: params,
BucketInfo: bktInfo,
Encryption: encryptionParams,
if params != nil || encryptionParams.Enabled() {
// unfortunately this reader is useless for us in this case, we have to re-read another one.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be reused for sure (just piping into the decryption layer), but that's not our current primary target.

_ = objectWithPayloadReader.Payload.Close()

getParams := &layer.GetObjectParams{
ObjectInfo: info,
Writer: w,
Range: params,
BucketInfo: bktInfo,
Encryption: encryptionParams,
}
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
h.logAndSendError(w, "could not get object", reqInfo, err)
}

return
}

var bufferSize = min(info.Size, defaultBufferSize)
if bufferSize == 0 {
bufferSize = defaultBufferSize
}
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
h.logAndSendError(w, "could not get object", reqInfo, err)

buf := make([]byte, bufferSize)
if _, err = io.CopyBuffer(w, objectWithPayloadReader.Payload, buf); err != nil {
h.logAndSendError(w, "could write object output", reqInfo, err)
}

if err = objectWithPayloadReader.Payload.Close(); err != nil {
h.logAndSendError(w, "close output", reqInfo, err)
}
}

Expand Down
4 changes: 2 additions & 2 deletions api/handler/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
t.VersionID = ""
}

tagSet, lockInfo, err := h.obj.GetObjectTaggingAndLock(r.Context(), t, extendedInfo.NodeVersion)
tagSet, lockInfo, err := h.obj.GetObjectTaggingAndLock(r.Context(), t)
if err != nil && !s3errors.IsS3Error(err, s3errors.ErrNoSuchKey) {
h.logAndSendError(w, "could not get object meta data", reqInfo, err)
return
Expand Down Expand Up @@ -113,7 +113,7 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}

writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
writeHeaders(w.Header(), r.Header, extendedInfo.ObjectInfo, len(tagSet), bktSettings.Unversioned())
w.WriteHeader(http.StatusOK)
}

Expand Down
2 changes: 1 addition & 1 deletion api/layer/compound.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/nspcc-dev/neofs-s3-gw/api/s3errors"
)

func (n *layer) GetObjectTaggingAndLock(ctx context.Context, objVersion *ObjectVersion, nodeVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
func (n *layer) GetObjectTaggingAndLock(ctx context.Context, objVersion *ObjectVersion) (map[string]string, *data.LockInfo, error) {
var err error
owner := n.Owner(ctx)

Expand Down
Loading
Loading