Skip to content

Commit

Permalink
add helper to create natsstream from config
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <jkoberg@owncloud.com>
  • Loading branch information
kobergj committed Feb 3, 2023
1 parent db01d99 commit c863bb5
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 24 deletions.
74 changes: 74 additions & 0 deletions pkg/events/stream/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package stream

import (
"crypto/tls"
"crypto/x509"
"os"
"time"

"github.com/cenkalti/backoff"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/owncloud/ocis/v2/ocis-pkg/crypto"
)

// NatsConfig is the configuration needed for a NATS event stream
type NatsConfig struct {
Endpoint string // Endpoint of the nats server
Cluster string // CluserID of the nats cluster
TLSInsecure bool // Whether to verify TLS certificates
TLSRootCACertificate string // The root CA certificate used to validate the TLS certificate
EnableTLS bool // Enable TLS
}

// NatsFromConfig returns a nats stream from the given config
func NatsFromConfig(cfg NatsConfig) (events.Stream, error) {
var tlsConf *tls.Config
if cfg.EnableTLS {
var rootCAPool *x509.CertPool
if cfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(cfg.TLSRootCACertificate)
if err != nil {
return nil, err
}

rootCAPool, err = crypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return nil, err
}
cfg.TLSInsecure = false
}

tlsConf = &tls.Config{
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: cfg.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}
return Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(cfg.Endpoint),
natsjs.ClusterID(cfg.Cluster),
)

}

// Nats returns a nats streaming client
// retries exponentially to connect to a nats server
func Nats(opts ...natsjs.Option) (events.Stream, error) {
b := backoff.NewExponentialBackOff()
var stream events.Stream
o := func() error {
n := b.NextBackOff()
s, err := natsjs.NewStream(opts...)
if err != nil && n > time.Second {
logger.New().Error().Err(err).Msgf("can't connect to nats (jetstream) server, retrying in %s", n)
}
stream = s
return err
}

err := backoff.Retry(o, b)
return stream, err
}
24 changes: 0 additions & 24 deletions pkg/events/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,10 @@ package stream
import (
"encoding/json"
"reflect"
"time"

"github.com/cenkalti/backoff"
"github.com/cs3org/reva/v2/pkg/logger"
"go-micro.dev/v4/events"

"github.com/go-micro/plugins/v4/events/natsjs"
)

// Nats returns a nats streaming client
// retries exponentially to connect to a nats server
func Nats(opts ...natsjs.Option) (events.Stream, error) {
b := backoff.NewExponentialBackOff()
var stream events.Stream
o := func() error {
n := b.NextBackOff()
s, err := natsjs.NewStream(opts...)
if err != nil && n > time.Second {
logger.New().Error().Err(err).Msgf("can't connect to nats (jetstream) server, retrying in %s", n)
}
stream = s
return err
}

err := backoff.Retry(o, b)
return stream, err
}

// Chan is a channel based streaming clients
// Useful for tests or in memory applications
type Chan [2]chan interface{}
Expand Down

0 comments on commit c863bb5

Please sign in to comment.