Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support new object search #676

Merged
merged 8 commits into from
Jan 10, 2025
15 changes: 13 additions & 2 deletions client/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,17 @@ var (
proto.Clone(validProtoObjectIDs[2]).(*protorefs.ObjectID),
},
}
// correct ObjectService.SearchV2 response payload with required fields only.
validMinSearchV2ResponseBody = &protoobject.SearchV2Response_Body{}
// correct ObjectService.SearchV2 response payload with all fields.
validFullSearchV2ResponseBody = &protoobject.SearchV2Response_Body{
Result: []*protoobject.SearchV2Response_OIDWithMeta{
{Id: proto.Clone(validProtoObjectIDs[0]).(*protorefs.ObjectID), Attributes: []string{"val_1_1", "val_1_2"}},
{Id: proto.Clone(validProtoObjectIDs[1]).(*protorefs.ObjectID), Attributes: []string{"val_2_1", "val_2_2"}},
{Id: proto.Clone(validProtoObjectIDs[2]).(*protorefs.ObjectID), Attributes: []string{"val_3_1", "val_3_2"}},
},
Cursor: "any_cursor",
}
)

// Reputation service.
Expand Down Expand Up @@ -1840,7 +1851,7 @@ func checkSplitInfoTransport(s object.SplitInfo, m *protoobject.SplitInfo) error
return nil
}

