Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support new object search #676

Merged
merged 8 commits into from
Jan 10, 2025
15 changes: 13 additions & 2 deletions client/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,17 @@ var (
proto.Clone(validProtoObjectIDs[2]).(*protorefs.ObjectID),
},
}
// correct ObjectService.SearchV2 response payload with required fields only.
validMinSearchV2ResponseBody = &protoobject.SearchV2Response_Body{}
// correct ObjectService.SearchV2 response payload with all fields.
validFullSearchV2ResponseBody = &protoobject.SearchV2Response_Body{
Result: []*protoobject.SearchV2Response_OIDWithMeta{
{Id: proto.Clone(validProtoObjectIDs[0]).(*protorefs.ObjectID), Attributes: []string{"val_1_1", "val_1_2"}},
{Id: proto.Clone(validProtoObjectIDs[1]).(*protorefs.ObjectID), Attributes: []string{"val_2_1", "val_2_2"}},
{Id: proto.Clone(validProtoObjectIDs[2]).(*protorefs.ObjectID), Attributes: []string{"val_3_1", "val_3_2"}},
},
Cursor: "any_cursor",
}
)

// Reputation service.
Expand Down Expand Up @@ -1840,7 +1851,7 @@ func checkSplitInfoTransport(s object.SplitInfo, m *protoobject.SplitInfo) error
return nil
}

