Skip to content

Commit

Permalink
node/metabase: Prevent value disorder in plain attribute index
Browse files Browse the repository at this point in the history
Previously, since attributes were stored in PREFIX_KEY_DELIM_VAL_OID
format, the lexicographic order of values could be violated. For
example, if two objects had the same attribute with values '1' and '1a',
and the 1st one had OID starting from byte > 'a', they were stored in
the meta bucket in reverse order. Keeping natural order is essential for
the DB iterator's effectiveness for sorted SearchV2.

This extends storage scheme with zero byte delimiter
b/w VAL and OID. It can be a delimiter since none VAL can include it. At
the same time, it preserves the order and resolves mentioned problem.

The original separator was 0xFF as an invalid UTF-8 the 0xFF byte was
banned. In order not to have two different separators, KEY and VAL are
now also separated by 0x00. Any attempt to write metadata with a zero
byte in the attributes will now fail. Although with
6f9647e such an object cannot come from
the upper-level object service, relying only on it is risky: migration
already works without it, and the metadata limits are critical to the
SearchV2 provision.

Refs #3058.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Feb 22, 2025
1 parent 6f9647e commit 8f593f9
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 65 deletions.
6 changes: 3 additions & 3 deletions pkg/local_object_storage/metabase/VERSION.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ The lowest not used bucket index: 20.
- Name: `255` + container ID
- Keys without values
- `0` + object ID
- `1` + attribute + `0xFF` + `0|1` + fixed256(value) + object ID: integer attributes. \
- `1` + attribute + `0x00` + `0|1` + fixed256(value) + object ID: integer attributes. \
Sign byte is 0 for negatives, 1 otherwise. Bits are inverted for negatives also.
- `2` + attribute + `0xFF` + value + object ID
- `3` + object ID + attribute + `0xFF` + value
- `2` + attribute + `0x00` + value + `0x00` + object ID
- `3` + object ID + attribute + `0x00` + value

# History

Expand Down
121 changes: 88 additions & 33 deletions pkg/local_object_storage/metabase/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math/big"
"slices"
"strconv"
"strings"

"github.com/google/uuid"
"github.com/mr-tron/base58"
Expand All @@ -32,8 +33,8 @@ const (
)

const (
intValLen = 33 // prefix byte for sign + fixed256 in metaPrefixAttrIDInt
attrIDFixedLen = 1 + oid.Size + utf8DelimiterLen // prefix first
intValLen = 33 // prefix byte for sign + fixed256 in metaPrefixAttrIDInt
attrIDFixedLen = 1 + oid.Size + attributeDelimiterLen // prefix first
)

