From 1e994d2599025e6a0b611ccc34ec24bffc058b5f Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 30 Jan 2025 20:47:23 +0300 Subject: [PATCH] node/meta: initial MPT meta storage Information from FS chain is placed inside MPT structures reused from neo-go. Only the latest state is stored. Every container has its own MPT storage based on a separate bolt KV database. Refs #3070. Signed-off-by: Pavel Karpy --- go.mod | 2 +- pkg/services/meta/container.go | 126 +++++ pkg/services/meta/meta.go | 223 +++++++++ pkg/services/meta/notifications.go | 588 ++++++++++++++++++++++++ pkg/services/meta/notifications_test.go | 294 ++++++++++++ 5 files changed, 1232 insertions(+), 1 deletion(-) create mode 100644 pkg/services/meta/container.go create mode 100644 pkg/services/meta/meta.go create mode 100644 pkg/services/meta/notifications.go create mode 100644 pkg/services/meta/notifications_test.go diff --git a/go.mod b/go.mod index d98eed9bd7..f2ef012998 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/stretchr/testify v1.9.0 go.etcd.io/bbolt v1.3.11 go.uber.org/zap v1.27.0 + golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 golang.org/x/net v0.28.0 golang.org/x/sync v0.10.0 golang.org/x/sys v0.28.0 @@ -93,7 +94,6 @@ require ( github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.31.0 // indirect - golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect golang.org/x/text v0.21.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/pkg/services/meta/container.go b/pkg/services/meta/container.go new file mode 100644 index 0000000000..b5695b26a9 --- /dev/null +++ b/pkg/services/meta/container.go @@ -0,0 +1,126 @@ +package meta + +import ( + "errors" + "fmt" + "maps" + "os" + "path" + "sync" + + "github.com/nspcc-dev/neo-go/pkg/core/mpt" + "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/core/storage/dbconfig" + "github.com/nspcc-dev/neo-go/pkg/util" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + objectsdk "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" +) + +type containerStorage struct { + m sync.RWMutex + opsBatch map[string][]byte + + path string + mpt *mpt.Trie + db storage.Store +} + +func (s *containerStorage) drop() error { + s.m.Lock() + defer s.m.Unlock() + + err := s.db.Close() + if err != nil { + return fmt.Errorf("close container storage: %w", err) + } + + err = os.RemoveAll(s.path) + if err != nil { + return fmt.Errorf("remove container storage: %w", err) + } + + return nil +} + +func (s *containerStorage) putObject(e objEvent) error { + s.m.Lock() + defer s.m.Unlock() + + newKVs := make(map[string][]byte) + commsuffix := e.oID[:] + + // batching that is implemented for MPT ignores key's first byte + + newKVs[string(append([]byte{0, oidIndex}, commsuffix...))] = []byte{} + newKVs[string(append([]byte{0, sizeIndex}, commsuffix...))] = e.size.Bytes() + if len(e.firstObject) == oid.Size { + newKVs[string(append([]byte{0, firstPartIndex}, commsuffix...))] = e.firstObject + } + if len(e.prevObject) == oid.Size { + newKVs[string(append([]byte{0, previousPartIndex}, commsuffix...))] = e.prevObject + } + if len(e.deletedObjects) > 0 { + newKVs[string(append([]byte{0, deletedIndex}, commsuffix...))] = e.deletedObjects + } + if len(e.lockedObjects) > 0 { + newKVs[string(append([]byte{0, lockedIndex}, 
commsuffix...))] = e.lockedObjects + } + if e.typ != objectsdk.TypeRegular { + newKVs[string(append([]byte{0, typeIndex}, commsuffix...))] = []byte{byte(e.typ)} + } + + if s.opsBatch == nil { + s.opsBatch = make(map[string][]byte) + } + maps.Copy(s.opsBatch, newKVs) + + err := s.db.PutChangeSet(mptToStoreBatch(newKVs), nil) + if err != nil { + return fmt.Errorf("put MPT KVs to the raw storage manually: %w", err) + } + + // TODO: receive full object header and store its non-MPT parts in + // persistent object storage + + return nil +} + +// mptToStoreBatch drops the first byte from every key in the map. +func mptToStoreBatch(b map[string][]byte) map[string][]byte { + res := make(map[string][]byte, len(b)) + for k, v := range b { + res[k[1:]] = v + } + + return res +} + +func storageForContainer(rootPath string, cID cid.ID) (*containerStorage, error) { + p := path.Join(rootPath, cID.EncodeToString()) + + st, err := storage.NewBoltDBStore(dbconfig.BoltDBOptions{FilePath: p, ReadOnly: false}) + if err != nil { + return nil, fmt.Errorf("open bolt store at %q: %w", p, err) + } + + var prevRootNode mpt.Node + root, err := st.Get([]byte{rootKey}) + if !errors.Is(err, storage.ErrKeyNotFound) { + if err != nil { + return nil, fmt.Errorf("get state root from db: %w", err) + } + + if len(root) != util.Uint256Size { + return nil, fmt.Errorf("root hash from db is %d bytes long, expect %d", len(root), util.Uint256Size) + } + + prevRootNode = mpt.NewHashNode([util.Uint256Size]byte(root)) + } + + return &containerStorage{ + path: p, + mpt: mpt.NewTrie(prevRootNode, mpt.ModeLatest, storage.NewMemCachedStore(st)), + db: st, + }, nil +} diff --git a/pkg/services/meta/meta.go b/pkg/services/meta/meta.go new file mode 100644 index 0000000000..4732beb306 --- /dev/null +++ b/pkg/services/meta/meta.go @@ -0,0 +1,223 @@ +package meta + +import ( + "context" + "fmt" + "os" + "sync" + "time" + + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/util" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "go.uber.org/zap" + "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" +) + +const ( + // raw storage prefixes. + + // rootKey is the key for the last known state root in KV data base + // associated with MPT. + rootKey = 0x00 +) + +// ContainerLister is a source of actual containers current node belongs to. +type ContainerLister interface { + // List returns node's containers that support chain-based meta data and + // any error that does not allow listing. + List() (map[cid.ID]struct{}, error) +} + +// Meta handles object meta information received from FS chain and object +// storages. Chain information is stored in Merkle-Patricia Tries. Full objects +// index is built and stored as a simple KV storage. 
+type Meta struct { + l *zap.Logger + rootPath string + netmapH util.Uint160 + cnrH util.Uint160 + cLister ContainerLister + + m sync.RWMutex + storages map[cid.ID]*containerStorage + + endpoints []string + timeout time.Duration + magicNumber uint32 + cliCtx context.Context // for client context only, as it is required by the lib + ws *rpcclient.WSClient + bCh chan *block.Header + objEv chan *state.ContainedNotificationEvent + cnrDelEv chan *state.ContainedNotificationEvent + cnrPutEv chan *state.ContainedNotificationEvent + epochEv chan *state.ContainedNotificationEvent + + objNotificationBuff chan *state.ContainedNotificationEvent +} + +const objsBufferSize = 1024 + +// New makes [Meta]. +func New(l *zap.Logger, cLister ContainerLister, timeout time.Duration, endpoints []string, containerH, nmH util.Uint160, rootPath string) (*Meta, error) { + storagesFS, err := os.ReadDir(rootPath) + if err != nil && !os.IsNotExist(err) { + return nil, fmt.Errorf("read existing container storages: %w", err) + } + storagesRead := make(map[cid.ID]*containerStorage) + for _, s := range storagesFS { + sName := s.Name() + cID, err := cid.DecodeString(sName) + if err != nil { + l.Warn("skip unknown container storage entity", zap.String("name", sName), zap.Error(err)) + continue + } + + st, err := storageForContainer(rootPath, cID) + if err != nil { + l.Warn("skip container storage that cannot be read", zap.String("name", sName), zap.Error(err)) + continue + } + + storagesRead[cID] = st + } + + storages := storagesRead + defer func() { + if err != nil { + for _, st := range storages { + _ = st.db.Close() + } + } + }() + + cnrsNetwork, err := cLister.List() + if err != nil { + return nil, fmt.Errorf("listing node's containers: %w", err) + } + for cID := range storagesRead { + if _, ok := cnrsNetwork[cID]; !ok { + err = storagesRead[cID].drop() + if err != nil { + l.Warn("could not drop container storage", zap.Stringer("cID", cID), zap.Error(err)) + } + + delete(storagesRead, cID) + } + } + + for cID := range cnrsNetwork { + if _, ok := storages[cID]; !ok { + storages[cID], err = storageForContainer(rootPath, cID) + if err != nil { + return nil, fmt.Errorf("open container storage %s: %w", cID, err) + } + } + } + + return &Meta{ + l: l, + rootPath: rootPath, + netmapH: nmH, + cnrH: containerH, + cLister: cLister, + endpoints: endpoints, + timeout: timeout, + bCh: make(chan *block.Header), + objEv: make(chan *state.ContainedNotificationEvent), + cnrDelEv: make(chan *state.ContainedNotificationEvent), + cnrPutEv: make(chan *state.ContainedNotificationEvent), + epochEv: make(chan *state.ContainedNotificationEvent), + objNotificationBuff: make(chan *state.ContainedNotificationEvent, objsBufferSize), + storages: storages}, nil +} + +// Run starts notification handling. Must be called only on instances created +// with [New]. Blocked until context is done. 
+func (m *Meta) Run(ctx context.Context) error { + defer func() { + m.m.Lock() + for _, st := range m.storages { + st.m.Lock() + _ = st.db.Close() + st.m.Unlock() + } + maps.Clear(m.storages) + + m.m.Unlock() + }() + + m.cliCtx = ctx + + var err error + m.ws, err = m.connect() + if err != nil { + return fmt.Errorf("connect to NEO RPC: %w", err) + } + defer m.ws.Close() + + v, err := m.ws.GetVersion() + if err != nil { + return fmt.Errorf("get version: %w", err) + } + m.magicNumber = uint32(v.Protocol.Network) + + err = m.subscribeForMeta() + if err != nil { + return fmt.Errorf("subscribe for meta notifications: %w", err) + } + + go m.flusher(ctx) + go m.objNotificationWorker(ctx, m.objNotificationBuff) + + return m.listenNotifications(ctx) +} + +func (m *Meta) flusher(ctx context.Context) { + const flushInterval = time.Second + + t := time.NewTicker(flushInterval) + defer t.Stop() + + for { + select { + case <-t.C: + m.m.RLock() + + var wg errgroup.Group + wg.SetLimit(1024) + + for _, st := range m.storages { + wg.Go(func() error { + st.m.Lock() + defer st.m.Unlock() + + st.mpt.Collapse(collapseDepth) + + _, err := st.mpt.Store.PersistSync() + if err != nil { + return fmt.Errorf("persisting %q storage: %w", st.path, err) + } + + return nil + }) + } + + err := wg.Wait() + + m.m.RUnlock() + + if err != nil { + m.l.Error("storage flusher failed", zap.Error(err)) + continue + } + + t.Reset(flushInterval) + case <-ctx.Done(): + return + } + } +} diff --git a/pkg/services/meta/notifications.go b/pkg/services/meta/notifications.go new file mode 100644 index 0000000000..d07e6c643a --- /dev/null +++ b/pkg/services/meta/notifications.go @@ -0,0 +1,588 @@ +package meta + +import ( + "context" + "fmt" + "math/big" + + "github.com/nspcc-dev/neo-go/pkg/core/mpt" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + objectsdk "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +const ( + objPutEvName = "ObjectPut" + cnrDeleteName = "DeleteSuccess" + cnrPutName = "PutSuccess" + newEpochName = "NewEpoch" +) + +func (m *Meta) subscribeForMeta() error { + _, err := m.ws.ReceiveHeadersOfAddedBlocks(nil, m.bCh) + if err != nil { + return fmt.Errorf("subscribe for block headers: %w", err) + } + + objEv := objPutEvName + _, err = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &objEv}, m.objEv) + if err != nil { + return fmt.Errorf("subscribe for object notifications: %w", err) + } + + cnrDeleteEv := cnrDeleteName + _, err = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &cnrDeleteEv}, m.cnrDelEv) + if err != nil { + return fmt.Errorf("subscribe for container removal notifications: %w", err) + } + + cnrPutEv := cnrPutName + _, err = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &cnrPutEv}, m.cnrPutEv) + if err != nil { + return fmt.Errorf("subscribe for container addition notifications: %w", err) + } + + epochEv := newEpochName + _, err = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.netmapH, Name: &epochEv}, m.epochEv) + if err != nil { + return fmt.Errorf("subscribe for epoch notifications: %w", err) + } + + return nil +} + +func (m *Meta) listenNotifications(ctx context.Context) error { + for { + select { + case 
h, ok := <-m.bCh: + if !ok { + err := m.reconnect() + if err != nil { + return err + } + + continue + } + + go func() { + err := m.handleBlock(h.Index) + if err != nil { + m.l.Error(fmt.Sprintf("processing %d block", h.Index), zap.Error(err)) + return + } + }() + case aer, ok := <-m.objEv: + if !ok { + err := m.reconnect() + if err != nil { + return err + } + + continue + } + + // TODO: https://github.com/nspcc-dev/neo-go/issues/3779 receive somehow notifications from blocks + + m.objNotificationBuff <- aer + case aer, ok := <-m.cnrDelEv: + if !ok { + err := m.reconnect() + if err != nil { + return err + } + + continue + } + + l := m.l.With(zap.Stringer("notification container", aer.Container)) + + ev, err := parseCnrNotification(aer) + if err != nil { + l.Error("invalid container notification received", zap.Error(err)) + continue + } + + m.m.RLock() + _, ok = m.storages[ev.cID] + m.m.RUnlock() + if !ok { + l.Debug("skipping container notification", zap.Stringer("inactual container", ev.cID)) + continue + } + + go func() { + err = m.dropContainer(ev.cID) + if err != nil { + l.Error("deleting container failed", zap.Error(err)) + return + } + + l.Debug("deleted container", zap.Stringer("cID", ev.cID)) + }() + case aer, ok := <-m.cnrPutEv: + if !ok { + err := m.reconnect() + if err != nil { + return err + } + + continue + } + + l := m.l.With(zap.Stringer("notification container", aer.Container)) + + ev, err := parseCnrNotification(aer) + if err != nil { + l.Error("invalid container notification received", zap.Error(err)) + continue + } + + m.m.Lock() + + m.storages[ev.cID], err = storageForContainer(m.rootPath, ev.cID) + if err != nil { + m.m.Unlock() + return fmt.Errorf("open new storage for %s container: %w", ev.cID, err) + } + + m.m.Unlock() + + l.Debug("added container storage", zap.Stringer("cID", ev.cID)) + case aer, ok := <-m.epochEv: + if !ok { + err := m.reconnect() + if err != nil { + return err + } + + continue + } + + l := m.l.With(zap.Stringer("notification container", aer.Container)) + + epoch, err := parseEpochNotification(aer) + if err != nil { + l.Error("invalid new epoch notification received", zap.Error(err)) + continue + } + + go func() { + err = m.handleEpochNotification(epoch) + if err != nil { + l.Error("handling new epoch notification", zap.Int64("epoch", epoch), zap.Error(err)) + return + } + }() + case <-ctx.Done(): + m.l.Info("stop listening meta notifications") + return nil + } + } +} + +func (m *Meta) reconnect() error { + m.l.Warn("reconnecting to web socket client due to connection lost") + + var err error + m.ws, err = m.connect() + if err != nil { + return fmt.Errorf("reconnecting to web socket: %w", err) + } + + return nil +} + +func (m *Meta) connect() (*rpcclient.WSClient, error) { + var cli *rpcclient.WSClient + var err error + for _, e := range m.endpoints { + cli, err = rpcclient.NewWS(m.cliCtx, e, rpcclient.WSOptions{ + Options: rpcclient.Options{ + DialTimeout: m.timeout, + }, + }) + if err == nil { + break + } + + m.l.Warn("creating rpc client", zap.String("endpoint", e), zap.Error(err)) + } + if err != nil { + return nil, fmt.Errorf("could not create web socket client, last error: %w", err) + } + + err = cli.Init() + if err != nil { + return nil, fmt.Errorf("web socket client initializing: %w", err) + } + + return cli, nil +} + +const ( + collapseDepth = 10 +) + +func (m *Meta) handleBlock(ind uint32) error { + l := m.l.With(zap.Uint32("block", ind)) + l.Debug("handling block") + + m.m.RLock() + defer m.m.RUnlock() + + for _, st := range 
m.storages { + // TODO: parallelize depending on what can parallelize well + + st.m.Lock() + + root := st.mpt.StateRoot() + st.mpt.Store.Put([]byte{rootKey}, root[:]) + p := st.path + if st.opsBatch != nil { + _, err := st.mpt.PutBatch(mpt.MapToMPTBatch(st.opsBatch)) + if err != nil { + st.m.Unlock() + return fmt.Errorf("put batch for %d block to %q storage: %w", ind, p, err) + } + + st.opsBatch = nil + } + + st.m.Unlock() + + st.mpt.Flush(ind) + } + + // TODO drop containers that node does not belong to anymore? + + l.Debug("handled block successfully") + + return nil +} + +func (m *Meta) objNotificationWorker(ctx context.Context, ch <-chan *state.ContainedNotificationEvent) { + for { + select { + case <-ctx.Done(): + return + case n := <-ch: + l := m.l.With(zap.Stringer("notification container", n.Container)) + + ev, err := parseObjNotification(n) + if err != nil { + l.Error("invalid object notification received", zap.Error(err)) + continue + } + + m.m.RLock() + _, ok := m.storages[ev.cID] + m.m.RUnlock() + if !ok { + l.Debug("skipping object notification", zap.Stringer("inactual container", ev.cID)) + continue + } + + err = m.handleObjectNotification(ev) + if err != nil { + l.Error("handling object notification", zap.Error(err)) + return + } + + l.Debug("handled object notification successfully", zap.Stringer("cID", ev.cID), zap.Stringer("oID", ev.oID)) + } + } +} + +const ( + // MPT key prefixes. + oidIndex = iota + sizeIndex + firstPartIndex + previousPartIndex + deletedIndex + lockedIndex + typeIndex +) + +const ( + // meta map keys from FS chain. + cidKey = "cid" + oidKey = "oid" + sizeKey = "size" + validUntilKey = "validUntil" + networkMagicKey = "network" + firstPartKey = "firstPart" + previousPartKey = "previousPart" + deletedKey = "deleted" + lockedKey = "locked" + typeKey = "type" +) + +type objEvent struct { + cID cid.ID + oID oid.ID + size *big.Int + network *big.Int + firstObject []byte + prevObject []byte + deletedObjects []byte + lockedObjects []byte + typ objectsdk.Type +} + +func parseObjNotification(ev *state.ContainedNotificationEvent) (objEvent, error) { + const expectedNotificationArgs = 3 + var res objEvent + + arr, ok := ev.Item.Value().([]stackitem.Item) + if !ok { + return res, fmt.Errorf("unexpected notification stack item: %T", ev.Item.Value()) + } + if len(arr) != expectedNotificationArgs { + return res, fmt.Errorf("unexpected number of items on stack: %d, expected: %d", len(arr), expectedNotificationArgs) + } + + cID, ok := arr[0].Value().([]byte) + if !ok { + return res, fmt.Errorf("unexpected container ID stack item: %T", arr[0].Value()) + } + oID, ok := arr[1].Value().([]byte) + if !ok { + return res, fmt.Errorf("unexpected object ID stack item: %T", arr[1].Value()) + } + meta, ok := arr[2].(*stackitem.Map) + if !ok { + return res, fmt.Errorf("unexpected meta stack item: %T", arr[2]) + } + + if len(cID) != cid.Size { + return res, fmt.Errorf("unexpected container ID len: %d", len(cID)) + } + if len(oID) != oid.Size { + return res, fmt.Errorf("unexpected object ID len: %d", len(oID)) + } + + res.cID = cid.ID(cID) + res.oID = oid.ID(oID) + + v := getFromMap(meta, sizeKey) + if v == nil { + return res, fmt.Errorf("missing '%s' key", sizeKey) + } + res.size, ok = v.Value().(*big.Int) + if !ok { + return res, fmt.Errorf("unexpected object size type: %T", v.Value()) + } + + v = getFromMap(meta, networkMagicKey) + if v == nil { + return res, fmt.Errorf("missing '%s' key", networkMagicKey) + } + res.network, ok = v.Value().(*big.Int) + if !ok { + return res, 
fmt.Errorf("unexpected network type: %T", v.Value()) + } + + v = getFromMap(meta, firstPartKey) + if v != nil { + res.firstObject, ok = v.Value().([]byte) + if !ok { + return res, fmt.Errorf("unexpected first part type: %T", v.Value()) + } + } + + v = getFromMap(meta, previousPartKey) + if v != nil { + res.prevObject, ok = v.Value().([]byte) + if !ok { + return res, fmt.Errorf("unexpected previous part type: %T", v.Value()) + } + } + + v = getFromMap(meta, typeKey) + if v != nil { + typ, ok := v.Value().(*big.Int) + if !ok { + return res, fmt.Errorf("unexpected object type field: %T", v.Value()) + } + res.typ = objectsdk.Type(typ.Uint64()) + + switch res.typ { + case objectsdk.TypeTombstone: + v = getFromMap(meta, deletedKey) + if v == nil { + return res, fmt.Errorf("missing '%s' key for %s object type", deletedKey, res.typ) + } + stackDeleted := v.Value().([]stackitem.Item) + for i, d := range stackDeleted { + rawDeleted, ok := d.Value().([]byte) + if !ok { + return res, fmt.Errorf("unexpected %d deleted object type: %T", i, d.Value()) + } + res.deletedObjects = append(res.deletedObjects, rawDeleted...) + } + case objectsdk.TypeLock: + v = getFromMap(meta, lockedKey) + if v == nil { + return res, fmt.Errorf("missing '%s' key for %s object type", lockedKey, res.typ) + } + stackLocked := v.Value().([]stackitem.Item) + for i, d := range stackLocked { + rawLocked, ok := d.Value().([]byte) + if !ok { + return res, fmt.Errorf("unexpected %d locked object type: %T", i, d.Value()) + } + res.lockedObjects = append(res.deletedObjects, rawLocked...) + } + case objectsdk.TypeLink, objectsdk.TypeRegular: + default: + return res, fmt.Errorf("unknown '%s' object type", res.typ) + } + } + + return res, nil +} + +func getFromMap(m *stackitem.Map, key string) stackitem.Item { + i := m.Index(stackitem.Make(key)) + if i < 0 { + return nil + } + + return m.Value().([]stackitem.MapElement)[i].Value +} + +func (m *Meta) handleObjectNotification(e objEvent) error { + if magic := uint32(e.network.Uint64()); magic != m.magicNumber { + return fmt.Errorf("wrong magic number %d, expected: %d", magic, m.magicNumber) + } + + m.m.RLock() + defer m.m.RUnlock() + + err := m.storages[e.cID].putObject(e) + if err != nil { + return err + } + + return nil +} + +type cnrEvent struct { + cID cid.ID +} + +func parseCnrNotification(ev *state.ContainedNotificationEvent) (cnrEvent, error) { + var res cnrEvent + + arr, ok := ev.Item.Value().([]stackitem.Item) + if !ok { + return res, fmt.Errorf("unexpected notification stack item: %T", ev.Item.Value()) + } + + switch ev.Name { + case cnrDeleteName: + const expectedNotificationArgs = 1 + if len(arr) != expectedNotificationArgs { + return res, fmt.Errorf("unexpected number of items on stack: %d, expected: %d", len(arr), expectedNotificationArgs) + } + case cnrPutName: + const expectedNotificationArgs = 2 + if len(arr) != expectedNotificationArgs { + return res, fmt.Errorf("unexpected number of items on stack: %d, expected: %d", len(arr), expectedNotificationArgs) + } + } + + cID, ok := arr[0].Value().([]byte) + if !ok { + return res, fmt.Errorf("unexpected container ID stack item: %T", arr[0].Value()) + } + if len(cID) != cid.Size { + return res, fmt.Errorf("unexpected container ID len: %d", len(cID)) + } + + return cnrEvent{cID: cid.ID(cID)}, nil +} + +func (m *Meta) dropContainer(cID cid.ID) error { + m.m.Lock() + defer m.m.Unlock() + + st, ok := m.storages[cID] + if !ok { + return nil + } + + err := st.drop() + if err != nil { + m.l.Warn("drop container %s: %w", zap.Stringer("cID", 
cID), zap.Error(err)) + } + + delete(m.storages, cID) + + return nil +} + +func parseEpochNotification(ev *state.ContainedNotificationEvent) (int64, error) { + const expectedNotificationArgs = 1 + + arr, ok := ev.Item.Value().([]stackitem.Item) + if !ok { + return 0, fmt.Errorf("unexpected notification stack item: %T", ev.Item.Value()) + } + if len(arr) != expectedNotificationArgs { + return 0, fmt.Errorf("unexpected number of items on stack: %d, expected: %d", len(arr), expectedNotificationArgs) + } + + epoch, ok := arr[0].Value().(*big.Int) + if !ok { + return 0, fmt.Errorf("unexpected epoch stack item: %T", arr[0].Value()) + } + + return epoch.Int64(), nil +} + +func (m *Meta) handleEpochNotification(e int64) error { + m.l.Debug("handling new epoch notification", zap.Int64("epoch", e)) + + cnrsNetwork, err := m.cLister.List() + if err != nil { + return fmt.Errorf("list containers: %w", err) + } + + m.m.Lock() + defer m.m.Unlock() + + for cID, st := range m.storages { + _, ok := cnrsNetwork[cID] + if !ok { + err = st.drop() + if err != nil { + m.l.Warn("drop inactual container", zap.Int64("epoch", e), zap.Stringer("cID", cID), zap.Error(err)) + } + + delete(m.storages, cID) + } + } + for cID := range cnrsNetwork { + if _, ok := m.storages[cID]; ok { + continue + } + + st, err := storageForContainer(m.rootPath, cID) + if err != nil { + return fmt.Errorf("create storage for container %s: %w", cID, err) + } + + m.storages[cID] = st + } + + m.l.Debug("handled new epoch successfully", zap.Int64("epoch", e)) + + return nil +} diff --git a/pkg/services/meta/notifications_test.go b/pkg/services/meta/notifications_test.go new file mode 100644 index 0000000000..454a4b9fa6 --- /dev/null +++ b/pkg/services/meta/notifications_test.go @@ -0,0 +1,294 @@ +package meta + +import ( + "context" + "crypto/rand" + "errors" + "maps" + "math/big" + "os" + "testing" + "time" + + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/mpt" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + utilcore "github.com/nspcc-dev/neofs-node/pkg/util" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + objectsdk "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +type testContainerLister struct { + res map[cid.ID]struct{} + resErr error +} + +func (t *testContainerLister) List() (map[cid.ID]struct{}, error) { + return t.res, t.resErr +} + +func testContainers(t *testing.T, num int) []cid.ID { + res := make([]cid.ID, num) + for i := range num { + _, err := rand.Read(res[i][:]) + require.NoError(t, err) + } + + return res +} + +func newEpoch(m *Meta, cnrs map[cid.ID]struct{}, epoch int) { + m.cLister = &testContainerLister{res: cnrs, resErr: nil} + + m.epochEv <- &state.ContainedNotificationEvent{ + NotificationEvent: state.NotificationEvent{ + Name: newEpochName, + Item: stackitem.NewArray([]stackitem.Item{stackitem.Make(epoch)}), + }, + } +} + +func checkDBFiles(t *testing.T, path string, cnrs map[cid.ID]struct{}) { + require.Eventually(t, func() bool { + entries, err := os.ReadDir(path) + if err != nil { + return false + } + + cnrsCopy := maps.Clone(cnrs) + if len(entries) != len(cnrsCopy) { + return false + } + for _, e := 
range entries { + var cID cid.ID + err = cID.DecodeString(e.Name()) + if err != nil { + t.Fatal("unexpected db file name", e.Name()) + } + + if _, ok := cnrsCopy[cID]; !ok { + return false + } + + delete(cnrsCopy, cID) + } + + return true + }, 5*time.Second, time.Millisecond*100, "expected to find db files") +} + +func createAndRunTestMeta(t *testing.T) (*Meta, func(), chan struct{}) { + ctx, cancel := context.WithCancel(context.Background()) + m := &Meta{ + l: zaptest.NewLogger(t), + rootPath: t.TempDir(), + magicNumber: 102938475, + bCh: make(chan *block.Header), + objEv: make(chan *state.ContainedNotificationEvent), + cnrDelEv: make(chan *state.ContainedNotificationEvent), + cnrPutEv: make(chan *state.ContainedNotificationEvent), + epochEv: make(chan *state.ContainedNotificationEvent), + objNotificationBuff: make(chan *state.ContainedNotificationEvent, objsBufferSize), + + // no-op, to be filled by test cases if needed + storages: make(map[cid.ID]*containerStorage), + netmapH: util.Uint160{}, + cnrH: util.Uint160{}, + cLister: &testContainerLister{}, + endpoints: []string{}, + timeout: time.Second, + } + + exitCh := make(chan struct{}) + + go m.flusher(ctx) + go m.objNotificationWorker(ctx, m.objNotificationBuff) + go func() { + _ = m.listenNotifications(ctx) + exitCh <- struct{}{} + }() + + return m, cancel, exitCh +} + +// args list consists of [testing.T], [Meta] and the list from +// [object.EncodeReplicationMetaInfo] can be improved at the time [object] +// package will do it. +func checkObject(t *testing.T, m *Meta, cID cid.ID, oID, firstPart, previousPart oid.ID, pSize uint64, typ objectsdk.Type, deleted, locked []oid.ID, _ uint64, _ uint32) bool { + get := func(trie *mpt.Trie, st storage.Store, key, expV []byte) bool { + mptV, err := trie.Get(key) + if err != nil { + if errors.Is(err, mpt.ErrNotFound) { + return false + } + t.Fatalf("failed to get oid value from mpt: %v", err) + } + + dbV, err := st.Get(key) + if err != nil { + if errors.Is(err, storage.ErrKeyNotFound) { + return false + } + t.Fatalf("failed to get oid value from db: %v", err) + } + + require.Equal(t, dbV, mptV) + require.Equal(t, dbV, expV) + + return true + } + + m.m.RLock() + st := m.storages[cID] + m.m.RUnlock() + + st.m.RLock() + defer st.m.RUnlock() + + commSuffix := oID[:] + + ok := get(st.mpt, st.db, append([]byte{oidIndex}, commSuffix...), []byte{}) + if !ok { + return false + } + + var sizeB big.Int + sizeB.SetUint64(pSize) + ok = get(st.mpt, st.db, append([]byte{sizeIndex}, commSuffix...), sizeB.Bytes()) + if !ok { + return false + } + + if firstPart != (oid.ID{}) { + ok = get(st.mpt, st.db, append([]byte{firstPartIndex}, commSuffix...), firstPart[:]) + if !ok { + return false + } + } + + if previousPart != (oid.ID{}) { + ok = get(st.mpt, st.db, append([]byte{previousPartIndex}, commSuffix...), previousPart[:]) + if !ok { + return false + } + } + + if len(deleted) != 0 { + expVal := make([]byte, 0, oid.Size*(len(deleted))) + for _, d := range deleted { + expVal = append(expVal, d[:]...) + } + + ok = get(st.mpt, st.db, append([]byte{deletedIndex}, commSuffix...), expVal) + if !ok { + return false + } + } + + if len(locked) != 0 { + expVal := make([]byte, 0, oid.Size*(len(locked))) + for _, l := range locked { + expVal = append(expVal, l[:]...) 
+ } + + ok = get(st.mpt, st.db, append([]byte{lockedIndex}, commSuffix...), expVal) + if !ok { + return false + } + } + + if typ != objectsdk.TypeRegular { + ok = get(st.mpt, st.db, append([]byte{typeIndex}, commSuffix...), []byte{byte(typ)}) + if !ok { + return false + } + } + + return true +} + +func TestObjectPut(t *testing.T) { + m, stop, exitCh := createAndRunTestMeta(t) + t.Cleanup(func() { + stop() + <-exitCh + }) + + testCnrs := testContainers(t, 10) + mTestCnrs := utilcore.SliceToMap(testCnrs) + + var epoch int + newEpoch(m, mTestCnrs, epoch) + + t.Run("storages for containers", func(t *testing.T) { + checkDBFiles(t, m.rootPath, mTestCnrs) + }) + + t.Run("drop storage", func(t *testing.T) { + newContainers := utilcore.SliceToMap(testCnrs[1:]) + + epoch++ + newEpoch(m, newContainers, epoch) + checkDBFiles(t, m.rootPath, newContainers) + }) + + t.Run("add storage", func(t *testing.T) { + // add just dropped storage back + epoch++ + newEpoch(m, mTestCnrs, epoch) + checkDBFiles(t, m.rootPath, mTestCnrs) + }) + + t.Run("object put", func(t *testing.T) { + cID := testCnrs[0] + oID := oidtest.ID() + fPart := oidtest.ID() + pPart := oidtest.ID() + size := uint64(123) + typ := objectsdk.TypeTombstone + deleted := []oid.ID{oidtest.ID(), oidtest.ID(), oidtest.ID()} + + metaRaw := object.EncodeReplicationMetaInfo(cID, oID, fPart, pPart, size, typ, deleted, nil, 12345, m.magicNumber) + metaStack, err := stackitem.Deserialize(metaRaw) + require.NoError(t, err) + + stopTest := make(chan struct{}) + t.Cleanup(func() { + close(stopTest) + }) + go func() { + var i uint32 = 1 + tick := time.NewTicker(100 * time.Millisecond) + for { + select { + case <-tick.C: + m.bCh <- &block.Header{ + Index: i, + } + i++ + case <-stopTest: + return + } + } + }() + + m.objEv <- &state.ContainedNotificationEvent{ + NotificationEvent: state.NotificationEvent{ + Name: objPutEvName, + Item: stackitem.NewArray([]stackitem.Item{stackitem.Make(cID[:]), stackitem.Make(oID[:]), metaStack}), + }, + } + + require.Eventually(t, func() bool { + return checkObject(t, m, cID, oID, fPart, pPart, size, typ, deleted, nil, 12345, m.magicNumber) + }, 3*time.Second, time.Millisecond*100, "object was not handled properly") + }) +}
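
For reviewers, a minimal wiring sketch of the new service follows (not part of the patch). It only uses the API introduced above (`meta.New`, `(*Meta).Run`, `ContainerLister`); the contract hashes, RPC endpoint, storage root and the stand-in lister are placeholders — a real node would take these from its configuration and container sources.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/nspcc-dev/neo-go/pkg/util"
	"github.com/nspcc-dev/neofs-node/pkg/services/meta"
	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
	"go.uber.org/zap"
)

// staticLister is a stand-in ContainerLister that returns a fixed container
// set; the node would normally back this with its own container source.
type staticLister struct {
	cnrs map[cid.ID]struct{}
}

func (s staticLister) List() (map[cid.ID]struct{}, error) { return s.cnrs, nil }

func main() {
	l, err := zap.NewDevelopment()
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder contract hashes; a real node resolves the Container and
	// Netmap contract addresses from its FS chain configuration.
	var cnrH, nmH util.Uint160

	m, err := meta.New(
		l,
		staticLister{cnrs: map[cid.ID]struct{}{}},
		30*time.Second,                      // WS dial timeout (placeholder)
		[]string{"ws://localhost:30333/ws"}, // FS chain RPC endpoints (placeholder)
		cnrH, nmH,
		"/var/lib/neofs/meta", // one bolt-backed MPT per container is kept under this root
	)
	if err != nil {
		log.Fatal(err)
	}

	// Run blocks until the context is cancelled: it connects to the RPC
	// endpoint, subscribes to block headers and contract notifications,
	// and keeps the per-container MPT storages updated.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := m.Run(ctx); err != nil {
		log.Fatal(err)
	}
}
```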