Skip to content

Commit

Permalink
object/searchv2: Fix ordering when attributes are not requested
Browse files Browse the repository at this point in the history
When attributes are not requested, simple OID traverse should be
performed. Previously the behavior was determined according to the
primary filter only.

This also extends unit test coverage. Cursors are tested deeper. Many
tests failed revealing implementation bugs fixed here, mainly ordering.
A lot of code has been deduplicated also.

Refs #3160. Refs #3058.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Feb 24, 2025
1 parent bf9da92 commit 6b25941
Show file tree
Hide file tree
Showing 2 changed files with 252 additions and 428 deletions.
18 changes: 11 additions & 7 deletions pkg/local_object_storage/metabase/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,11 @@ func (db *DB) searchTx(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, fInt m
primMatcher, primVal := convertFilterValue(fs[0])
intPrimMatcher := objectcore.IsIntegerSearchOp(primMatcher)
notPresentPrimMatcher := primMatcher == object.MatchNotPresent
idIter := len(attrs) == 0 || notPresentPrimMatcher
primAttr := fs[0].Header() // attribute emptiness already prevented
var primSeekKey, primSeekPrefix []byte
var prevResOID, prevResPrimVal []byte
if notPresentPrimMatcher {
if idIter {
if cursor != nil {
primSeekKey = cursor.Key
} else {
Expand Down Expand Up @@ -389,7 +390,7 @@ func (db *DB) searchTx(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, fInt m
dbValInt := new(big.Int)
nextPrimKey:
for ; bytes.HasPrefix(primKey, primSeekPrefix); primKey, _ = primCursor.Next() {
if notPresentPrimMatcher {
if idIter {
if id = primKey[1:]; len(id) != oid.Size {
return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("invalid OID len %d", len(id)))
}
Expand Down Expand Up @@ -435,7 +436,7 @@ nextPrimKey:
}
// apply other filters
for i := range fs {
if !notPresentPrimMatcher && (i == 0 || fs[i].Header() == fs[0].Header()) { // 1st already checked
if !idIter && (i == 0 || fs[i].Header() == fs[0].Header()) { // 1st already checked
continue
}
attr := fs[i].Header() // emptiness already prevented
Expand Down Expand Up @@ -907,7 +908,7 @@ func convertFilterValue(f object.SearchFilter) (object.SearchMatchType, string)

// CalculateCursor calculates cursor for the given last search result item.
func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem) (SearchCursor, error) {
if len(fs) == 0 || fs[0].Operation() == object.MatchNotPresent {
if len(lastItem.Attributes) == 0 || len(fs) == 0 || fs[0].Operation() == object.MatchNotPresent {
return SearchCursor{Key: lastItem.ID[:]}, nil
}
attr := fs[0].Header()
Expand All @@ -923,7 +924,11 @@ func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem)
var val []byte
switch attr {
default:
if n, ok := new(big.Int).SetString(lastItemVal, 10); ok {
if objectcore.IsIntegerSearchOp(fs[0].Operation()) {
n, ok := new(big.Int).SetString(lastItemVal, 10)
if !ok {
return SearchCursor{}, fmt.Errorf("non-int attribute value %q with int matcher", lastItemVal)
}
var res SearchCursor
res.Key = make([]byte, len(attr)+attributeDelimiterLen+intValLen+oid.Size)
off := copy(res.Key, attr)
Expand Down Expand Up @@ -1005,9 +1010,8 @@ func PreprocessIntFilters(fs object.SearchFilters) (map[int]ParsedIntFilter, boo
if !f.auto {
if i == 0 || objectcore.IsIntegerSearchOp(fs[0].Operation()) && fs[i].Header() == fs[0].Header() {
f.b = intBytes(n)
} else {
f.n = n
}
f.n = n
}
// TODO: #1148 there are more auto-cases (like <=X AND >=X, <X AND >X), cover more here
fInt[i] = f
Expand Down
Loading

0 comments on commit 6b25941

Please sign in to comment.