Skip to content

Commit

Permalink
node/meta: initial MPT meta storage
Browse files Browse the repository at this point in the history
Information from FS chain is placed inside MPT structures reused from neo-go.
Only the latest state is stored. Every container has its own MPT storage based
on a separate bolt KV database. Refs #3070.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
  • Loading branch information
carpawell committed Feb 14, 2025
1 parent eb90734 commit 2e28ada
Show file tree
Hide file tree
Showing 5 changed files with 1,383 additions and 1 deletion.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/stretchr/testify v1.9.0
go.etcd.io/bbolt v1.3.11
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948
golang.org/x/net v0.28.0
golang.org/x/sync v0.10.0
golang.org/x/sys v0.28.0
Expand Down Expand Up @@ -93,7 +94,6 @@ require (
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
160 changes: 160 additions & 0 deletions pkg/services/meta/container.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package meta

import (
"bytes"
"errors"
"fmt"
"maps"
"os"
"path"
"sync"

"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/storage/dbconfig"
"github.com/nspcc-dev/neo-go/pkg/util"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
objectsdk "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// containerStorage is a per-container MPT-based meta information storage.
// Every container gets its own instance backed by a separate Bolt KV
// database (see storageForContainer).
type containerStorage struct {
	// m guards opsBatch against concurrent mutation.
	m sync.RWMutex
	// opsBatch accumulates MPT key-value operations not yet applied to the
	// trie; keys carry a leading zero byte required by the MPT batch format.
	opsBatch map[string][]byte

	// path is the filesystem location of the Bolt database file.
	path string
	// mpt is the container's Merkle-Patricia trie built over db.
	mpt *mpt.Trie
	// db is the raw KV store that also backs mpt.
	db storage.Store
}

// drop releases the container storage: it closes the underlying database
// and then removes all of its data from the filesystem. The storage must
// not be used after a successful drop.
func (s *containerStorage) drop() error {
	s.m.Lock()
	defer s.m.Unlock()

	if err := s.db.Close(); err != nil {
		return fmt.Errorf("close container storage: %w", err)
	}

	if err := os.RemoveAll(s.path); err != nil {
		return fmt.Errorf("remove container storage: %w", err)
	}

	return nil
}

// putObject records meta information about a single object event both in
// the pending MPT operations batch and, immediately, in the raw KV store.
func (s *containerStorage) putObject(e objEvent) error {
	s.m.Lock()
	defer s.m.Unlock()

	// batching that is implemented for MPT ignores key's first byte, hence
	// the leading zero in every key produced below
	commsuffix := e.oID[:]
	key := func(index byte) string {
		return string(append([]byte{0, index}, commsuffix...))
	}

	newKVs := map[string][]byte{
		key(oidIndex):  {},
		key(sizeIndex): e.size.Bytes(),
	}
	if len(e.firstObject) > 0 {
		newKVs[key(firstPartIndex)] = e.firstObject
	}
	if len(e.prevObject) > 0 {
		newKVs[key(previousPartIndex)] = e.prevObject
	}
	if len(e.deletedObjects) > 0 {
		newKVs[key(deletedIndex)] = e.deletedObjects
		maps.Copy(newKVs, deleteObjectsOps(s.db, e.deletedObjects))
	}
	if len(e.lockedObjects) > 0 {
		newKVs[key(lockedIndex)] = e.lockedObjects
	}
	if e.typ != objectsdk.TypeRegular {
		newKVs[key(typeIndex)] = []byte{byte(e.typ)}
	}

	// lazily initialized: the batch is nil until the first operation arrives
	if s.opsBatch == nil {
		s.opsBatch = make(map[string][]byte)
	}
	maps.Copy(s.opsBatch, newKVs)

	if err := s.db.PutChangeSet(mptToStoreBatch(newKVs), nil); err != nil {
		return fmt.Errorf("put MPT KVs to the raw storage manually: %w", err)
	}

	// TODO: receive full object header and store its non-MPT parts in
	// persistent object storage

	return nil
}

// deleteObjectsOps returns MPT batch operations (a nil value marks a key
// for deletion) that wipe every piece of stored information about the
// given objects. objects is a raw concatenation of object IDs, oid.Size
// bytes each.
func deleteObjectsOps(s storage.Store, objects []byte) map[string][]byte {
	resMpt := make(map[string][]byte)
	rng := storage.SeekRange{}

	for len(objects) > 0 {
		// take the next object ID off the concatenated list
		o := objects[:oid.Size]
		objects = objects[oid.Size:]
		// seek from the smallest possible key for this object
		// (oidIndex is presumably the lowest index constant — confirm
		// against the enum declaration)
		rng.Start = append([]byte{oidIndex}, o...)
		stopKey := lastObjectKey(o)

		s.Seek(rng, func(k, v []byte) bool {
			// raw store keys are {index byte}+OID+...; any key whose OID
			// part matches belongs to this object and must be deleted
			if len(k) >= 1+oid.Size && bytes.HasPrefix(k[1:], o) {
				// MPT batch keys need a leading zero byte (ignored by the
				// batching), unlike the raw store keys being iterated
				resMpt[string(append([]byte{0}, k...))] = nil // nil means "delete"
				return true
			}

			// key refers to another object; keep scanning until keys sort
			// past the last entry that could still mention this object
			return bytes.Compare(k, stopKey) < 0
		})
	}

	return resMpt
}

// lastObjectKey returns the least possible key in sorted DB list that
// proves there will not be information about the object anymore.
func lastObjectKey(rawOID []byte) []byte {
	key := make([]byte, 1+len(rawOID))
	key[0] = lastEnumIndex - 1
	copy(key[1:], rawOID)

	return key
}

// mptToStoreBatch drops the first byte from every key in the map, turning
// an MPT-style batch (whose leading key byte is ignored) into a raw store
// batch. Values are shared with the input map, not copied.
func mptToStoreBatch(b map[string][]byte) map[string][]byte {
	res := make(map[string][]byte, len(b))
	for key, val := range b {
		trimmed := key[1:]
		res[trimmed] = val
	}

	return res
}

// storageForContainer opens (or creates) the Bolt-backed MPT storage for
// the given container at rootPath/<container ID>. If a state root hash was
// previously persisted under rootKey, the trie resumes from it; otherwise
// a fresh trie is started.
func storageForContainer(rootPath string, cID cid.ID) (*containerStorage, error) {
	// NOTE(review): path.Join handles slash-separated paths only;
	// filepath.Join would be the OS-correct choice for a filesystem
	// location — confirm whether non-Unix platforms are a target.
	p := path.Join(rootPath, cID.EncodeToString())

	st, err := storage.NewBoltDBStore(dbconfig.BoltDBOptions{FilePath: p, ReadOnly: false})
	if err != nil {
		return nil, fmt.Errorf("open bolt store at %q: %w", p, err)
	}

	// restore the previous trie root, if any; a missing key is the normal
	// first-run case and leaves prevRootNode nil (empty trie)
	var prevRootNode mpt.Node
	root, err := st.Get([]byte{rootKey})
	if !errors.Is(err, storage.ErrKeyNotFound) {
		if err != nil {
			return nil, fmt.Errorf("get state root from db: %w", err)
		}

		if len(root) != util.Uint256Size {
			return nil, fmt.Errorf("root hash from db is %d bytes long, expect %d", len(root), util.Uint256Size)
		}

		prevRootNode = mpt.NewHashNode([util.Uint256Size]byte(root))
	}

	return &containerStorage{
		path: p,
		// ModeLatest: only the latest state is kept (per the commit intent);
		// the mem-cached wrapper batches trie writes over the raw bolt store
		mpt: mpt.NewTrie(prevRootNode, mpt.ModeLatest, storage.NewMemCachedStore(st)),
		db:  st,
	}, nil
}
Loading

0 comments on commit 2e28ada

Please sign in to comment.