func checkObjectSearchFilterTransport(f object.SearchFilter, m *protoobject.SearchRequest_Body_Filter) error {
func checkObjectSearchFilterTransport(f object.SearchFilter, m *protoobject.SearchFilter) error {
// 1. matcher
var expMatcher protoobject.MatchType
switch m := f.Operation(); m {
Expand Down Expand Up @@ -1877,7 +1888,7 @@ func checkObjectSearchFilterTransport(f object.SearchFilter, m *protoobject.Sear
return nil
}

func checkObjectSearchFiltersTransport(fs []object.SearchFilter, ms []*protoobject.SearchRequest_Body_Filter) error {
func checkObjectSearchFiltersTransport(fs []object.SearchFilter, ms []*protoobject.SearchFilter) error {
if v1, v2 := len(fs), len(ms); v1 != v2 {
return fmt.Errorf("number of attributes (client: %d, message: %d)", v1, v2)
}
Expand Down
2 changes: 1 addition & 1 deletion client/object_replicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func BenchmarkPrepareReplicationMessage(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
for range b.N {
_, err = prepareReplicateMessage(id, bytes.NewReader(bObj), signer, true)
require.NoError(b, err)
}
Expand Down
223 changes: 222 additions & 1 deletion client/object_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"errors"
"fmt"
"io"
"slices"
"time"

"github.com/nspcc-dev/neofs-sdk-go/bearer"
Expand All @@ -16,11 +17,231 @@
protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object"
"github.com/nspcc-dev/neofs-sdk-go/proto/refs"
protosession "github.com/nspcc-dev/neofs-sdk-go/proto/session"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/stat"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/nspcc-dev/neofs-sdk-go/version"
)

const (
defaultSearchObjectsQueryVersion = 1

maxSearchObjectsCount = 1000
maxSearchObjectsFilterCount = 8
maxSearchObjectsAttrCount = 4
)

// SearchResultItem groups data of an object matching particular search query.
type SearchResultItem struct {
ID oid.ID
Attributes []string
}

// SearchObjectsOptions groups optional parameters of [Client.SearchObjects].
type SearchObjectsOptions struct {
prmCommonMeta
sessionToken *session.Object
bearerToken *bearer.Token
noForwarding bool

filters object.SearchFilters
cursor string
attrs []string
}

// DisableForwarding disables request forwarding by the server and limits
// execution to its local storage. Mostly used for system purposes.
func (x *SearchObjectsOptions) DisableForwarding() { x.noForwarding = true }

// WithSessionToken specifies session token to attach to the request. The token
// must be issued for the request signer and target the requested container and
// operation.
func (x *SearchObjectsOptions) WithSessionToken(st session.Object) { x.sessionToken = &st }

// WithBearerToken specifies bearer token to attach to the request. The token
// must be issued by the container owner for the request signer.
func (x *SearchObjectsOptions) WithBearerToken(bt bearer.Token) { x.bearerToken = &bt }

// SetFilters sets filters by which to select objects. All container objects
// match unset/empty filters. Limited by 8.
//
// If at least one attribute is requested via
// [SearchObjectsOptions.AttachAttributes], query must include the 1st one.
func (x *SearchObjectsOptions) SetFilters(filters object.SearchFilters) { x.filters = filters }

// AttachAttributes allows to specify set of requested object attributes: their
// values will be attached to the response as strings in the canonical search
// format. If attributes are specified, matching objects are sorted by them in
// priority from first to last, closing with the default sorting by IDs. System
// attributes can also be included using special aliases like
// [object.FilterPayloadSize]. Limited by 4.
func (x *SearchObjectsOptions) AttachAttributes(attrs []string) { x.attrs = attrs }

// SetCursor sets cursor to continue search returned from previous
// [Client.SearchObjects] call.
func (x *SearchObjectsOptions) SetCursor(cursor string) { x.cursor = cursor }

// SearchObjects selects at most given count of objects from the specified
// container, sorts them and returns their IDs. Also returns an opaque string
// cursor allowing to continue search via [SearchObjectsOptions.SetCursor] when
// more than count objects are needed.
//
// Count must be in [1, 1000] range.
//
// To filter out some objects, use [SearchObjectsOptions.SetFilters]. To get
// some attributes of matching objects, use
// [SearchObjectsOptions.AttachAttributes].
func (c *Client) SearchObjects(ctx context.Context, cnr cid.ID, count uint32, signer neofscrypto.Signer, opts SearchObjectsOptions) ([]SearchResultItem, string, error) {
var err error
if c.prm.statisticCallback != nil {
startTime := time.Now()
defer func() {
c.sendStatistic(stat.MethodObjectSearchV2, time.Since(startTime), err)
}()
}

switch {
case signer == nil:
return nil, "", ErrMissingSigner
case cnr.IsZero():
err = cid.ErrZero
return nil, "", err
case count == 0 || count > maxSearchObjectsCount:
err = fmt.Errorf("count is out of [1, %d] range", maxSearchObjectsCount)
return nil, "", err
case len(opts.filters) > maxSearchObjectsFilterCount:
err = fmt.Errorf("more than %d filters", maxSearchObjectsFilterCount)
return nil, "", err
case len(opts.attrs) > 0:
if len(opts.attrs) > maxSearchObjectsAttrCount {
err = fmt.Errorf("more than %d attributes", maxSearchObjectsAttrCount)
return nil, "", err
}
for i := range opts.attrs {
if opts.attrs[i] == "" {
err = fmt.Errorf("empty attribute #%d", i)
return nil, "", err
}
for j := i + 1; j < len(opts.attrs); j++ {
if opts.attrs[i] == opts.attrs[j] {
err = fmt.Errorf("duplicated attribute %q", opts.attrs[i])
return nil, "", err
}
}
}
if !slices.ContainsFunc(opts.filters, func(f object.SearchFilter) bool { return f.Header() == opts.attrs[0] }) {
err = fmt.Errorf("attribute %q is requested but not filtered", opts.attrs[0])
return nil, "", err
}
}

req := &protoobject.SearchV2Request{
Body: &protoobject.SearchV2Request_Body{
ContainerId: cnr.ProtoMessage(),
Version: defaultSearchObjectsQueryVersion,
Filters: opts.filters.ProtoMessage(),
Cursor: opts.cursor,
Count: count,
Attributes: opts.attrs,
},
MetaHeader: &protosession.RequestMetaHeader{
Version: version.Current().ProtoMessage(),
},
}
writeXHeadersToMeta(opts.xHeaders, req.MetaHeader)
if opts.noForwarding {
req.MetaHeader.Ttl = localRequestTTL
} else {
req.MetaHeader.Ttl = defaultRequestTTL
}
if opts.sessionToken != nil {
req.MetaHeader.SessionToken = opts.sessionToken.ProtoMessage()
}
if opts.bearerToken != nil {
req.MetaHeader.BearerToken = opts.bearerToken.ProtoMessage()
}

buf := c.buffers.Get().(*[]byte)
defer func() { c.buffers.Put(buf) }()

req.VerifyHeader, err = neofscrypto.SignRequestWithBuffer[*protoobject.SearchV2Request_Body](signer, req, *buf)
if err != nil {
err = fmt.Errorf("%w: %w", errSignRequest, err)
return nil, "", err
}

resp, err := c.object.SearchV2(ctx, req)
if err != nil {
err = rpcErr(err)
return nil, "", err
}

if c.prm.cbRespInfo != nil {
err = c.prm.cbRespInfo(ResponseMetaInfo{
key: resp.GetVerifyHeader().GetBodySignature().GetKey(),
epoch: resp.GetMetaHeader().GetEpoch(),
})
if err != nil {
err = fmt.Errorf("%w: %w", errResponseCallback, err)
return nil, "", err
}
}

if err = neofscrypto.VerifyResponseWithBuffer[*protoobject.SearchV2Response_Body](resp, *buf); err != nil {
err = fmt.Errorf("%w: %w", errResponseSignatures, err)
return nil, "", err
}

if err = apistatus.ToError(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, "", err
}

if resp.Body == nil {
return nil, "", nil
}

n := uint32(len(resp.Body.Result))
const cursorField = "cursor"
if n == 0 {
if resp.Body.Cursor != "" {
err = newErrInvalidResponseField(cursorField, errors.New("set while result is empty"))
return nil, "", err
}
return nil, "", nil
}
if opts.cursor != "" && resp.Body.Cursor == opts.cursor {
err = newErrInvalidResponseField(cursorField, errors.New("repeats the initial one"))
return nil, "", err
}
const resultField = "result"
if n > count {
err = newErrInvalidResponseField(resultField, fmt.Errorf("more items than requested: %d", n))
return nil, "", err
}

res := make([]SearchResultItem, n)
for i, r := range resp.Body.Result {
switch {
case r == nil:
err = newErrInvalidResponseField(resultField, fmt.Errorf("nil element #%d", i))
return nil, "", err

Check warning on line 227 in client/object_search.go

View check run for this annotation

Codecov / codecov/patch

client/object_search.go#L225-L227

Added lines #L225 - L227 were not covered by tests
case r.Id == nil:
err = newErrInvalidResponseField(resultField, fmt.Errorf("invalid element #%d: missing ID", i))
return nil, "", err
case len(r.Attributes) != len(opts.attrs):
err = newErrInvalidResponseField(resultField, fmt.Errorf("invalid element #%d: wrong attribute count %d", i, len(r.Attributes)))
return nil, "", err
}
if err = res[i].ID.FromProtoMessage(r.Id); err != nil {
err = newErrInvalidResponseField(resultField, fmt.Errorf("invalid element #%d: invalid ID: %w", i, err))
return nil, "", err
}
res[i].Attributes = r.Attributes
}

return res, resp.Body.Cursor, nil
}

// PrmObjectSearch groups optional parameters of ObjectSearch operation.
type PrmObjectSearch struct {
sessionContainer
Expand Down Expand Up @@ -216,7 +437,7 @@
req := &protoobject.SearchRequest{
Body: &protoobject.SearchRequest_Body{
ContainerId: containerID.ProtoMessage(),
Version: 1,
Version: defaultSearchObjectsQueryVersion,
Filters: prm.filters.ProtoMessage(),
},
MetaHeader: &protosession.RequestMetaHeader{
Expand Down
Loading
Loading