Skip to content

Commit

Permalink
node/metabase: Do not cmp matching elements b/w each other
Browse files Browse the repository at this point in the history
Starting from 8f593f9, any iterator
over meta bucket elements is done in ascending order. Since cursor
requests start traversing _after_ the last element, comparing subsequent
matching elements against it or each other is no longer needed.

One more bonus is that we no longer need to cache matching keys in slice.

Closes #3153. Refs #3058.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Feb 24, 2025
1 parent 65f8f68 commit 5834452
Showing 1 changed file with 14 additions and 60 deletions.
74 changes: 14 additions & 60 deletions pkg/local_object_storage/metabase/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,6 @@ func (db *DB) searchTx(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, fInt m
idIter := len(attrs) == 0 || notPresentPrimMatcher
primAttr := fs[0].Header() // attribute emptiness already prevented
var primSeekKey, primSeekPrefix []byte
var prevResOID, prevResPrimVal []byte
if idIter {
if cursor != nil {
primSeekKey = cursor.Key
Expand All @@ -329,15 +328,6 @@ func (db *DB) searchTx(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, fInt m
primSeekKey[0] = metaPrefixAttrIDPlain
}
primSeekPrefix = primSeekKey[:cursor.ValIDOff]
valID := cursor.Key[cursor.ValIDOff:]
if intPrimMatcher {
prevResPrimVal, prevResOID = valID[:len(valID)-oid.Size], valID[len(valID)-oid.Size:]
} else {
var err error
if prevResPrimVal, prevResOID, err = splitValOID(valID); err != nil {
return nil, nil, fmt.Errorf("invalid VAL_OID: %w", err)
}
}
} else {
if intPrimMatcher {
// we seek 0x01_ATTR_DELIM_VAL either w/ or w/o VAL. We ignore VAL when we need
Expand Down Expand Up @@ -379,8 +369,7 @@ func (db *DB) searchTx(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, fInt m
}

res := make([]client.SearchResultItem, count)
collectedPrimVals := make([][]byte, count)
collectedPrimKeys := make([][]byte, count) // TODO: can be done w/o slice
var lastMatchedPrimKey []byte
var n uint16
var more bool
var id, dbVal, primDBVal []byte
Expand Down Expand Up @@ -488,26 +477,14 @@ nextPrimKey:
if objectStatus(tx, oid.NewAddress(cnr, oid.ID(id)), curEpoch) > 0 { // GC-ed
continue nextPrimKey
}
// object matches, collect attributes
// object matches
if n == count {
more = true
break
}
// collect attributes
collected := make([]string, len(attrs))
var insertI uint16
if len(attrs) > 0 {
if cursor != nil { // can be < than previous response chunk
if c := bytes.Compare(primDBVal, prevResPrimVal); c < 0 || c == 0 && bytes.Compare(id, prevResOID) <= 0 {
continue nextPrimKey
}
// note that if both values are integers, they are already sorted. Otherwise, the order is undefined.
// We could treat non-int values as < then the int ones, but the code would have grown huge
}
for i := range n {
if c := bytes.Compare(primDBVal, collectedPrimVals[i]); c < 0 || c == 0 && bytes.Compare(id, res[i].ID[:]) < 0 {
break
}
if insertI++; insertI == count {
more = true
continue nextPrimKey
}
}
if intPrimMatcher {
var err error
if collected[0], err = restoreIntAttribute(primDBVal); err != nil {
Expand All @@ -517,19 +494,6 @@ nextPrimKey:
collected[0] = string(primDBVal)
}
} else {
if cursor != nil { // can be < than previous response chunk
if bytes.Compare(id, prevResOID) <= 0 {
continue nextPrimKey
}
}
for i := insertI; i < n; i++ {
if bytes.Compare(id, res[i].ID[:]) >= 0 {
if insertI++; insertI == count {
more = true
continue nextPrimKey
}
}
}
}
for i := 1; i < len(attrs); i++ {
val, err := attrSkr.get(id, attrs[i])
Expand All @@ -540,26 +504,16 @@ nextPrimKey:
return nil, nil, err
}
}
if n == count {
more = true
if len(attrs) > 0 {
break
} // else, for empty attrs, "later" objects may have "less" ID, so we should continue
}
copy(res[insertI+1:], res[insertI:])
res[insertI].ID = oid.ID(id)
res[insertI].Attributes = collected
copy(collectedPrimVals[insertI+1:], collectedPrimVals[insertI:])
collectedPrimVals[insertI] = primDBVal
copy(collectedPrimKeys[insertI+1:], collectedPrimKeys[insertI:])
collectedPrimKeys[insertI] = primKey
if n < count {
n++
}
res[n].ID = oid.ID(id)
res[n].Attributes = collected
lastMatchedPrimKey = primKey
n++
// note that even when n == count meaning there will be no more attachments, we
// still need to determine whether there more matching elements (see condition above)
}
var newCursor []byte
if more {
newCursor = slices.Clone(collectedPrimKeys[n-1][1:])
newCursor = slices.Clone(lastMatchedPrimKey[1:])
}
return res[:n], newCursor, nil
}
Expand Down

0 comments on commit 5834452

Please sign in to comment.