Skip to content

Commit

Permalink
object/searchv2: Fix ordering when attributes are not requested
Browse files Browse the repository at this point in the history
When attributes are not requested, simple OID traverse should be
performed. Previously the behavior was determined according to the
primary filter only.

Refs #3058.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Feb 24, 2025
1 parent dec793a commit 613d4d4
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions pkg/local_object_storage/metabase/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,11 @@ func (db *DB) searchTx(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, fInt m
primMatcher, primVal := convertFilterValue(fs[0])
intPrimMatcher := objectcore.IsIntegerSearchOp(primMatcher)
notPresentPrimMatcher := primMatcher == object.MatchNotPresent
idIter := len(attrs) == 0 || notPresentPrimMatcher
primAttr := fs[0].Header() // attribute emptiness already prevented
var primSeekKey, primSeekPrefix []byte
var prevResOID, prevResPrimVal []byte
if notPresentPrimMatcher {
if idIter {
if cursor != nil {
primSeekKey = cursor.Key
} else {
Expand Down Expand Up @@ -389,7 +390,7 @@ func (db *DB) searchTx(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, fInt m
dbValInt := new(big.Int)
nextPrimKey:
for ; bytes.HasPrefix(primKey, primSeekPrefix); primKey, _ = primCursor.Next() {
if notPresentPrimMatcher {
if idIter {
if id = primKey[1:]; len(id) != oid.Size {
return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("invalid OID len %d", len(id)))
}
Expand Down Expand Up @@ -435,7 +436,7 @@ nextPrimKey:
}
// apply other filters
for i := range fs {
if !notPresentPrimMatcher && (i == 0 || fs[i].Header() == fs[0].Header()) { // 1st already checked
if !idIter && (i == 0 || fs[i].Header() == fs[0].Header()) { // 1st already checked
continue
}
attr := fs[i].Header() // emptiness already prevented
Expand Down Expand Up @@ -907,7 +908,7 @@ func convertFilterValue(f object.SearchFilter) (object.SearchMatchType, string)

// CalculateCursor calculates cursor for the given last search result item.
func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem) (SearchCursor, error) {
if len(fs) == 0 || fs[0].Operation() == object.MatchNotPresent {
if len(lastItem.Attributes) == 0 || len(fs) == 0 || fs[0].Operation() == object.MatchNotPresent {
return SearchCursor{Key: lastItem.ID[:]}, nil
}
attr := fs[0].Header()
Expand All @@ -923,7 +924,11 @@ func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem)
var val []byte
switch attr {
default:
if n, ok := new(big.Int).SetString(lastItemVal, 10); ok {
if objectcore.IsIntegerSearchOp(fs[0].Operation()) {
n, ok := new(big.Int).SetString(lastItemVal, 10)
if !ok {
return SearchCursor{}, fmt.Errorf("non-int attribute value %q with int matcher", lastItemVal)
}
var res SearchCursor
res.Key = make([]byte, len(attr)+attributeDelimiterLen+intValLen+oid.Size)
off := copy(res.Key, attr)
Expand Down Expand Up @@ -1005,9 +1010,8 @@ func PreprocessIntFilters(fs object.SearchFilters) (map[int]ParsedIntFilter, boo
if !f.auto {
if i == 0 || objectcore.IsIntegerSearchOp(fs[0].Operation()) && fs[i].Header() == fs[0].Header() {
f.b = intBytes(n)
} else {
f.n = n
}
f.n = n
}
// TODO: #1148 there are more auto-cases (like <=X AND >=X, <X AND >X), cover more here
fInt[i] = f
Expand Down

0 comments on commit 613d4d4

Please sign in to comment.