Skip to content

Commit

Permalink
node/object: Serve SearchV2 RPC
Browse files Browse the repository at this point in the history
WIP

Refs #3058.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Feb 7, 2025
1 parent 9ea9553 commit 1c14201
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 7 deletions.
53 changes: 53 additions & 0 deletions pkg/local_object_storage/engine/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package engine

import (
"errors"
"fmt"
"slices"

objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand Down Expand Up @@ -92,3 +96,52 @@ func (e *StorageEngine) List(limit uint64) ([]oid.Address, error) {

return addrList, nil
}

// Search performs Search op on all underlying shards and returns merged result.
//
// Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) Search(cnr cid.ID, fs object.SearchFilters, attrs []string, cursor string, count uint16) ([]client.SearchResultItem, string, error) {
if e.metrics != nil {
defer elapsed(e.metrics.AddSearchDuration)() // FIXME: sep metric?
}
e.blockMtx.RLock()
defer e.blockMtx.RUnlock()
if e.blockErr != nil {
return nil, "", e.blockErr
}
shs := e.unsortedShards()
if len(shs) == 0 {
return nil, "", nil
}
res, cursor, err := shs[0].Search(cnr, fs, attrs, cursor, count)
if err != nil {
e.reportShardError(shs[0], "could not select objects from shard", err)
}
if len(shs) == 1 {
return res, cursor, nil
}
shs = shs[1:]
n := uint16(len(res))
for i := range shs {
// TODO: verify query once
// TODO: avoid allocation inside and pass pre-allocated buffer
next, _, err := shs[i].Search(cnr, fs, attrs, cursor, count)
if err != nil {
e.reportShardError(shs[i], "could not select objects from shard", err)
continue
}
if i == 0 {
if r := int(count) - len(res); r > 0 {
res = slices.Grow(res, r)[:len(res)+r]
}
}
n += objectcore.MergeSearchResults(min(count, n), res, next)
}
if n < count {
return res[:n], "", nil
}
if cursor, err = meta.EncodeSearchCursor(fs, res[count-1]); err != nil {
return nil, "", fmt.Errorf("recalculate cursor: %w", err)
}
return res[:count], cursor, nil
}
41 changes: 41 additions & 0 deletions pkg/local_object_storage/metabase/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,3 +740,44 @@ func (x *metaAttributeSeeker) restoreVal(id []byte, attr string, stored []byte)
}
return string(stored), nil
}

// EncodeSearchCursor calculates cursor for the given last search result item.
func EncodeSearchCursor(fs object.SearchFilters, lastItem client.SearchResultItem) (string, error) {
var c []byte
if len(lastItem.Attributes) == 0 || len(fs) == 0 || fs[0].Operation() == object.MatchNotPresent {
c = lastItem.ID[:]
} else {
attr := fs[0].Header()
switch attr {
default:
if n, ok := new(big.Int).SetString(lastItem.Attributes[0], 10); ok {
var off int
c, off = prepareMetaAttrIDKey(new(keyBuffer), lastItem.ID, attr, intValLen, true)
putInt(c[off:off+intValLen], n)
}
case object.FilterOwnerID, object.FilterFirstSplitObject, object.FilterParentID:
var err error
if c, err = base58.Decode(lastItem.Attributes[0]); err != nil {
return "", fmt.Errorf("decode %q attribute value from Base58: %w", attr, err)
}
case object.FilterPayloadChecksum, object.FilterPayloadHomomorphicHash:
var err error
if c, err = hex.DecodeString(lastItem.Attributes[0]); err != nil {
return "", fmt.Errorf("decode %q attribute from HEX: %w", attr, err)
}
case object.FilterSplitID:
uid, err := uuid.Parse(lastItem.Attributes[0])
if err != nil {
return "", fmt.Errorf("decode %q attribute from HEX: %w", attr, err)
}
c = uid[:]
case object.FilterVersion, object.FilterType:
}
if c == nil {
var off int
c, off = prepareMetaAttrIDKey(new(keyBuffer), lastItem.ID, attr, len(lastItem.Attributes[0]), false)
copy(c[off:], lastItem.Attributes[0])
}
}
return base58.Encode(c), nil
}
15 changes: 15 additions & 0 deletions pkg/local_object_storage/shard/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package shard
import (
"fmt"

"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand All @@ -29,3 +30,17 @@ func (s *Shard) Select(cnr cid.ID, filters object.SearchFilters) ([]oid.Address,

return addrs, nil
}

