From a08075de717a0f6c1df3daeca005db6d8a1ae4ea Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 26 Feb 2024 08:43:10 +0400 Subject: [PATCH] object: Stop supporting object-level notifications They were marked as deprecated in the protocol in https://github.com/nspcc-dev/neofs-api/issues/279. NATS is no longer needed. Signed-off-by: Leonard Lyubich --- .github/testcases-env | 4 - CHANGELOG.md | 5 + cmd/neofs-cli/modules/object/put.go | 47 +----- cmd/neofs-node/config.go | 7 - cmd/neofs-node/config/node/config.go | 76 ---------- cmd/neofs-node/config/node/config_test.go | 29 ---- cmd/neofs-node/main.go | 2 - cmd/neofs-node/notificator.go | 167 ---------------------- cmd/neofs-node/object.go | 67 +-------- config/example/node.env | 7 - config/example/node.json | 9 -- config/example/node.yaml | 8 -- docs/storage-node-configuration.md | 22 --- go.mod | 3 - go.sum | 6 - pkg/services/notificator/deps.go | 20 --- pkg/services/notificator/nats/options.go | 38 ----- pkg/services/notificator/nats/service.go | 132 ----------------- pkg/services/notificator/service.go | 85 ----------- 19 files changed, 12 insertions(+), 722 deletions(-) delete mode 100644 cmd/neofs-node/notificator.go delete mode 100644 pkg/services/notificator/deps.go delete mode 100644 pkg/services/notificator/nats/options.go delete mode 100644 pkg/services/notificator/nats/service.go delete mode 100644 pkg/services/notificator/service.go diff --git a/.github/testcases-env b/.github/testcases-env index c8469835cb4..61fd19226cd 100644 --- a/.github/testcases-env +++ b/.github/testcases-env @@ -25,10 +25,6 @@ IR_IMAGE=nspccdev/neofs-ir NODE_VERSION=_TAG_ NODE_IMAGE=nspccdev/neofs-storage -# NATS Server -NATS_VERSION=2.7.2 -NATS_IMAGE=nats - # HTTP Gate HTTP_GW_VERSION=0.28.0 HTTP_GW_IMAGE=nspccdev/neofs-http-gw diff --git a/CHANGELOG.md b/CHANGELOG.md index 232d2cccf7c..4b49d778d7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,10 +12,15 @@ Changelog for NeoFS Node ### Changed ### Removed +- Object notifications incl. NATS (#2750)) ### Updated ### Updating from v0.40.1 +Remove `notification` section from all SN configuration files: it is no longer +supported. All NATS servers running for this purpose only are no longer needed. +If your app depends on notifications transmitted to NATS, do not update and +create an issue please. ## [0.40.1] - 2024-02-22 diff --git a/cmd/neofs-cli/modules/object/put.go b/cmd/neofs-cli/modules/object/put.go index 8cffe392399..1b900eeba7d 100644 --- a/cmd/neofs-cli/modules/object/put.go +++ b/cmd/neofs-cli/modules/object/put.go @@ -22,8 +22,7 @@ import ( ) const ( - noProgressFlag = "no-progress" - notificationFlag = "notify" + noProgressFlag = "no-progress" ) var putExpiredOn uint64 @@ -55,7 +54,6 @@ func initObjectPutCmd() { flags.Uint64P(commonflags.Lifetime, "l", 0, "Number of epochs for object to stay valid") flags.Bool(noProgressFlag, false, "Do not show progress bar") - flags.String(notificationFlag, "", "Object notification in the form of *epoch*:*topic*; '-' topic means using default") flags.Bool(binaryFlag, false, "Deserialize object structure from given file.") objectPutCmd.MarkFlagsMutuallyExclusive(commonflags.ExpireAt, commonflags.Lifetime) } @@ -139,13 +137,6 @@ func putObject(cmd *cobra.Command, _ []string) { obj.SetOwnerID(&ownerID) obj.SetAttributes(attrs...) - notificationInfo, err := parseObjectNotifications(cmd) - common.ExitOnErr(cmd, "can't parse object notification information: %w", err) - - if notificationInfo != nil { - obj.SetNotification(*notificationInfo) - } - var prm internalclient.PutObjectPrm prm.SetPrivateKey(*pk) ReadOrOpenSession(ctx, cmd, &prm, pk, cnr, nil) @@ -224,39 +215,3 @@ func parseObjectAttrs(cmd *cobra.Command) ([]object.Attribute, error) { return attrs, nil } - -func parseObjectNotifications(cmd *cobra.Command) (*object.NotificationInfo, error) { - const ( - separator = ":" - useDefaultTopic = "-" - ) - - raw := cmd.Flag(notificationFlag).Value.String() - if raw == "" { - return nil, nil - } - - rawSlice := strings.SplitN(raw, separator, 2) - if len(rawSlice) != 2 { - return nil, fmt.Errorf("notification must be in the form of: *epoch*%s*topic*, got %s", separator, raw) - } - - ni := new(object.NotificationInfo) - - epoch, err := strconv.ParseUint(rawSlice[0], 10, 64) - if err != nil { - return nil, fmt.Errorf("could not parse notification epoch %s: %w", rawSlice[0], err) - } - - ni.SetEpoch(epoch) - - if rawSlice[1] == "" { - return nil, fmt.Errorf("incorrect empty topic: use %s to force using default topic", useDefaultTopic) - } - - if rawSlice[1] != useDefaultTopic { - ni.SetTopic(rawSlice[1]) - } - - return ni, nil -} diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 9b83d412df3..37d8ec46e12 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -399,7 +399,6 @@ type cfg struct { cfgControlService cfgControlService cfgReputation cfgReputation cfgObject cfgObject - cfgNotifications cfgNotifications } // ReadCurrentNetMap reads network map which has been cached at the @@ -475,12 +474,6 @@ type cfgObject struct { tombstoneLifetime uint64 } -type cfgNotifications struct { - enabled bool - nw notificationWriter - defaultTopic string -} - type cfgLocalStorage struct { localStorage *engine.StorageEngine } diff --git a/cmd/neofs-node/config/node/config.go b/cmd/neofs-node/config/node/config.go index d006d4443c0..0d62635385c 100644 --- a/cmd/neofs-node/config/node/config.go +++ b/cmd/neofs-node/config/node/config.go @@ -24,17 +24,10 @@ type PersistentStateConfig struct { cfg *config.Config } -// NotificationConfig is a wrapper over "notification" config section -// which provides access to object notification configuration of node. -type NotificationConfig struct { - cfg *config.Config -} - const ( subsection = "node" persistentSessionsSubsection = "persistent_sessions" persistentStateSubsection = "persistent_state" - notificationSubsection = "notification" attributePrefix = "attribute" @@ -176,72 +169,3 @@ func (p PersistentStateConfig) Path() string { return PersistentStatePathDefault } - -// Notification returns structure that provides access to "notification" -// subsection of "node" section. -func Notification(c *config.Config) NotificationConfig { - return NotificationConfig{ - c.Sub(subsection).Sub(notificationSubsection), - } -} - -// Enabled returns the value of "enabled" config parameter from "notification" -// subsection of "node" section. -// -// Returns false if the value is not presented. -func (n NotificationConfig) Enabled() bool { - return config.BoolSafe(n.cfg, "enabled") -} - -// DefaultTopic returns the value of "default_topic" config parameter from -// "notification" subsection of "node" section. -// -// Returns empty string if the value is not presented. -func (n NotificationConfig) DefaultTopic() string { - return config.StringSafe(n.cfg, "default_topic") -} - -// Endpoint returns the value of "endpoint" config parameter from "notification" -// subsection of "node" section. -// -// Returns empty string if the value is not presented. -func (n NotificationConfig) Endpoint() string { - return config.StringSafe(n.cfg, "endpoint") -} - -// Timeout returns the value of "timeout" config parameter from "notification" -// subsection of "node" section. -// -// Returns NotificationTimeoutDefault if the value is not positive. -func (n NotificationConfig) Timeout() time.Duration { - v := config.DurationSafe(n.cfg, "timeout") - if v > 0 { - return v - } - - return NotificationTimeoutDefault -} - -// CertPath returns the value of "certificate_path" config parameter from "notification" -// subsection of "node" section. -// -// Returns empty string if the value is not presented. -func (n NotificationConfig) CertPath() string { - return config.StringSafe(n.cfg, "certificate") -} - -// KeyPath returns the value of "key_path" config parameter from -// "notification" subsection of "node" section. -// -// Returns empty string if the value is not presented. -func (n NotificationConfig) KeyPath() string { - return config.StringSafe(n.cfg, "key") -} - -// CAPath returns the value of "ca_path" config parameter from -// "notification" subsection of "node" section. -// -// Returns empty string if the value is not presented. -func (n NotificationConfig) CAPath() string { - return config.StringSafe(n.cfg, "ca") -} diff --git a/cmd/neofs-node/config/node/config_test.go b/cmd/neofs-node/config/node/config_test.go index f3b1dc89fc9..f511b15a7b6 100644 --- a/cmd/neofs-node/config/node/config_test.go +++ b/cmd/neofs-node/config/node/config_test.go @@ -2,7 +2,6 @@ package nodeconfig import ( "testing" - "time" "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" @@ -33,25 +32,11 @@ func TestNodeSection(t *testing.T) { relay := Relay(empty) persisessionsPath := PersistentSessions(empty).Path() persistatePath := PersistentState(empty).Path() - notificationDefaultEnabled := Notification(empty).Enabled() - notificationDefaultEndpoint := Notification(empty).Endpoint() - notificationDefaultTimeout := Notification(empty).Timeout() - notificationDefaultTopic := Notification(empty).DefaultTopic() - notificationDefaultCertPath := Notification(empty).CertPath() - notificationDefaultKeyPath := Notification(empty).KeyPath() - notificationDefaultCAPath := Notification(empty).CAPath() require.Empty(t, attribute) require.Equal(t, false, relay) require.Equal(t, "", persisessionsPath) require.Equal(t, PersistentStatePathDefault, persistatePath) - require.Equal(t, false, notificationDefaultEnabled) - require.Equal(t, "", notificationDefaultEndpoint) - require.Equal(t, NotificationTimeoutDefault, notificationDefaultTimeout) - require.Equal(t, "", notificationDefaultTopic) - require.Equal(t, "", notificationDefaultCertPath) - require.Equal(t, "", notificationDefaultKeyPath) - require.Equal(t, "", notificationDefaultCAPath) }) const path = "../../../../config/example/node" @@ -64,13 +49,6 @@ func TestNodeSection(t *testing.T) { wKey := Wallet(c) persisessionsPath := PersistentSessions(c).Path() persistatePath := PersistentState(c).Path() - notificationEnabled := Notification(c).Enabled() - notificationEndpoint := Notification(c).Endpoint() - notificationTimeout := Notification(c).Timeout() - notificationDefaultTopic := Notification(c).DefaultTopic() - notificationCertPath := Notification(c).CertPath() - notificationKeyPath := Notification(c).KeyPath() - notificationCAPath := Notification(c).CAPath() expectedAddr := []struct { str string @@ -123,13 +101,6 @@ func TestNodeSection(t *testing.T) { require.Equal(t, "/sessions", persisessionsPath) require.Equal(t, "/state", persistatePath) - require.Equal(t, true, notificationEnabled) - require.Equal(t, "tls://localhost:4222", notificationEndpoint) - require.Equal(t, 6*time.Second, notificationTimeout) - require.Equal(t, "topic", notificationDefaultTopic) - require.Equal(t, "/cert/path", notificationCertPath) - require.Equal(t, "/key/path", notificationKeyPath) - require.Equal(t, "/ca/path", notificationCAPath) } configtest.ForEachFileType(path, fileConfigTest) diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index b802ae76ea9..e1e0d5c3948 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -137,7 +137,6 @@ func initApp(c *cfg) { initAndLog(c, "accounting", initAccountingService) initAndLog(c, "session", initSessionService) initAndLog(c, "reputation", initReputationService) - initAndLog(c, "notification", initNotifications) initAndLog(c, "object", initObjectService) initAndLog(c, "tree", initTreeService) @@ -165,7 +164,6 @@ func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) { } func bootUp(c *cfg) { - runAndLog(c, "NATS", true, connectNats) runAndLog(c, "gRPC", false, serveGRPC) runAndLog(c, "notary", true, makeAndWaitNotaryDeposit) diff --git a/cmd/neofs-node/notificator.go b/cmd/neofs-node/notificator.go deleted file mode 100644 index c0f5c9f1981..00000000000 --- a/cmd/neofs-node/notificator.go +++ /dev/null @@ -1,167 +0,0 @@ -package main - -import ( - "encoding/hex" - "fmt" - - nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - "github.com/nspcc-dev/neofs-node/pkg/morph/event" - "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" - "github.com/nspcc-dev/neofs-node/pkg/services/notificator" - "github.com/nspcc-dev/neofs-node/pkg/services/notificator/nats" - objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "go.uber.org/zap" -) - -type notificationSource struct { - e *engine.StorageEngine - l *zap.Logger - defaultTopic string -} - -func (n *notificationSource) Iterate(epoch uint64, handler func(topic string, addr oid.Address)) { - log := n.l.With(zap.Uint64("epoch", epoch)) - - listRes, err := n.e.ListContainers(engine.ListContainersPrm{}) - if err != nil { - log.Error("notificator: could not list containers", zap.Error(err)) - return - } - - filters := objectSDK.NewSearchFilters() - filters.AddNotificationEpochFilter(epoch) - - var selectPrm engine.SelectPrm - selectPrm.WithFilters(filters) - - for _, c := range listRes.Containers() { - selectPrm.WithContainerID(c) - - selectRes, err := n.e.Select(selectPrm) - if err != nil { - log.Error("notificator: could not select objects from container", - zap.Stringer("cid", c), - zap.Error(err), - ) - continue - } - - for _, a := range selectRes.AddressList() { - err = n.processAddress(a, handler) - if err != nil { - log.Error("notificator: could not process object", - zap.Stringer("address", a), - zap.Error(err), - ) - continue - } - } - } - - log.Debug("notificator: finished processing object notifications") -} - -func (n *notificationSource) processAddress( - a oid.Address, - h func(topic string, addr oid.Address), -) error { - var prm engine.HeadPrm - prm.WithAddress(a) - - res, err := n.e.Head(prm) - if err != nil { - return err - } - - ni, err := res.Header().NotificationInfo() - if err != nil { - return fmt.Errorf("could not retrieve notification topic from object: %w", err) - } - - topic := ni.Topic() - - if topic == "" { - topic = n.defaultTopic - } - - h(topic, a) - - return nil -} - -type notificationWriter struct { - l *zap.Logger - w *nats.Writer -} - -func (n notificationWriter) Notify(topic string, address oid.Address) { - if err := n.w.Notify(topic, address); err != nil { - n.l.Warn("could not write object notification", - zap.Stringer("address", address), - zap.String("topic", topic), - zap.Error(err), - ) - } -} - -func initNotifications(c *cfg) { - if nodeconfig.Notification(c.cfgReader).Enabled() { - topic := nodeconfig.Notification(c.cfgReader).DefaultTopic() - pubKey := hex.EncodeToString(c.cfgNodeInfo.localInfo.PublicKey()) - - if topic == "" { - topic = pubKey - } - - natsSvc := nats.New( - nats.WithConnectionName("NeoFS Storage Node: "+pubKey), // connection name is used in the server side logs - nats.WithTimeout(nodeconfig.Notification(c.cfgReader).Timeout()), - nats.WithClientCert( - nodeconfig.Notification(c.cfgReader).CertPath(), - nodeconfig.Notification(c.cfgReader).KeyPath(), - ), - nats.WithRootCA(nodeconfig.Notification(c.cfgReader).CAPath()), - nats.WithLogger(c.log), - ) - - c.cfgNotifications = cfgNotifications{ - enabled: true, - nw: notificationWriter{ - l: c.log, - w: natsSvc, - }, - defaultTopic: topic, - } - - n := notificator.New(new(notificator.Prm). - SetLogger(c.log). - SetNotificationSource( - ¬ificationSource{ - e: c.cfgObject.cfgLocalStorage.localStorage, - l: c.log, - defaultTopic: topic, - }). - SetWriter(c.cfgNotifications.nw), - ) - - addNewEpochAsyncNotificationHandler(c, func(e event.Event) { - ev := e.(netmap.NewEpoch) - - n.ProcessEpoch(ev.EpochNumber()) - }) - } -} - -func connectNats(c *cfg) { - if !c.cfgNotifications.enabled { - return - } - - endpoint := nodeconfig.Notification(c.cfgReader).Endpoint() - err := c.cfgNotifications.nw.w.Connect(c.ctx, endpoint) - if err != nil { - panic(fmt.Sprintf("could not connect to a nats endpoint %s: %v", endpoint, err)) - } -} diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index ba671507f3f..b141352acb7 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -12,7 +12,6 @@ import ( coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" containercore "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" - objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" morphClient "github.com/nspcc-dev/neofs-node/pkg/morph/client" cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" @@ -225,24 +224,11 @@ func initObjectService(c *cfg) { c.workers = append(c.workers, c.shared.policer) - var os putsvc.ObjectStorage = engineWithoutNotifications{ - engine: ls, - } - - if c.cfgNotifications.enabled { - os = engineWithNotifications{ - base: os, - nw: c.cfgNotifications.nw, - ns: c.cfgNetmap.state, - defaultTopic: c.cfgNotifications.defaultTopic, - } - } - sPut := putsvc.NewService( putsvc.WithKeyStorage(keyStorage), putsvc.WithClientConstructor(putConstructor), putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)), - putsvc.WithObjectStorage(os), + putsvc.WithObjectStorage(storageEngine{engine: ls}), putsvc.WithContainerSource(c.cfgObject.cnrSource), putsvc.WithNetworkMapSource(c.netMapSource), putsvc.WithNetmapKeys(c), @@ -519,56 +505,15 @@ func (c *reputationClientConstructor) Get(info coreclient.NodeInfo) (coreclient. return cl, nil } -type engineWithNotifications struct { - base putsvc.ObjectStorage - nw notificationWriter - ns netmap.State - - defaultTopic string -} - -func (e engineWithNotifications) IsLocked(address oid.Address) (bool, error) { - return e.base.IsLocked(address) -} - -func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error { - return e.base.Delete(tombstone, toDelete) -} - -func (e engineWithNotifications) Lock(locker oid.Address, toLock []oid.ID) error { - return e.base.Lock(locker, toLock) -} - -func (e engineWithNotifications) Put(o *objectSDK.Object) error { - if err := e.base.Put(o); err != nil { - return err - } - - ni, err := o.NotificationInfo() - if err == nil { - if epoch := ni.Epoch(); epoch == 0 || epoch == e.ns.CurrentEpoch() { - topic := ni.Topic() - - if topic == "" { - topic = e.defaultTopic - } - - e.nw.Notify(topic, objectCore.AddressOf(o)) - } - } - - return nil -} - -type engineWithoutNotifications struct { +type storageEngine struct { engine *engine.StorageEngine } -func (e engineWithoutNotifications) IsLocked(address oid.Address) (bool, error) { +func (e storageEngine) IsLocked(address oid.Address) (bool, error) { return e.engine.IsLocked(address) } -func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error { +func (e storageEngine) Delete(tombstone oid.Address, toDelete []oid.ID) error { var prm engine.InhumePrm addrs := make([]oid.Address, len(toDelete)) @@ -583,10 +528,10 @@ func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid return err } -func (e engineWithoutNotifications) Lock(locker oid.Address, toLock []oid.ID) error { +func (e storageEngine) Lock(locker oid.Address, toLock []oid.ID) error { return e.engine.Lock(locker.Container(), locker.Object(), toLock) } -func (e engineWithoutNotifications) Put(o *objectSDK.Object) error { +func (e storageEngine) Put(o *objectSDK.Object) error { return engine.Put(e.engine, o) } diff --git a/config/example/node.env b/config/example/node.env index 9ca49dde0c9..53ce5ca5921 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -20,13 +20,6 @@ NEOFS_NODE_ATTRIBUTE_2="VerifiedNodesDomain:nodes.some-org.neofs" NEOFS_NODE_RELAY=true NEOFS_NODE_PERSISTENT_SESSIONS_PATH=/sessions NEOFS_NODE_PERSISTENT_STATE_PATH=/state -NEOFS_NODE_NOTIFICATION_ENABLED=true -NEOFS_NODE_NOTIFICATION_ENDPOINT=tls://localhost:4222 -NEOFS_NODE_NOTIFICATION_TIMEOUT=6s -NEOFS_NODE_NOTIFICATION_DEFAULT_TOPIC=topic -NEOFS_NODE_NOTIFICATION_CERTIFICATE=/cert/path -NEOFS_NODE_NOTIFICATION_KEY=/key/path -NEOFS_NODE_NOTIFICATION_CA=/ca/path # Tree service section NEOFS_TREE_ENABLED=true diff --git a/config/example/node.json b/config/example/node.json index f7ce75e6be1..7dbc1effee2 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -34,15 +34,6 @@ }, "persistent_state": { "path": "/state" - }, - "notification": { - "enabled": true, - "endpoint": "tls://localhost:4222", - "timeout": "6s", - "default_topic": "topic", - "certificate": "/cert/path", - "key": "/key/path", - "ca": "/ca/path" } }, "grpc": { diff --git a/config/example/node.yaml b/config/example/node.yaml index 9f1c7e698c8..b0a47c966e2 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -33,14 +33,6 @@ node: path: /sessions # path to persistent session tokens file of Storage node (default: in-memory sessions) persistent_state: path: /state # path to persistent state file of Storage node - notification: - enabled: true # turn on object notification service - endpoint: "tls://localhost:4222" # notification server endpoint - timeout: "6s" # timeout for object notification client connection - default_topic: "topic" # default topic for object notifications if not found in object's meta - certificate: "/cert/path" # path to TLS certificate - key: "/key/path" # path to TLS key - ca: "/ca/path" # path to optional CA certificate grpc: - endpoint: s01.neofs.devenv:8080 # endpoint for gRPC server diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 0c87d2a62a3..043230d6a4d 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -286,14 +286,6 @@ node: path: /sessions persistent_state: path: /state - notification: - enabled: true - endpoint: tls://localhost:4222 - timeout: 6s - default_topic: topic - certificate: /path/to/cert.pem - key: /path/to/key.pem - ca: /path/to/ca.pem ``` | Parameter | Type | Default value | Description | @@ -305,7 +297,6 @@ node: | `relay` | `bool` | | Enable relay mode. | | `persistent_sessions` | [Persistent sessions config](#persistent_sessions-subsection) | | Persistent session token store configuration. | | `persistent_state` | [Persistent state config](#persistent_state-subsection) | | Persistent state configuration. | -| `notification` | [Notification config](#notification-subsection) | | NATS configuration. | ## `wallet` subsection @@ -333,19 +324,6 @@ It is used to correctly handle node restarts or crashes. |-----------|----------|------------------------|------------------------| | `path` | `string` | `.neofs-storage-state` | Path to the database. | -## `notification` subsection -This is an advanced section, use with caution. - -| Parameter | Type | Default value | Description | -|-----------------|------------|-------------------|-------------------------------------------------------------------| -| `enabled` | `bool` | `false` | Flag to enable the service. | -| `endpoint` | `string` | | NATS endpoint to connect to. | -| `timeout` | `duration` | `5s` | Timeout for the object notification operation. | -| `default_topic` | `string` | node's public key | Default topic to use if an object has no corresponding attribute. | -| `certificate` | `string` | | Path to the client certificate. | -| `key` | `string` | | Path to the client key. | -| `ca` | `string` | | Override root CA used to verify server certificates. | - # `apiclient` section Configuration for the NeoFS API client used for communication with other NeoFS nodes. diff --git a/go.mod b/go.mod index b878d8c0f49..9cafff5529e 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,6 @@ require ( github.com/mitchellh/go-homedir v1.1.0 github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multiaddr v0.12.0 - github.com/nats-io/nats.go v1.31.0 github.com/nspcc-dev/hrw/v2 v2.0.0 github.com/nspcc-dev/locode-db v0.5.0 github.com/nspcc-dev/neo-go v0.105.1 @@ -72,8 +71,6 @@ require ( github.com/multiformats/go-multibase v0.1.1 // indirect github.com/multiformats/go-multihash v0.2.1 // indirect github.com/multiformats/go-varint v0.0.7 // indirect - github.com/nats-io/nkeys v0.4.6 // indirect - github.com/nats-io/nuid v1.0.1 // indirect github.com/nspcc-dev/dbft v0.0.0-20230515113611-25db6ba61d5c // indirect github.com/nspcc-dev/go-ordered-json v0.0.0-20240112074137-296698a162ae // indirect github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20231127165613-b35f351f0ba0 // indirect diff --git a/go.sum b/go.sum index 6f213fdd73e..d677dd31305 100644 --- a/go.sum +++ b/go.sum @@ -233,12 +233,6 @@ github.com/multiformats/go-multihash v0.2.1 h1:aem8ZT0VA2nCHHk7bPJ1BjUbHNciqZC/d github.com/multiformats/go-multihash v0.2.1/go.mod h1:WxoMcYG85AZVQUyRyo9s4wULvW5qrI9vb2Lt6evduFc= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= -github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= -github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= -github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= -github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= -github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= -github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nspcc-dev/dbft v0.0.0-20230515113611-25db6ba61d5c h1:uyK5aLbAhrnZtnvobJLN24gGUrlxIJAAFqiWl+liZuo= github.com/nspcc-dev/dbft v0.0.0-20230515113611-25db6ba61d5c/go.mod h1:kjBC9F8L25GR+kIHy/1KgG/KfcoGnVwIiyovgq1uszk= github.com/nspcc-dev/go-ordered-json v0.0.0-20240112074137-296698a162ae h1:UFgMXcZthqiCqCyr3dOAtGICJ10gM8q0mFHyLR0UPQU= diff --git a/pkg/services/notificator/deps.go b/pkg/services/notificator/deps.go deleted file mode 100644 index 780a89b5c54..00000000000 --- a/pkg/services/notificator/deps.go +++ /dev/null @@ -1,20 +0,0 @@ -package notificator - -import ( - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" -) - -// NotificationSource is a source of object notifications. -type NotificationSource interface { - // Iterate must iterate over all notifications for the - // provided epoch and call handler for all of them. - Iterate(epoch uint64, handler func(topic string, addr oid.Address)) -} - -// NotificationWriter notifies all the subscribers -// about new object notifications. -type NotificationWriter interface { - // Notify must notify about an event generated - // from an object with a specific topic. - Notify(topic string, address oid.Address) -} diff --git a/pkg/services/notificator/nats/options.go b/pkg/services/notificator/nats/options.go deleted file mode 100644 index 278705ac55c..00000000000 --- a/pkg/services/notificator/nats/options.go +++ /dev/null @@ -1,38 +0,0 @@ -package nats - -import ( - "time" - - "github.com/nats-io/nats.go" - "go.uber.org/zap" -) - -func WithClientCert(certPath, keyPath string) Option { - return func(o *opts) { - o.nOpts = append(o.nOpts, nats.ClientCert(certPath, keyPath)) - } -} - -func WithRootCA(paths ...string) Option { - return func(o *opts) { - o.nOpts = append(o.nOpts, nats.RootCAs(paths...)) - } -} - -func WithTimeout(timeout time.Duration) Option { - return func(o *opts) { - o.nOpts = append(o.nOpts, nats.Timeout(timeout)) - } -} - -func WithConnectionName(name string) Option { - return func(o *opts) { - o.nOpts = append(o.nOpts, nats.Name(name)) - } -} - -func WithLogger(logger *zap.Logger) Option { - return func(o *opts) { - o.log = logger - } -} diff --git a/pkg/services/notificator/nats/service.go b/pkg/services/notificator/nats/service.go deleted file mode 100644 index a273217e5fc..00000000000 --- a/pkg/services/notificator/nats/service.go +++ /dev/null @@ -1,132 +0,0 @@ -package nats - -import ( - "context" - "errors" - "fmt" - "sync" - - "github.com/nats-io/nats.go" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "go.uber.org/zap" -) - -// Writer is a NATS object notification writer. -// It handles NATS JetStream connections and allows -// sending string representation of the address to -// the NATS server. -// -// For correct operation must be created via New function. -// new(Writer) or Writer{} construction leads to undefined -// behaviour and is not safe. -type Writer struct { - js nats.JetStreamContext - nc *nats.Conn - - m *sync.RWMutex - createdStreams map[string]struct{} - opts -} - -type opts struct { - log *zap.Logger - nOpts []nats.Option -} - -type Option func(*opts) - -var errConnIsClosed = errors.New("connection to the server is closed") - -// Notify sends object address's string representation to the provided topic. -// Uses first 4 bytes of object ID as a message ID to support 'exactly once' -// message delivery. -// -// Returns error only if: -// 1. underlying connection was closed and has not been established again; -// 2. NATS server could not respond that it has saved the message. -func (n *Writer) Notify(topic string, address oid.Address) error { - if !n.nc.IsConnected() { - return errConnIsClosed - } - - // use first 4 byte of the encoded string as - // message ID for the 'exactly once' delivery - messageID := address.Object().EncodeToString()[:4] - - // check if the stream was previously created - n.m.RLock() - _, created := n.createdStreams[topic] - n.m.RUnlock() - - if !created { - _, err := n.js.AddStream(&nats.StreamConfig{ - Name: topic, - }) - if err != nil { - return fmt.Errorf("could not add stream: %w", err) - } - - n.m.Lock() - n.createdStreams[topic] = struct{}{} - n.m.Unlock() - } - - _, err := n.js.Publish(topic, []byte(address.EncodeToString()), nats.MsgId(messageID)) - if err != nil { - return err - } - - return nil -} - -// New creates new Writer. -func New(oo ...Option) *Writer { - w := &Writer{ - m: &sync.RWMutex{}, - createdStreams: make(map[string]struct{}), - opts: opts{ - log: zap.L(), - nOpts: make([]nats.Option, 0, len(oo)+3), - }, - } - - for _, o := range oo { - o(&w.opts) - } - - w.opts.nOpts = append(w.opts.nOpts, - nats.NoCallbacksAfterClientClose(), // do not call callbacks when it was planned writer stop - nats.DisconnectErrHandler(func(conn *nats.Conn, err error) { - w.log.Error("nats: connection was lost", zap.Error(err)) - }), - nats.ReconnectHandler(func(conn *nats.Conn) { - w.log.Warn("nats: reconnected to the server") - }), - ) - - return w -} - -// Connect tries to connect to a specified NATS endpoint. -// -// Connection is closed when passed context is done. -func (n *Writer) Connect(ctx context.Context, endpoint string) error { - nc, err := nats.Connect(endpoint, n.opts.nOpts...) - if err != nil { - return fmt.Errorf("could not connect to server: %w", err) - } - - n.nc = nc - - // usage w/o options is error-free - n.js, _ = nc.JetStream() - - go func() { - <-ctx.Done() - n.opts.log.Info("nats: closing connection as the context is done") - - nc.Close() - }() - - return nil -} diff --git a/pkg/services/notificator/service.go b/pkg/services/notificator/service.go deleted file mode 100644 index b05195a787d..00000000000 --- a/pkg/services/notificator/service.go +++ /dev/null @@ -1,85 +0,0 @@ -package notificator - -import ( - "fmt" - - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "go.uber.org/zap" -) - -// Prm groups Notificator constructor's -// parameters. All are required. -type Prm struct { - writer NotificationWriter - notificationSource NotificationSource - logger *zap.Logger -} - -// SetLogger sets a logger. -func (prm *Prm) SetLogger(v *zap.Logger) *Prm { - prm.logger = v - return prm -} - -// SetWriter sets notification writer. -func (prm *Prm) SetWriter(v NotificationWriter) *Prm { - prm.writer = v - return prm -} - -// SetNotificationSource sets notification source. -func (prm *Prm) SetNotificationSource(v NotificationSource) *Prm { - prm.notificationSource = v - return prm -} - -// Notificator is a notification producer that handles -// objects with defined notification epoch. -// -// Working client must be created via constructor New. -// Using the Client that has been created with new(Client) -// expression (or just declaring a Client variable) is unsafe -// and can lead to panic. -type Notificator struct { - w NotificationWriter - ns NotificationSource - l *zap.Logger -} - -// New creates, initializes and returns the Notificator instance. -// -// Panics if any field of the passed Prm structure is not set/set -// to nil. -func New(prm *Prm) *Notificator { - panicOnNil := func(v any, name string) { - if v == nil { - panic(fmt.Sprintf("Notificator constructor: %s is nil\n", name)) - } - } - - panicOnNil(prm.writer, "NotificationWriter") - panicOnNil(prm.notificationSource, "NotificationSource") - panicOnNil(prm.logger, "Logger") - - return &Notificator{ - w: prm.writer, - ns: prm.notificationSource, - l: prm.logger, - } -} - -// ProcessEpoch looks for all objects with defined epoch in the storage -// and passes their addresses to the NotificationWriter. -func (n *Notificator) ProcessEpoch(epoch uint64) { - logger := n.l.With(zap.Uint64("epoch", epoch)) - logger.Debug("notificator: start processing object notifications") - - n.ns.Iterate(epoch, func(topic string, addr oid.Address) { - n.l.Debug("notificator: processing object notification", - zap.String("topic", topic), - zap.Stringer("address", addr), - ) - - n.w.Notify(topic, addr) - }) -}