node/meta: initial MPT meta storage
Information from FS chain is placed inside MPT structures reused from neo-go.
Only the latest state is stored. Every container has its own MPT storage based
on a separate bolt KV database. Refs #3070.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
carpawell committed Feb 12, 2025
1 parent eb90734 commit 1f15dae
Showing 5 changed files with 1,232 additions and 1 deletion.
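The commit message above sums up the design: chain data goes into neo-go MPTs, only the latest state is kept, and every container gets its own Bolt-backed trie. As a rough orientation before the diff, here is a minimal, self-contained sketch of that flow using the same neo-go primitives that storageForContainer and the flusher below rely on; the file path and the sample key/value are made up for illustration, and exact flushing details differ from the real code.

package main

import (
	"fmt"

	"github.com/nspcc-dev/neo-go/pkg/core/mpt"
	"github.com/nspcc-dev/neo-go/pkg/core/storage"
	"github.com/nspcc-dev/neo-go/pkg/core/storage/dbconfig"
)

func main() {
	// One Bolt database per container, same as storageForContainer below.
	st, err := storage.NewBoltDBStore(dbconfig.BoltDBOptions{FilePath: "/tmp/meta/example-container", ReadOnly: false})
	if err != nil {
		panic(err)
	}
	defer st.Close()

	// ModeLatest keeps only the latest state, no historical trie nodes.
	trie := mpt.NewTrie(nil, mpt.ModeLatest, storage.NewMemCachedStore(st))

	// Illustrative key/value; the real code derives keys from object events.
	if err := trie.Put([]byte{0x01, 0xaa}, []byte("value")); err != nil {
		panic(err)
	}
	trie.Flush(0) // move new trie nodes into the memory-cached layer

	// The state root summarizes the container's current meta state.
	fmt.Println(trie.StateRoot())

	// Persist the cached layer to Bolt, as the flusher in meta.go does.
	if _, err := trie.Store.PersistSync(); err != nil {
		panic(err)
	}
}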
2 changes: 1 addition & 1 deletion go.mod
@@ -30,6 +30,7 @@ require (
 	github.com/stretchr/testify v1.9.0
 	go.etcd.io/bbolt v1.3.11
 	go.uber.org/zap v1.27.0
+	golang.org/x/exp v0.0.0-20240823005443-9b4947da3948
 	golang.org/x/net v0.28.0
 	golang.org/x/sync v0.10.0
 	golang.org/x/sys v0.28.0
@@ -93,7 +94,6 @@ require (
 	github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/crypto v0.31.0 // indirect
-	golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect
 	golang.org/x/text v0.21.0 // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
126 changes: 126 additions & 0 deletions pkg/services/meta/container.go
@@ -0,0 +1,126 @@
package meta

import (
	"errors"
	"fmt"
	"maps"
	"os"
	"path"
	"sync"

	"github.com/nspcc-dev/neo-go/pkg/core/mpt"
	"github.com/nspcc-dev/neo-go/pkg/core/storage"
	"github.com/nspcc-dev/neo-go/pkg/core/storage/dbconfig"
	"github.com/nspcc-dev/neo-go/pkg/util"
	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
	objectsdk "github.com/nspcc-dev/neofs-sdk-go/object"
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

type containerStorage struct {
	m        sync.RWMutex
	opsBatch map[string][]byte

	path string
	mpt  *mpt.Trie
	db   storage.Store
}

func (s *containerStorage) drop() error {
	s.m.Lock()
	defer s.m.Unlock()

	err := s.db.Close()
	if err != nil {
		return fmt.Errorf("close container storage: %w", err)
	}

	err = os.RemoveAll(s.path)
	if err != nil {
		return fmt.Errorf("remove container storage: %w", err)
	}

	return nil
}

func (s *containerStorage) putObject(e objEvent) error {
	s.m.Lock()
	defer s.m.Unlock()

	newKVs := make(map[string][]byte)
	commsuffix := e.oID[:]

	// the batching implemented for MPT ignores the first byte of every key

	newKVs[string(append([]byte{0, oidIndex}, commsuffix...))] = []byte{}
	newKVs[string(append([]byte{0, sizeIndex}, commsuffix...))] = e.size.Bytes()
	if len(e.firstObject) == oid.Size {
		newKVs[string(append([]byte{0, firstPartIndex}, commsuffix...))] = e.firstObject
	}
	if len(e.prevObject) == oid.Size {
		newKVs[string(append([]byte{0, previousPartIndex}, commsuffix...))] = e.prevObject
	}
	if len(e.deletedObjects) > 0 {
		newKVs[string(append([]byte{0, deletedIndex}, commsuffix...))] = e.deletedObjects
	}
	if len(e.lockedObjects) > 0 {
		newKVs[string(append([]byte{0, lockedIndex}, commsuffix...))] = e.lockedObjects
	}
	if e.typ != objectsdk.TypeRegular {
		newKVs[string(append([]byte{0, typeIndex}, commsuffix...))] = []byte{byte(e.typ)}
	}

	if s.opsBatch == nil {
		s.opsBatch = make(map[string][]byte)
	}
	maps.Copy(s.opsBatch, newKVs)

	err := s.db.PutChangeSet(mptToStoreBatch(newKVs), nil)
	if err != nil {
		return fmt.Errorf("put MPT KVs to the raw storage manually: %w", err)
	}

	// TODO: receive full object header and store its non-MPT parts in
	// persistent object storage

	return nil
}
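// NOTE (editor's sketch, not part of this commit): applying the accumulated
// opsBatch to the trie happens outside this file, in another file of this
// change set. Assuming neo-go's MPT batch helpers, that step could look
// roughly like the hypothetical helper below; MapToMPTBatch dropping the
// first byte of every key is exactly why the keys above carry a dummy
// leading zero byte.
func (s *containerStorage) applyOpsBatch() error {
	s.m.Lock()
	defer s.m.Unlock()

	if len(s.opsBatch) == 0 {
		return nil
	}

	if _, err := s.mpt.PutBatch(mpt.MapToMPTBatch(s.opsBatch)); err != nil {
		return fmt.Errorf("apply MPT batch: %w", err)
	}
	s.opsBatch = nil

	return nil
}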

// mptToStoreBatch drops the first byte from every key in the map.
func mptToStoreBatch(b map[string][]byte) map[string][]byte {
	res := make(map[string][]byte, len(b))
	for k, v := range b {
		res[k[1:]] = v
	}

	return res
}

func storageForContainer(rootPath string, cID cid.ID) (*containerStorage, error) {
	p := path.Join(rootPath, cID.EncodeToString())

	st, err := storage.NewBoltDBStore(dbconfig.BoltDBOptions{FilePath: p, ReadOnly: false})
	if err != nil {
		return nil, fmt.Errorf("open bolt store at %q: %w", p, err)
	}

	var prevRootNode mpt.Node
	root, err := st.Get([]byte{rootKey})
	if !errors.Is(err, storage.ErrKeyNotFound) {
		if err != nil {
			return nil, fmt.Errorf("get state root from db: %w", err)
		}

		if len(root) != util.Uint256Size {
			return nil, fmt.Errorf("root hash from db is %d bytes long, expect %d", len(root), util.Uint256Size)
		}

		prevRootNode = mpt.NewHashNode([util.Uint256Size]byte(root))
	}

	return &containerStorage{
		path: p,
		mpt:  mpt.NewTrie(prevRootNode, mpt.ModeLatest, storage.NewMemCachedStore(st)),
		db:   st,
	}, nil
}
223 changes: 223 additions & 0 deletions pkg/services/meta/meta.go
@@ -0,0 +1,223 @@
package meta

import (
	"context"
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/nspcc-dev/neo-go/pkg/core/block"
	"github.com/nspcc-dev/neo-go/pkg/core/state"
	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
	"github.com/nspcc-dev/neo-go/pkg/util"
	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
	"go.uber.org/zap"
	"golang.org/x/exp/maps"
	"golang.org/x/sync/errgroup"
)

const (
	// raw storage prefixes.

	// rootKey is the key for the last known state root in the KV database
	// associated with MPT.
	rootKey = 0x00
)

// ContainerLister is a source of the actual containers the current node
// belongs to.
type ContainerLister interface {
	// List returns the node's containers that support chain-based metadata
	// and any error that prevents listing.
	List() (map[cid.ID]struct{}, error)
}

// Meta handles object meta information received from FS chain and object
// storages. Chain information is stored in Merkle-Patricia Tries. A full
// object index is built and stored as a simple KV storage.
type Meta struct {
	l        *zap.Logger
	rootPath string
	netmapH  util.Uint160
	cnrH     util.Uint160
	cLister  ContainerLister

	m        sync.RWMutex
	storages map[cid.ID]*containerStorage

	endpoints   []string
	timeout     time.Duration
	magicNumber uint32
	cliCtx      context.Context // for client context only, as it is required by the lib
	ws          *rpcclient.WSClient
	bCh         chan *block.Header
	objEv       chan *state.ContainedNotificationEvent
	cnrDelEv    chan *state.ContainedNotificationEvent
	cnrPutEv    chan *state.ContainedNotificationEvent
	epochEv     chan *state.ContainedNotificationEvent

	objNotificationBuff chan *state.ContainedNotificationEvent
}

const objsBufferSize = 1024

// New makes [Meta].
func New(l *zap.Logger, cLister ContainerLister, timeout time.Duration, endpoints []string, containerH, nmH util.Uint160, rootPath string) (*Meta, error) {
	storagesFS, err := os.ReadDir(rootPath)
	if err != nil && !os.IsNotExist(err) {
		return nil, fmt.Errorf("read existing container storages: %w", err)
	}
	storagesRead := make(map[cid.ID]*containerStorage)
	for _, s := range storagesFS {
		sName := s.Name()
		cID, err := cid.DecodeString(sName)
		if err != nil {
			l.Warn("skip unknown container storage entity", zap.String("name", sName), zap.Error(err))
			continue
		}

		st, err := storageForContainer(rootPath, cID)
		if err != nil {
			l.Warn("skip container storage that cannot be read", zap.String("name", sName), zap.Error(err))
			continue
		}

		storagesRead[cID] = st
	}

	storages := storagesRead
	defer func() {
		if err != nil {
			for _, st := range storages {
				_ = st.db.Close()
			}
		}
	}()

	cnrsNetwork, err := cLister.List()
	if err != nil {
		return nil, fmt.Errorf("listing node's containers: %w", err)
	}
	for cID := range storagesRead {
		if _, ok := cnrsNetwork[cID]; !ok {
			err = storagesRead[cID].drop()
			if err != nil {
				l.Warn("could not drop container storage", zap.Stringer("cID", cID), zap.Error(err))
			}

			delete(storagesRead, cID)
		}
	}

	for cID := range cnrsNetwork {
		if _, ok := storages[cID]; !ok {
			storages[cID], err = storageForContainer(rootPath, cID)
			if err != nil {
				return nil, fmt.Errorf("open container storage %s: %w", cID, err)
			}
		}
	}

	return &Meta{
		l:                   l,
		rootPath:            rootPath,
		netmapH:             nmH,
		cnrH:                containerH,
		cLister:             cLister,
		endpoints:           endpoints,
		timeout:             timeout,
		bCh:                 make(chan *block.Header),
		objEv:               make(chan *state.ContainedNotificationEvent),
		cnrDelEv:            make(chan *state.ContainedNotificationEvent),
		cnrPutEv:            make(chan *state.ContainedNotificationEvent),
		epochEv:             make(chan *state.ContainedNotificationEvent),
		objNotificationBuff: make(chan *state.ContainedNotificationEvent, objsBufferSize),
		storages:            storages}, nil
}

// Run starts notification handling. Must be called only on instances created
// with [New]. Blocks until the context is done.
func (m *Meta) Run(ctx context.Context) error {
	defer func() {
		m.m.Lock()
		for _, st := range m.storages {
			st.m.Lock()
			_ = st.db.Close()
			st.m.Unlock()
		}
		maps.Clear(m.storages)

		m.m.Unlock()
	}()

	m.cliCtx = ctx

	var err error
	m.ws, err = m.connect()
	if err != nil {
		return fmt.Errorf("connect to NEO RPC: %w", err)
	}
	defer m.ws.Close()

	v, err := m.ws.GetVersion()
	if err != nil {
		return fmt.Errorf("get version: %w", err)
	}
	m.magicNumber = uint32(v.Protocol.Network)

	err = m.subscribeForMeta()
	if err != nil {
		return fmt.Errorf("subscribe for meta notifications: %w", err)
	}

	go m.flusher(ctx)
	go m.objNotificationWorker(ctx, m.objNotificationBuff)

	return m.listenNotifications(ctx)
}

func (m *Meta) flusher(ctx context.Context) {
	const flushInterval = time.Second

	t := time.NewTicker(flushInterval)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			m.m.RLock()

			var wg errgroup.Group
			wg.SetLimit(1024)

			for _, st := range m.storages {
				wg.Go(func() error {
					st.m.Lock()
					defer st.m.Unlock()

					st.mpt.Collapse(collapseDepth)

					_, err := st.mpt.Store.PersistSync()
					if err != nil {
						return fmt.Errorf("persisting %q storage: %w", st.path, err)
					}

					return nil
				})
			}

			err := wg.Wait()

			m.m.RUnlock()

			if err != nil {
				m.l.Error("storage flusher failed", zap.Error(err))
				continue
			}

			t.Reset(flushInterval)
		case <-ctx.Done():
			return
		}
	}
}
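For orientation, here is a minimal sketch of how a node could wire the service up, based only on the New and Run signatures above; the lister stub, endpoint, hashes and storage path are placeholders, not values from this commit.

package main

import (
	"context"
	"time"

	"github.com/nspcc-dev/neo-go/pkg/util"
	"github.com/nspcc-dev/neofs-node/pkg/services/meta"
	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
	"go.uber.org/zap"
)

// dummyLister is a placeholder ContainerLister; a real node plugs in its own
// container source here.
type dummyLister struct{}

func (dummyLister) List() (map[cid.ID]struct{}, error) { return map[cid.ID]struct{}{}, nil }

func main() {
	l, _ := zap.NewProduction()

	// Hashes, endpoint and storage root are placeholders; real values come
	// from the node configuration and FS chain deployment.
	var cnrHash, nmHash util.Uint160

	m, err := meta.New(l, dummyLister{}, 30*time.Second,
		[]string{"ws://localhost:30333/ws"}, cnrHash, nmHash, "/path/to/meta")
	if err != nil {
		l.Fatal("init meta service", zap.Error(err))
	}

	// Run blocks until the context is done, handling chain notifications and
	// periodically flushing per-container MPT storages.
	if err := m.Run(context.Background()); err != nil {
		l.Fatal("meta service stopped", zap.Error(err))
	}
}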
