Skip to content

Commit

Permalink
node/metabase: Prevent value disorder in plain attribute index
Browse files Browse the repository at this point in the history
Previously, since attributes were stored in PREFIX_KEY_DELIM_VAL_OID
format, the lexicographic order of values could be violated. For
example, if two objects had the same attribute with values '1' and '1a',
and the 1st one had OID starting from byte > 'a', they were stored in
the meta bucket in reverse order. Keeping natural order is essential for
the DB iterator's effectiveness for sorted SearchV2.

This extends storage scheme with zero byte delimiter
b/w VAL and OID. It can be a delimiter since none VAL can include it. At
the same time, it preserves the order and resolves mentioned problem.

The original separator was 0xFF as an invalid UTF-8 the 0xFF byte was
banned. In order not to have two different separators, KEY and VAL are
now also separated by 0x00. Any attempt to write metadata with a zero
byte in the attributes will now fail. Although with
53b3c86 such an object cannot come from
the upper-level object service, relying only on it is risky: migration
already works without it, and the metadata limits are critical to the
SearchV2 provision.

Refs #3058.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Feb 21, 2025
1 parent 27f63e9 commit 5d8fa17
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 64 deletions.
6 changes: 3 additions & 3 deletions pkg/local_object_storage/metabase/VERSION.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ The lowest not used bucket index: 20.
- Name: `255` + container ID
- Keys without values
- `0` + object ID
- `1` + attribute + `0xFF` + `0|1` + fixed256(value) + object ID: integer attributes. \
- `1` + attribute + `0x00` + `0|1` + fixed256(value) + object ID: integer attributes. \
Sign byte is 0 for negatives, 1 otherwise. Bits are inverted for negatives also.
- `2` + attribute + `0xFF` + value + object ID
- `3` + object ID + attribute + `0xFF` + value
- `2` + attribute + `0x00` + value + `0x00` + object ID
- `3` + object ID + attribute + `0x00` + value

# History

Expand Down
119 changes: 87 additions & 32 deletions pkg/local_object_storage/metabase/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math/big"
"slices"
"strconv"
"strings"

"github.com/google/uuid"
"github.com/mr-tron/base58"
Expand All @@ -32,8 +33,8 @@ const (
)

const (
intValLen = 33 // prefix byte for sign + fixed256 in metaPrefixAttrIDInt
attrIDFixedLen = 1 + oid.Size + utf8DelimiterLen // prefix first
intValLen = 33 // prefix byte for sign + fixed256 in metaPrefixAttrIDInt
attrIDFixedLen = 1 + oid.Size + attributeDelimiterLen // prefix first
)