// Search performs Search op on the underlying metabase if it is not disabled.
func (s *Shard) Search(cnr cid.ID, fs object.SearchFilters, attrs []string, cursor string, count uint16) ([]client.SearchResultItem, string, error) {
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, "", ErrDegradedMode
}
res, cursor, err := s.metaBase.Search(cnr, fs, attrs, cursor, count)
if err != nil {
return nil, "", fmt.Errorf("call metabase: %w", err)
}
return res, cursor, nil
}
195 changes: 188 additions & 7 deletions pkg/services/object/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/core/client"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/network"
aclsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/v2"
deletesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/delete"
Expand All @@ -26,10 +27,12 @@ import (
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/util"
sdkclient "github.com/nspcc-dev/neofs-sdk-go/client"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
sdknetmap "github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object"
Expand Down Expand Up @@ -83,14 +86,14 @@ type MetricCollector interface {
type FSChain interface {
netmap.StateDetailed

// ForEachContainerNodePublicKeyInLastTwoEpochs iterates over all nodes matching
// ForEachContainerNodeInLastTwoEpochs iterates over all nodes matching
// the referenced container's storage policy at the current and the previous
// NeoFS epochs, and passes their public keys into f. IterateContainerNodeKeys
// NeoFS epochs, and passes their info into f. IterateContainerNodeKeys
// breaks without an error when f returns false. Keys may be repeated.
//
// Returns [apistatus.ErrContainerNotFound] if referenced container was not
// found.
ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func(pubKey []byte) bool) error
ForEachContainerNodeInLastTwoEpochs(cid.ID, func(sdknetmap.NodeInfo) bool) error

// IsOwnPublicKey checks whether given pubKey assigned to Node in the NeoFS
// network map.
Expand All @@ -117,6 +120,14 @@ type Storage interface {
// and, if so, saves it in the Storage. StoreObject is called only when local
// node complies with the container's storage policy.
VerifyAndStoreObjectLocally(object.Object) error
// SearchObjects selects up to count container's objects from the given
// container matching the specified filters.
SearchObjects(_ cid.ID, _ object.SearchFilters, attrs []string, cursor string, count uint16) ([]sdkclient.SearchResultItem, string, error)
}

// TODO: docs
type RemoteNodes interface {
SendSearchRequest(context.Context, sdknetmap.NodeInfo, *protoobject.SearchV2Request) ([]sdkclient.SearchResultItem, string, error)
}

// ACLInfoExtractor is the interface that allows to fetch data required for ACL
Expand Down Expand Up @@ -164,10 +175,11 @@ type server struct {
metrics MetricCollector
aclChecker aclsvc.ACLChecker
reqInfoProc ACLInfoExtractor
remoteNodes RemoteNodes
}

// New provides protoobject.ObjectServiceServer for the given parameters.
func New(hs Handlers, magicNumber uint32, fsChain FSChain, st Storage, signer ecdsa.PrivateKey, m MetricCollector, ac aclsvc.ACLChecker, rp ACLInfoExtractor) protoobject.ObjectServiceServer {
func New(hs Handlers, magicNumber uint32, fsChain FSChain, st Storage, signer ecdsa.PrivateKey, m MetricCollector, ac aclsvc.ACLChecker, rp ACLInfoExtractor, rn RemoteNodes) protoobject.ObjectServiceServer {
return &server{
handlers: hs,
fsChain: fsChain,
Expand All @@ -177,6 +189,7 @@ func New(hs Handlers, magicNumber uint32, fsChain FSChain, st Storage, signer ec
metrics: m,
aclChecker: ac,
reqInfoProc: rp,
remoteNodes: rn,
}
}

Expand Down Expand Up @@ -1786,7 +1799,8 @@ func (s *server) Replicate(_ context.Context, req *protoobject.ReplicateRequest)
}

var clientInCnr, serverInCnr bool
err = s.fsChain.ForEachContainerNodePublicKeyInLastTwoEpochs(cnr, func(pubKey []byte) bool {
err = s.fsChain.ForEachContainerNodeInLastTwoEpochs(cnr, func(node sdknetmap.NodeInfo) bool {
pubKey := node.PublicKey()
if !serverInCnr {
serverInCnr = s.fsChain.IsOwnPublicKey(pubKey)
}
Expand Down Expand Up @@ -1848,8 +1862,175 @@ func (s *server) Replicate(_ context.Context, req *protoobject.ReplicateRequest)
return resp, nil
}

func (s *server) SearchV2(_ context.Context, _ *protoobject.SearchV2Request) (*protoobject.SearchV2Response, error) {
return nil, errors.New("unimplemented")
func (s *server) signSearchResponse(resp *protoobject.SearchV2Response) *protoobject.SearchV2Response {
resp.VerifyHeader = util.SignResponse(&s.signer, resp)
return resp
}

func (s *server) makeStatusSearchResponse(err error) *protoobject.SearchV2Response {
return s.signSearchResponse(&protoobject.SearchV2Response{
MetaHeader: s.makeResponseMetaHeader(util.ToStatus(err)),
})
}

func (s *server) SearchV2(ctx context.Context, req *protoobject.SearchV2Request) (*protoobject.SearchV2Response, error) {
var (
err error
t = time.Now()
)
defer func() { s.pushOpExecResult(stat.MethodObjectSearchV2, err, t) }()
if err = neofscrypto.VerifyRequestWithBuffer(req, nil); err != nil {
return s.makeStatusSearchResponse(err), nil
}

if s.fsChain.LocalNodeUnderMaintenance() {
return s.makeStatusSearchResponse(apistatus.ErrNodeUnderMaintenance), nil
}

reqInfo, err := s.reqInfoProc.SearchRequestToInfo(req)

Check failure on line 1890 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Coverage

cannot use req (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Request) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchRequest value in argument to s.reqInfoProc.SearchRequestToInfo

Check failure on line 1890 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Build (ubuntu-22.04, linux, amd64)

cannot use req (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Request) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchRequest value in argument to s.reqInfoProc.SearchRequestToInfo

Check failure on line 1890 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Unit tests (ubuntu-22.04, 1.22)

cannot use req (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Request) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchRequest value in argument to s.reqInfoProc.SearchRequestToInfo

Check failure on line 1890 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Unit tests (macos-14, 1.23)

cannot use req (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Request) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchRequest value in argument to s.reqInfoProc.SearchRequestToInfo

Check failure on line 1890 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Build (macos-14, darwin, amd64)

cannot use req (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Request) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchRequest value in argument to s.reqInfoProc.SearchRequestToInfo

Check failure on line 1890 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Lint / Lint

cannot use req (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Request) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchRequest value in argument to s.reqInfoProc.SearchRequestToInfo

Check failure on line 1890 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Lint / Lint

cannot use req (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Request) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchRequest value in argument to s.reqInfoProc.SearchRequestToInfo

Check failure on line 1890 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Lint / Lint

cannot use req (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Request) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchRequest value in argument to s.reqInfoProc.SearchRequestToInfo
if err != nil {
return s.makeStatusSearchResponse(apistatus.ErrNodeUnderMaintenance), nil
}
if !s.aclChecker.CheckBasicACL(reqInfo) {
err = basicACLErr(reqInfo) // needed for defer
return s.makeStatusSearchResponse(err), nil
}
err = s.aclChecker.CheckEACL(req, reqInfo)
if err != nil {
err = eACLErr(reqInfo, err)
return s.makeStatusSearchResponse(err), nil
}

resp, err := s.processSearchRequest(ctx, req)
if err != nil {
return s.makeStatusSearchResponse(err), nil
}
return s.signSearchResponse(resp), nil

Check failure on line 1908 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Coverage

cannot use resp (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response_Body) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response value in argument to s.signSearchResponse

Check failure on line 1908 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Build (ubuntu-22.04, linux, amd64)

cannot use resp (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response_Body) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response value in argument to s.signSearchResponse

Check failure on line 1908 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Unit tests (ubuntu-22.04, 1.22)

cannot use resp (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response_Body) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response value in argument to s.signSearchResponse

Check failure on line 1908 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Unit tests (macos-14, 1.23)

cannot use resp (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response_Body) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response value in argument to s.signSearchResponse

Check failure on line 1908 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Build (macos-14, darwin, amd64)

cannot use resp (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response_Body) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response value in argument to s.signSearchResponse

Check failure on line 1908 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Lint / Lint

cannot use resp (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response_Body) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response value in argument to s.signSearchResponse

Check failure on line 1908 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Lint / Lint

cannot use resp (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response_Body) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response value in argument to s.signSearchResponse)

Check failure on line 1908 in pkg/services/object/server.go

View workflow job for this annotation

GitHub Actions / Lint / Lint

cannot use resp (variable of type *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response_Body) as *"github.com/nspcc-dev/neofs-sdk-go/proto/object".SearchV2Response value in argument to s.signSearchResponse)
}

func (s *server) processSearchRequest(ctx context.Context, req *protoobject.SearchV2Request) (*protoobject.SearchV2Response_Body, error) {
body := req.GetBody()
if body == nil {
return nil, errors.New("missing body")
}
if body.ContainerId == nil {
return nil, errors.New("missing container ID")
}
var cnr cid.ID
if err := cnr.FromProtoMessage(body.ContainerId); err != nil {
return nil, fmt.Errorf("invalid container ID: %w", err)
}
if body.Version != 1 {
return nil, errors.New("unsupported query version")
}
if body.Count == 0 {
return nil, errors.New("zero count")
} else if body.Count > 1000 {
return nil, errors.New("count limit exceeded")
}
if len(body.Attributes) > 0 && (len(body.Filters) == 0 || body.Filters[0].Key != body.Attributes[0]) {
return nil, errors.New("primary attribute must be filtered 1st")
}
for i := range body.Filters {
if body.Filters[i].Key == "" {
return nil, fmt.Errorf("invalid filter #%d: missing key", i)
}
// assuming https://github.com/nspcc-dev/neofs-api/issues/318 and https://github.com/nspcc-dev/neofs-api/issues/319
if body.Filters[i].Key == object.FilterContainerID || body.Filters[i].Key == object.FilterID {
return nil, fmt.Errorf("invalid filter #%d: prohibited attribute %s", i, body.Filters[i].Key)
}
}
for i := range body.Attributes {
if body.Attributes[i] == "" {
return nil, fmt.Errorf("empty attribute #%d", i)
}
if body.Attributes[i] == object.FilterContainerID || body.Attributes[i] == object.FilterID {
return nil, fmt.Errorf("prohibited attribute %s", body.Attributes[i])
}
}
ttl := req.MetaHeader.GetTtl()
if ttl == 0 {
return nil, errors.New("zero TTL")
}
var fs object.SearchFilters
if err := fs.FromProtoMessage(body.Filters); err != nil {
return nil, fmt.Errorf("invalid filters: %w", err)
}

var res []sdkclient.SearchResultItem
var cursor string
var err error
count := uint16(body.Count) // legit according to the limit
if ttl == 1 {
if res, cursor, err = s.storage.SearchObjects(cnr, fs, body.Attributes, body.Cursor, count); err != nil {
return nil, err
}
} else {
var signed bool
var signErr error
var n uint16
var nodeCount int
// FIXME: deduplicate single node RPC
if err = s.fsChain.ForEachContainerNodeInLastTwoEpochs(cnr, func(node sdknetmap.NodeInfo) bool {
nodeCount++
// TODO: consider parallelism
var next []sdkclient.SearchResultItem
if s.fsChain.IsOwnPublicKey(node.PublicKey()) {
if next, cursor, err = s.storage.SearchObjects(cnr, fs, body.Attributes, body.Cursor, count); err != nil {
// TODO: log error
return true
}
} else {
if !signed {
req.MetaHeader = &protosession.RequestMetaHeader{Ttl: 1, Origin: req.MetaHeader}
var err error
if req.VerifyHeader, err = neofscrypto.SignRequestWithBuffer(neofsecdsa.Signer(s.signer), req, nil); err != nil {
signErr = fmt.Errorf("sign request: %w", err)
return false
}
signed = true
}
if next, cursor, err = s.remoteNodes.SendSearchRequest(ctx, node, req); err != nil {
// TODO: log error
return true
}
}
if len(res) == 0 {
if uint16(len(next)) == count {
res = next
return true
}
res = make([]sdkclient.SearchResultItem, count)
}
n += objectcore.MergeSearchResults(min(count, n), res, next)
return true
}); err == nil {
err = signErr
}
if err != nil {
return nil, err
}
if n < count {
cursor = ""
} else if nodeCount > 1 {
if cursor, err = meta.EncodeSearchCursor(fs, res[count-1]); err != nil {
return nil, fmt.Errorf("recalculate cursor: %w", err)
}
}
res = res[:n]
}

resBody := &protoobject.SearchV2Response_Body{
Result: make([]*protoobject.SearchV2Response_OIDWithMeta, len(res)),
Cursor: cursor,
}
for i := range res {
resBody.Result[i] = &protoobject.SearchV2Response_OIDWithMeta{
Id: res[i].ID.ProtoMessage(),
Attributes: res[i].Attributes,
}
}
return resBody, nil
}

func objectFromMessage(gMsg *protoobject.Object) (*object.Object, error) {
Expand Down

0 comments on commit 1c14201

Please sign in to comment.