func checkObjectSearchFilterTransport(f object.SearchFilter, m *protoobject.SearchRequest_Body_Filter) error {
func checkObjectSearchFilterTransport(f object.SearchFilter, m *protoobject.SearchFilter) error {
// 1. matcher
var expMatcher protoobject.MatchType
switch m := f.Operation(); m {
Expand Down Expand Up @@ -1877,7 +1888,7 @@ func checkObjectSearchFilterTransport(f object.SearchFilter, m *protoobject.Sear
return nil
}

func checkObjectSearchFiltersTransport(fs []object.SearchFilter, ms []*protoobject.SearchRequest_Body_Filter) error {
func checkObjectSearchFiltersTransport(fs []object.SearchFilter, ms []*protoobject.SearchFilter) error {
if v1, v2 := len(fs), len(ms); v1 != v2 {
return fmt.Errorf("number of attributes (client: %d, message: %d)", v1, v2)
}
Expand Down
2 changes: 1 addition & 1 deletion client/object_replicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func BenchmarkPrepareReplicationMessage(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
for range b.N {
_, err = prepareReplicateMessage(id, bytes.NewReader(bObj), signer, true)
require.NoError(b, err)
}
Expand Down
215 changes: 214 additions & 1 deletion client/object_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"errors"
"fmt"
"io"
"slices"
"time"

"github.com/nspcc-dev/neofs-sdk-go/bearer"
Expand All @@ -16,11 +17,223 @@
protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object"
"github.com/nspcc-dev/neofs-sdk-go/proto/refs"
protosession "github.com/nspcc-dev/neofs-sdk-go/proto/session"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/stat"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/nspcc-dev/neofs-sdk-go/version"
)

const (
defaultSearchObjectsQueryVersion = 1

maxSearchObjectsCount = 1000
maxSearchObjectsFilterCount = 8
maxSearchObjectsAttrCount = 4
)

// SearchResultItem groups data of an object matching particular search query.
type SearchResultItem struct {
ID oid.ID
Attributes []string
}

// SearchObjectsOptions groups optional parameters of [Client.SearchObjects].
type SearchObjectsOptions struct {
prmCommonMeta
sessionToken *session.Object
bearerToken *bearer.Token
noForwarding bool

count uint32
}

// DisableForwarding disables request forwarding by the server and limits
// execution to its local storage. Mostly used for system purposes.
func (x *SearchObjectsOptions) DisableForwarding() { x.noForwarding = true }

// WithSessionToken specifies session token to attach to the request. The token
// must be issued for the request signer and target the requested container and
// operation.
func (x *SearchObjectsOptions) WithSessionToken(st session.Object) { x.sessionToken = &st }

// WithBearerToken specifies bearer token to attach to the request. The token
// must be issued by the container owner for the request signer.
func (x *SearchObjectsOptions) WithBearerToken(bt bearer.Token) { x.bearerToken = &bt }

// SetCount limits the search result to a given number. Must be in [1, 1000]
// range. Defaults to 1000.
func (x *SearchObjectsOptions) SetCount(count uint32) { x.count = count }

// SearchObjects selects objects from given container by applying specified
// filters, collects values of requested attributes and returns the result
// sorted. Elements are compared by attributes' values lexicographically in
// priority from first to last, closing with the default sorting by IDs. System
// attributes can be included using special aliases like
// [object.FilterPayloadSize]. SearchObjects also returns opaque continuation
// cursor: when passed to a repeat call, it specifies where to continue the
// operation from. To start the search anew, pass an empty cursor.
//
// Max number of filters is 8. Max number of attributes is 4. If attributes are
// specified, filters must include the 1st of them.
//
// Note that if requested attribute is missing in the matching object,
// corresponding element in its [SearchResultItem.Attributes] is empty.
func (c *Client) SearchObjects(ctx context.Context, cnr cid.ID, filters object.SearchFilters, attrs []string, cursor string,
signer neofscrypto.Signer, opts SearchObjectsOptions) ([]SearchResultItem, string, error) {
var err error
if c.prm.statisticCallback != nil {
startTime := time.Now()
defer func() {
c.sendStatistic(stat.MethodObjectSearchV2, time.Since(startTime), err)
}()
}

switch {
case signer == nil:
return nil, "", ErrMissingSigner
case cnr.IsZero():
err = cid.ErrZero
return nil, "", err
case opts.count > maxSearchObjectsCount:
err = fmt.Errorf("count is out of [1, %d] range", maxSearchObjectsCount)
return nil, "", err
case len(filters) > maxSearchObjectsFilterCount:
err = fmt.Errorf("more than %d filters", maxSearchObjectsFilterCount)
return nil, "", err
case len(attrs) > 0:
if len(attrs) > maxSearchObjectsAttrCount {
err = fmt.Errorf("more than %d attributes", maxSearchObjectsAttrCount)
return nil, "", err
}
for i := range attrs {
if attrs[i] == "" {
err = fmt.Errorf("empty attribute #%d", i)
return nil, "", err
}
for j := i + 1; j < len(attrs); j++ {
if attrs[i] == attrs[j] {
err = fmt.Errorf("duplicated attribute %q", attrs[i])
return nil, "", err
}
}
}
if !slices.ContainsFunc(filters, func(f object.SearchFilter) bool { return f.Header() == attrs[0] }) {
err = fmt.Errorf("attribute %q is requested but not filtered", attrs[0])
return nil, "", err
}
}

if opts.count == 0 {
opts.count = maxSearchObjectsCount
}

req := &protoobject.SearchV2Request{
Body: &protoobject.SearchV2Request_Body{
ContainerId: cnr.ProtoMessage(),
Version: defaultSearchObjectsQueryVersion,
Filters: filters.ProtoMessage(),
Cursor: cursor,
Count: opts.count,
Attributes: attrs,
},
MetaHeader: &protosession.RequestMetaHeader{
Version: version.Current().ProtoMessage(),
},
}
writeXHeadersToMeta(opts.xHeaders, req.MetaHeader)
if opts.noForwarding {
req.MetaHeader.Ttl = localRequestTTL
} else {
req.MetaHeader.Ttl = defaultRequestTTL
}
if opts.sessionToken != nil {
req.MetaHeader.SessionToken = opts.sessionToken.ProtoMessage()
}
if opts.bearerToken != nil {
req.MetaHeader.BearerToken = opts.bearerToken.ProtoMessage()
}

buf := c.buffers.Get().(*[]byte)
defer func() { c.buffers.Put(buf) }()

req.VerifyHeader, err = neofscrypto.SignRequestWithBuffer[*protoobject.SearchV2Request_Body](signer, req, *buf)
if err != nil {
err = fmt.Errorf("%w: %w", errSignRequest, err)
return nil, "", err
}

resp, err := c.object.SearchV2(ctx, req)
if err != nil {
err = rpcErr(err)
return nil, "", err
}

if c.prm.cbRespInfo != nil {
err = c.prm.cbRespInfo(ResponseMetaInfo{
key: resp.GetVerifyHeader().GetBodySignature().GetKey(),
epoch: resp.GetMetaHeader().GetEpoch(),
})
if err != nil {
err = fmt.Errorf("%w: %w", errResponseCallback, err)
return nil, "", err
}
}

if err = neofscrypto.VerifyResponseWithBuffer[*protoobject.SearchV2Response_Body](resp, *buf); err != nil {
err = fmt.Errorf("%w: %w", errResponseSignatures, err)
return nil, "", err
}

if err = apistatus.ToError(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, "", err
}

if resp.Body == nil {
return nil, "", nil
}

n := uint32(len(resp.Body.Result))
const cursorField = "cursor"
if n == 0 {
if resp.Body.Cursor != "" {
err = newErrInvalidResponseField(cursorField, errors.New("set while result is empty"))
return nil, "", err
}
return nil, "", nil
}
if cursor != "" && resp.Body.Cursor == cursor {
err = newErrInvalidResponseField(cursorField, errors.New("repeats the initial one"))
return nil, "", err
}
const resultField = "result"
if n > opts.count {
err = newErrInvalidResponseField(resultField, fmt.Errorf("more items than requested: %d", n))
return nil, "", err
}

res := make([]SearchResultItem, n)
for i, r := range resp.Body.Result {
switch {
case r == nil:
err = newErrInvalidResponseField(resultField, fmt.Errorf("nil element #%d", i))
return nil, "", err

Check warning on line 219 in client/object_search.go

View check run for this annotation

Codecov / codecov/patch

client/object_search.go#L217-L219

Added lines #L217 - L219 were not covered by tests
case r.Id == nil:
err = newErrInvalidResponseField(resultField, fmt.Errorf("invalid element #%d: missing ID", i))
return nil, "", err
case len(r.Attributes) != len(attrs):
err = newErrInvalidResponseField(resultField, fmt.Errorf("invalid element #%d: wrong attribute count %d", i, len(r.Attributes)))
return nil, "", err
}
if err = res[i].ID.FromProtoMessage(r.Id); err != nil {
err = newErrInvalidResponseField(resultField, fmt.Errorf("invalid element #%d: invalid ID: %w", i, err))
return nil, "", err
}
res[i].Attributes = r.Attributes
}

return res, resp.Body.Cursor, nil
}

// PrmObjectSearch groups optional parameters of ObjectSearch operation.
type PrmObjectSearch struct {
sessionContainer
Expand Down Expand Up @@ -216,7 +429,7 @@
req := &protoobject.SearchRequest{
Body: &protoobject.SearchRequest_Body{
ContainerId: containerID.ProtoMessage(),
Version: 1,
Version: defaultSearchObjectsQueryVersion,
Filters: prm.filters.ProtoMessage(),
},
MetaHeader: &protosession.RequestMetaHeader{
Expand Down
Loading
Loading