const binPropMarker = "1" // ROOT, PHY, etc.
Expand Down Expand Up @@ -72,6 +73,15 @@ func putMetadataForObject(tx *bbolt.Tx, hdr object.Object, hasParent, phy bool)
func putMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID, ver version.Version, owner user.ID, typ object.Type, creationEpoch uint64,
payloadLen uint64, pldHash, pldHmmHash, splitID []byte, parentID, firstID oid.ID, attrs []object.Attribute,
hasParent, phy bool) error {
for i := range attrs {
if strings.IndexByte(attrs[i].Key(), attributeDelimiter[0]) >= 0 {
return fmt.Errorf("attribute #%d key contains 0x%02X byte used in sep", i, attributeDelimiter[0])
}
if strings.IndexByte(attrs[i].Value(), attributeDelimiter[0]) >= 0 {
return fmt.Errorf("attribute #%d value contains 0x%02X byte used in sep", i, attributeDelimiter[0])
}
}

metaBkt, err := tx.CreateBucketIfNotExists(metaBucketKey(cnr))
if err != nil {
return fmt.Errorf("create meta bucket for container: %w", err)
Expand Down Expand Up @@ -158,20 +168,21 @@ func deleteMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID) error {
pref[0] = metaPrefixIDAttr
c := metaBkt.Cursor()
for kIDAttr, _ := c.Seek(pref); bytes.HasPrefix(kIDAttr, pref); kIDAttr, _ = c.Next() {
sepInd := bytes.LastIndex(kIDAttr, utf8Delimiter)
sepInd := bytes.LastIndex(kIDAttr, attributeDelimiter)
if sepInd < 0 {
return fmt.Errorf("invalid key with prefix 0x%X in meta bucket: missing delimiter", kIDAttr[0])
}
kAttrID := slices.Clone(kIDAttr)
kAttrID := make([]byte, len(kIDAttr)+attributeDelimiterLen)
kAttrID[0] = metaPrefixAttrIDPlain
copy(kAttrID[1:], kIDAttr[1+oid.Size:])
copy(kAttrID[len(kAttrID)-oid.Size:], id[:])
off := 1 + copy(kAttrID[1:], kIDAttr[1+oid.Size:])
off += copy(kAttrID[off:], attributeDelimiter)
copy(kAttrID[off:], id[:])
ks = append(ks, kIDAttr, kAttrID)
if n, ok := new(big.Int).SetString(string(kIDAttr[sepInd+utf8DelimiterLen:]), 10); ok && intWithinLimits(n) {
kAttrIDInt := make([]byte, sepInd+utf8DelimiterLen+intValLen)
if n, ok := new(big.Int).SetString(string(kIDAttr[sepInd+attributeDelimiterLen:]), 10); ok && intWithinLimits(n) {
kAttrIDInt := make([]byte, sepInd+attributeDelimiterLen+intValLen)
kAttrIDInt[0] = metaPrefixAttrIDInt
off := 1 + copy(kAttrIDInt[1:], kIDAttr[1+oid.Size:sepInd])
off += copy(kAttrIDInt[off:], utf8Delimiter)
off += copy(kAttrIDInt[off:], attributeDelimiter)
putInt(kAttrIDInt[off:off+intValLen], n)
copy(kAttrIDInt[off+intValLen:], id[:])
ks = append(ks, kAttrIDInt)
Expand All @@ -191,9 +202,25 @@ type SearchCursor struct {
ValIDOff int
}

// splits VAL_DELIM_OID.
func splitValOID(b []byte) ([]byte, []byte, error) {
if len(b) < oid.Size {
return nil, nil, fmt.Errorf("too short len %d", len(b))
}
idOff := len(b) - oid.Size
val, ok := bytes.CutSuffix(b[:idOff], attributeDelimiter)
if !ok {
return nil, nil, errors.New("missing delimiter")
}

Check warning on line 214 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L213-L214

Added lines #L213 - L214 were not covered by tests
if len(val) == 0 {
return nil, nil, errors.New("missing value")
}

Check warning on line 217 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L216-L217

Added lines #L216 - L217 were not covered by tests
return val, b[idOff:], nil
}

// NewSearchCursorFromString decodes cursor from the string according to the
// primary attribute.
func NewSearchCursorFromString(s, primAttr string) (*SearchCursor, error) {
func NewSearchCursorFromString(s, primAttr string, isInt bool) (*SearchCursor, error) {
if s == "" {
return nil, nil
}
Expand All @@ -212,17 +239,23 @@ func NewSearchCursorFromString(s, primAttr string) (*SearchCursor, error) {
if n > object.MaxHeaderLen {
return nil, fmt.Errorf("cursor len %d exceeds the limit %d", n, object.MaxHeaderLen)
}
ind := bytes.Index(b[1:], utf8Delimiter) // 1st is prefix
ind := bytes.Index(b[1:], attributeDelimiter) // 1st is prefix
if ind < 0 {
return nil, errors.New("missing delimiter")
}
if !bytes.Equal(b[1:1+ind], []byte(primAttr)) {
return nil, errors.New("wrong attribute")
}
var res SearchCursor
res.ValIDOff = 1 + len(primAttr) + len(utf8Delimiter)
if len(b[res.ValIDOff:]) <= oid.Size {
return nil, errors.New("missing value")
res.ValIDOff = 1 + len(primAttr) + len(attributeDelimiter)
if isInt {
if len(b[res.ValIDOff:]) <= oid.Size {
return nil, errors.New("missing value")
}

Check warning on line 254 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L252-L254

Added lines #L252 - L254 were not covered by tests
} else {
if _, _, err = splitValOID(b[res.ValIDOff:]); err != nil {
return nil, fmt.Errorf("invalid VAL_OID: %w", err)
}
}
res.Key = b
return &res, nil
Expand Down Expand Up @@ -299,21 +332,28 @@ func (db *DB) searchTx(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, fInt m
}
primSeekPrefix = primSeekKey[:cursor.ValIDOff]
valID := cursor.Key[cursor.ValIDOff:]
prevResPrimVal, prevResOID = valID[:len(valID)-oid.Size], valID[len(valID)-oid.Size:]
if intPrimMatcher {
prevResPrimVal, prevResOID = valID[:len(valID)-oid.Size], valID[len(valID)-oid.Size:]
} else {
var err error
if prevResPrimVal, prevResOID, err = splitValOID(valID); err != nil {
return nil, nil, fmt.Errorf("invalid VAL_OID: %w", err)
}

Check warning on line 341 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L340-L341

Added lines #L340 - L341 were not covered by tests
}
} else {
if intPrimMatcher {
// we seek 0x01_ATTR_DELIM_VAL either w/ or w/o VAL. We ignore VAL when we need
// to start from the lowest int, i.e. filter is auto-matched (e.g. <=MaxUint256) or <(=).
f := fInt[0]
withVal := !f.auto && (primMatcher == object.MatchNumGT || primMatcher == object.MatchNumGE)
kln := 1 + len(primAttr) + utf8DelimiterLen // prefix 1st
kln := 1 + len(primAttr) + attributeDelimiterLen // prefix 1st
if withVal {
kln += intValLen
}
primSeekKey = make([]byte, kln)
primSeekKey[0] = metaPrefixAttrIDInt
off := 1 + copy(primSeekKey[1:], primAttr)
off += copy(primSeekKey[off:], utf8Delimiter)
off += copy(primSeekKey[off:], attributeDelimiter)
primSeekPrefix = primSeekKey[:off]
if withVal {
copy(primSeekKey[off:], f.b)
Expand Down Expand Up @@ -358,10 +398,18 @@ nextPrimKey:
}
} else { // apply primary filter
valID := primKey[len(primSeekPrefix):] // VAL_OID
if len(valID) <= oid.Size {
return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("too small VAL_OID len %d", len(valID)))
if intPrimMatcher {
if len(valID) <= oid.Size {
return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("too small VAL_OID len %d", len(valID)))
}

Check warning on line 404 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L403-L404

Added lines #L403 - L404 were not covered by tests
primDBVal, id = valID[:len(valID)-oid.Size], valID[len(valID)-oid.Size:]
} else {
var err error
if primDBVal, id, err = splitValOID(valID); err != nil {
fmt.Println(string(valID))
return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("invalid VAL_OID: %w", err))
}

Check warning on line 411 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L409-L411

Added lines #L409 - L411 were not covered by tests
}
primDBVal, id = valID[:len(valID)-oid.Size], valID[len(valID)-oid.Size:]
for i := range fs {
// there may be several filters by primary key, e.g. N >= 10 && N <= 20. We
// check them immediately before moving through the DB.
Expand Down Expand Up @@ -569,10 +617,10 @@ func (db *DB) searchUnfiltered(cnr cid.ID, cursor *SearchCursor, count uint16) (
}

func seekKeyForAttribute(attr, fltVal string) ([]byte, []byte, error) {
key := make([]byte, 1+len(attr)+utf8DelimiterLen+len(fltVal)) // prefix 1st
key := make([]byte, 1+len(attr)+attributeDelimiterLen+len(fltVal)) // prefix 1st
key[0] = metaPrefixAttrIDPlain
off := 1 + copy(key[1:], attr)
off += copy(key[off:], utf8Delimiter)
off += copy(key[off:], attributeDelimiter)
prefix := key[:off]
if fltVal == "" {
return key, prefix, nil
Expand Down Expand Up @@ -759,16 +807,23 @@ func intWithinLimits(n *big.Int) bool { return n.Cmp(maxUint256Neg) >= 0 && n.Cm
// makes PREFIX_ATTR_DELIM_VAL_OID with unset VAL space, and returns offset of
// the VAL. Reuses previously allocated buffer if it is sufficient.
func prepareMetaAttrIDKey(buf *keyBuffer, id oid.ID, attr string, valLen int, intAttr bool) ([]byte, int) {
k := buf.alloc(attrIDFixedLen + len(attr) + valLen)
kln := attrIDFixedLen + len(attr) + valLen
if !intAttr {
kln += attributeDelimiterLen
}
k := buf.alloc(kln)
if intAttr {
k[0] = metaPrefixAttrIDInt
} else {
k[0] = metaPrefixAttrIDPlain
}
off := 1 + copy(k[1:], attr)
off += copy(k[off:], utf8Delimiter)
off += copy(k[off:], attributeDelimiter)
valOff := off
off += valLen
if !intAttr {
off += copy(k[off:], attributeDelimiter)
}
copy(k[off:], id[:])
return k, valOff
}
Expand All @@ -779,7 +834,7 @@ func prepareMetaIDAttrKey(buf *keyBuffer, id oid.ID, attr string, valLen int) []
k[0] = metaPrefixIDAttr
off := 1 + copy(k[1:], id[:])
off += copy(k[off:], attr)
copy(k[off:], utf8Delimiter)
copy(k[off:], attributeDelimiter)
return k
}

Expand Down Expand Up @@ -817,7 +872,7 @@ func (x *metaAttributeSeeker) get(id []byte, attr string) ([]byte, error) {
pref[0] = metaPrefixIDAttr
off := 1 + copy(pref[1:], id)
off += copy(pref[off:], attr)
copy(pref[off:], utf8Delimiter)
copy(pref[off:], attributeDelimiter)
if x.crsr == nil {
x.crsr = x.bkt.Cursor()
}
Expand Down Expand Up @@ -874,9 +929,9 @@ func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem)
default:
if n, ok := new(big.Int).SetString(lastItemVal, 10); ok {
var res SearchCursor
res.Key = make([]byte, len(attr)+utf8DelimiterLen+intValLen+oid.Size)
res.Key = make([]byte, len(attr)+attributeDelimiterLen+intValLen+oid.Size)

Check warning on line 932 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L932

Added line #L932 was not covered by tests
off := copy(res.Key, attr)
res.ValIDOff = off + copy(res.Key[off:], utf8Delimiter)
res.ValIDOff = off + copy(res.Key[off:], attributeDelimiter)

Check warning on line 934 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L934

Added line #L934 was not covered by tests
putInt(res.Key[res.ValIDOff:res.ValIDOff+intValLen], n)
copy(res.Key[res.ValIDOff+intValLen:], lastItem.ID[:])
return res, nil
Expand All @@ -892,9 +947,9 @@ func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem)
return SearchCursor{}, fmt.Errorf("wrong %q attribute decoded len %d", attr, ln)
}
var res SearchCursor
res.Key = make([]byte, len(attr)+utf8DelimiterLen+ln+oid.Size)
res.Key = make([]byte, len(attr)+attributeDelimiterLen+ln+oid.Size)

Check warning on line 950 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L950

Added line #L950 was not covered by tests
off := copy(res.Key, attr)
res.ValIDOff = off + copy(res.Key[off:], utf8Delimiter)
res.ValIDOff = off + copy(res.Key[off:], attributeDelimiter)

Check warning on line 952 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L952

Added line #L952 was not covered by tests
var err error
if _, err = hex.Decode(res.Key[res.ValIDOff:], []byte(lastItemVal)); err != nil {
return SearchCursor{}, fmt.Errorf("decode %q attribute from HEX: %w", attr, err)
Expand All @@ -913,9 +968,9 @@ func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem)
val = []byte(lastItemVal)
}
var res SearchCursor
res.Key = make([]byte, len(attr)+utf8DelimiterLen+len(val)+oid.Size)
res.Key = make([]byte, len(attr)+attributeDelimiterLen+len(val)+oid.Size)

Check warning on line 971 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L971

Added line #L971 was not covered by tests
off := copy(res.Key, attr)
res.ValIDOff = off + copy(res.Key[off:], utf8Delimiter)
res.ValIDOff = off + copy(res.Key[off:], attributeDelimiter)

Check warning on line 973 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L973

Added line #L973 was not covered by tests
off = res.ValIDOff + copy(res.Key[res.ValIDOff:], val)
copy(res.Key[off:], lastItem.ID[:])
return res, nil
Expand Down
Loading

0 comments on commit 5d8fa17

Please sign in to comment.