const binPropMarker = "1" // ROOT, PHY, etc.
Expand Down Expand Up @@ -72,6 +73,15 @@ func putMetadataForObject(tx *bbolt.Tx, hdr object.Object, hasParent, phy bool)
func putMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID, ver version.Version, owner user.ID, typ object.Type, creationEpoch uint64,
payloadLen uint64, pldHash, pldHmmHash, splitID []byte, parentID, firstID oid.ID, attrs []object.Attribute,
hasParent, phy bool) error {
for i := range attrs {
if strings.IndexByte(attrs[i].Key(), attributeDelimiter[0]) >= 0 {
return fmt.Errorf("attribute #%d key contains 0x%02X byte used in sep", i, attributeDelimiter[0])
}
if strings.IndexByte(attrs[i].Value(), attributeDelimiter[0]) >= 0 {
return fmt.Errorf("attribute #%d value contains 0x%02X byte used in sep", i, attributeDelimiter[0])
}
}

metaBkt, err := tx.CreateBucketIfNotExists(metaBucketKey(cnr))
if err != nil {
return fmt.Errorf("create meta bucket for container: %w", err)
Expand Down Expand Up @@ -158,20 +168,21 @@ func deleteMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID) error {
pref[0] = metaPrefixIDAttr
c := metaBkt.Cursor()
for kIDAttr, _ := c.Seek(pref); bytes.HasPrefix(kIDAttr, pref); kIDAttr, _ = c.Next() {
sepInd := bytes.LastIndex(kIDAttr, utf8Delimiter)
sepInd := bytes.LastIndex(kIDAttr, attributeDelimiter)
if sepInd < 0 {
return fmt.Errorf("invalid key with prefix 0x%X in meta bucket: missing delimiter", kIDAttr[0])
}
kAttrID := slices.Clone(kIDAttr)
kAttrID := make([]byte, len(kIDAttr)+attributeDelimiterLen)
kAttrID[0] = metaPrefixAttrIDPlain
copy(kAttrID[1:], kIDAttr[1+oid.Size:])
copy(kAttrID[len(kAttrID)-oid.Size:], id[:])
off := 1 + copy(kAttrID[1:], kIDAttr[1+oid.Size:])
off += copy(kAttrID[off:], attributeDelimiter)
copy(kAttrID[off:], id[:])
ks = append(ks, kIDAttr, kAttrID)
if n, ok := new(big.Int).SetString(string(kIDAttr[sepInd+utf8DelimiterLen:]), 10); ok && intWithinLimits(n) {
kAttrIDInt := make([]byte, sepInd+utf8DelimiterLen+intValLen)
if n, ok := new(big.Int).SetString(string(kIDAttr[sepInd+attributeDelimiterLen:]), 10); ok && intWithinLimits(n) {
kAttrIDInt := make([]byte, sepInd+attributeDelimiterLen+intValLen)
kAttrIDInt[0] = metaPrefixAttrIDInt
off := 1 + copy(kAttrIDInt[1:], kIDAttr[1+oid.Size:sepInd])
off += copy(kAttrIDInt[off:], utf8Delimiter)
off += copy(kAttrIDInt[off:], attributeDelimiter)
putInt(kAttrIDInt[off:off+intValLen], n)
copy(kAttrIDInt[off+intValLen:], id[:])
ks = append(ks, kAttrIDInt)
Expand All @@ -191,9 +202,22 @@ type SearchCursor struct {
ValIDOff int
}

// splits VAL_DELIM_OID.
func splitValOID(b []byte) ([]byte, []byte, error) {
if len(b) < attributeDelimiterLen+oid.Size+1 { // +1 because VAL cannot be empty
return nil, nil, fmt.Errorf("too short len %d", len(b))
}
idOff := len(b) - oid.Size
valLn := idOff - attributeDelimiterLen
if !bytes.Equal(b[valLn:idOff], attributeDelimiter) {
return nil, nil, errors.New("no delimiter before OID")
}

Check warning on line 214 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L213-L214

Added lines #L213 - L214 were not covered by tests
return b[:valLn], b[idOff:], nil
}

// NewSearchCursorFromString decodes cursor from the string according to the
// primary attribute.
func NewSearchCursorFromString(s, primAttr string) (*SearchCursor, error) {
func NewSearchCursorFromString(s, primAttr string, isInt bool) (*SearchCursor, error) {
if s == "" {
return nil, nil
}
Expand All @@ -212,17 +236,23 @@ func NewSearchCursorFromString(s, primAttr string) (*SearchCursor, error) {
if n > object.MaxHeaderLen {
return nil, fmt.Errorf("cursor len %d exceeds the limit %d", n, object.MaxHeaderLen)
}
ind := bytes.Index(b[1:], utf8Delimiter) // 1st is prefix
ind := bytes.Index(b[1:], attributeDelimiter) // 1st is prefix
if ind < 0 {
return nil, errors.New("missing delimiter")
}
if !bytes.Equal(b[1:1+ind], []byte(primAttr)) {
return nil, errors.New("wrong attribute")
}
var res SearchCursor
res.ValIDOff = 1 + len(primAttr) + len(utf8Delimiter)
if len(b[res.ValIDOff:]) <= oid.Size {
return nil, errors.New("missing value")
res.ValIDOff = 1 + len(primAttr) + len(attributeDelimiter)
if isInt {
if len(b[res.ValIDOff:]) <= oid.Size {
return nil, errors.New("missing value")
}

Check warning on line 251 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L249-L251

Added lines #L249 - L251 were not covered by tests
} else {
if _, _, err = splitValOID(b[res.ValIDOff:]); err != nil {
return nil, fmt.Errorf("invalid VAL_OID: %w", err)
}
}
res.Key = b
return &res, nil
Expand Down Expand Up @@ -299,21 +329,28 @@ func (db *DB) searchTx(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, fInt m
}
primSeekPrefix = primSeekKey[:cursor.ValIDOff]
valID := cursor.Key[cursor.ValIDOff:]
prevResPrimVal, prevResOID = valID[:len(valID)-oid.Size], valID[len(valID)-oid.Size:]
if intPrimMatcher {
prevResPrimVal, prevResOID = valID[:len(valID)-oid.Size], valID[len(valID)-oid.Size:]
} else {
var err error
if prevResPrimVal, prevResOID, err = splitValOID(valID); err != nil {
return nil, nil, fmt.Errorf("invalid VAL_OID: %w", err)
}

Check warning on line 338 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L337-L338

Added lines #L337 - L338 were not covered by tests
}
} else {
if intPrimMatcher {
// we seek 0x01_ATTR_DELIM_VAL either w/ or w/o VAL. We ignore VAL when we need
// to start from the lowest int, i.e. filter is auto-matched (e.g. <=MaxUint256) or <(=).
f := fInt[0]
withVal := !f.auto && (primMatcher == object.MatchNumGT || primMatcher == object.MatchNumGE)
kln := 1 + len(primAttr) + utf8DelimiterLen // prefix 1st
kln := 1 + len(primAttr) + attributeDelimiterLen // prefix 1st
if withVal {
kln += intValLen
}
primSeekKey = make([]byte, kln)
primSeekKey[0] = metaPrefixAttrIDInt
off := 1 + copy(primSeekKey[1:], primAttr)
off += copy(primSeekKey[off:], utf8Delimiter)
off += copy(primSeekKey[off:], attributeDelimiter)
primSeekPrefix = primSeekKey[:off]
if withVal {
copy(primSeekKey[off:], f.b)
Expand Down Expand Up @@ -358,10 +395,17 @@ nextPrimKey:
}
} else { // apply primary filter
valID := primKey[len(primSeekPrefix):] // VAL_OID
if len(valID) <= oid.Size {
return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("too small VAL_OID len %d", len(valID)))
if intPrimMatcher {
if len(valID) <= oid.Size {
return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("too small VAL_OID len %d", len(valID)))
}

Check warning on line 401 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L400-L401

Added lines #L400 - L401 were not covered by tests
primDBVal, id = valID[:len(valID)-oid.Size], valID[len(valID)-oid.Size:]
} else {
var err error
if primDBVal, id, err = splitValOID(valID); err != nil {
return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("invalid VAL_OID: %w", err))
}

Check warning on line 407 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L406-L407

Added lines #L406 - L407 were not covered by tests
}
primDBVal, id = valID[:len(valID)-oid.Size], valID[len(valID)-oid.Size:]
for i := range fs {
// there may be several filters by primary key, e.g. N >= 10 && N <= 20. We
// check them immediately before moving through the DB.
Expand Down Expand Up @@ -569,10 +613,10 @@ func (db *DB) searchUnfiltered(cnr cid.ID, cursor *SearchCursor, count uint16) (
}

func seekKeyForAttribute(attr, fltVal string) ([]byte, []byte, error) {
key := make([]byte, 1+len(attr)+utf8DelimiterLen+len(fltVal)) // prefix 1st
key := make([]byte, 1+len(attr)+attributeDelimiterLen+len(fltVal)) // prefix 1st
key[0] = metaPrefixAttrIDPlain
off := 1 + copy(key[1:], attr)
off += copy(key[off:], utf8Delimiter)
off += copy(key[off:], attributeDelimiter)
prefix := key[:off]
if fltVal == "" {
return key, prefix, nil
Expand Down Expand Up @@ -759,16 +803,23 @@ func intWithinLimits(n *big.Int) bool { return n.Cmp(maxUint256Neg) >= 0 && n.Cm
// makes PREFIX_ATTR_DELIM_VAL_OID with unset VAL space, and returns offset of
// the VAL. Reuses previously allocated buffer if it is sufficient.
func prepareMetaAttrIDKey(buf *keyBuffer, id oid.ID, attr string, valLen int, intAttr bool) ([]byte, int) {
k := buf.alloc(attrIDFixedLen + len(attr) + valLen)
kln := attrIDFixedLen + len(attr) + valLen
if !intAttr {
kln += attributeDelimiterLen
}
k := buf.alloc(kln)
if intAttr {
k[0] = metaPrefixAttrIDInt
} else {
k[0] = metaPrefixAttrIDPlain
}
off := 1 + copy(k[1:], attr)
off += copy(k[off:], utf8Delimiter)
off += copy(k[off:], attributeDelimiter)
valOff := off
off += valLen
if !intAttr {
off += copy(k[off:], attributeDelimiter)
}
copy(k[off:], id[:])
return k, valOff
}
Expand All @@ -779,7 +830,7 @@ func prepareMetaIDAttrKey(buf *keyBuffer, id oid.ID, attr string, valLen int) []
k[0] = metaPrefixIDAttr
off := 1 + copy(k[1:], id[:])
off += copy(k[off:], attr)
copy(k[off:], utf8Delimiter)
copy(k[off:], attributeDelimiter)
return k
}

Expand Down Expand Up @@ -817,7 +868,7 @@ func (x *metaAttributeSeeker) get(id []byte, attr string) ([]byte, error) {
pref[0] = metaPrefixIDAttr
off := 1 + copy(pref[1:], id)
off += copy(pref[off:], attr)
copy(pref[off:], utf8Delimiter)
copy(pref[off:], attributeDelimiter)
if x.crsr == nil {
x.crsr = x.bkt.Cursor()
}
Expand Down Expand Up @@ -874,9 +925,9 @@ func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem)
default:
if n, ok := new(big.Int).SetString(lastItemVal, 10); ok {
var res SearchCursor
res.Key = make([]byte, len(attr)+utf8DelimiterLen+intValLen+oid.Size)
res.Key = make([]byte, len(attr)+attributeDelimiterLen+intValLen+oid.Size)

Check warning on line 928 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L928

Added line #L928 was not covered by tests
off := copy(res.Key, attr)
res.ValIDOff = off + copy(res.Key[off:], utf8Delimiter)
res.ValIDOff = off + copy(res.Key[off:], attributeDelimiter)

Check warning on line 930 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L930

Added line #L930 was not covered by tests
putInt(res.Key[res.ValIDOff:res.ValIDOff+intValLen], n)
copy(res.Key[res.ValIDOff+intValLen:], lastItem.ID[:])
return res, nil
Expand All @@ -892,14 +943,16 @@ func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem)
return SearchCursor{}, fmt.Errorf("wrong %q attribute decoded len %d", attr, ln)
}
var res SearchCursor
res.Key = make([]byte, len(attr)+utf8DelimiterLen+ln+oid.Size)
res.Key = make([]byte, len(attr)+attributeDelimiterLen+ln+attributeDelimiterLen+oid.Size)

Check warning on line 946 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L946

Added line #L946 was not covered by tests
off := copy(res.Key, attr)
res.ValIDOff = off + copy(res.Key[off:], utf8Delimiter)
res.ValIDOff = off + copy(res.Key[off:], attributeDelimiter)

Check warning on line 948 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L948

Added line #L948 was not covered by tests
var err error
if _, err = hex.Decode(res.Key[res.ValIDOff:], []byte(lastItemVal)); err != nil {
return SearchCursor{}, fmt.Errorf("decode %q attribute from HEX: %w", attr, err)
}
copy(res.Key[res.ValIDOff+ln:], lastItem.ID[:])
off = res.ValIDOff + ln
off += copy(res.Key[off:], attributeDelimiter)
copy(res.Key[off:], lastItem.ID[:])

Check warning on line 955 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L953-L955

Added lines #L953 - L955 were not covered by tests
return res, nil
case object.FilterSplitID:
uid, err := uuid.Parse(lastItemVal)
Expand All @@ -913,10 +966,12 @@ func CalculateCursor(fs object.SearchFilters, lastItem client.SearchResultItem)
val = []byte(lastItemVal)
}
var res SearchCursor
res.Key = make([]byte, len(attr)+utf8DelimiterLen+len(val)+oid.Size)
kln := len(attr) + attributeDelimiterLen + len(val) + attributeDelimiterLen + oid.Size
res.Key = make([]byte, kln)

Check warning on line 970 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L969-L970

Added lines #L969 - L970 were not covered by tests
off := copy(res.Key, attr)
res.ValIDOff = off + copy(res.Key[off:], utf8Delimiter)
res.ValIDOff = off + copy(res.Key[off:], attributeDelimiter)

Check warning on line 972 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L972

Added line #L972 was not covered by tests
off = res.ValIDOff + copy(res.Key[res.ValIDOff:], val)
off += copy(res.Key[off:], attributeDelimiter)

Check warning on line 974 in pkg/local_object_storage/metabase/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/metadata.go#L974

Added line #L974 was not covered by tests
copy(res.Key[off:], lastItem.ID[:])
return res, nil
}
Expand Down
Loading

0 comments on commit 8f593f9

Please sign in to comment.