Skip to content

Commit

Permalink
Add EventID (#3637)
Browse files Browse the repository at this point in the history
* add unique id to events

Signed-off-by: jkoberg <jkoberg@owncloud.com>

* add ConsumeAll function

Signed-off-by: jkoberg <jkoberg@owncloud.com>

* add helper to create natsstream from config

Signed-off-by: jkoberg <jkoberg@owncloud.com>

---------

Signed-off-by: jkoberg <jkoberg@owncloud.com>
  • Loading branch information
kobergj authored Feb 13, 2023
1 parent ff24f8b commit 4a49209
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 30 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/add-eventid.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Add an ID to each events

This way it is possible to uniquely identify events across services

https://github.com/cs3org/reva/pull/3637
43 changes: 40 additions & 3 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"log"
"reflect"

"github.com/google/uuid"
"go-micro.dev/v4/events"
)

Expand All @@ -34,6 +35,9 @@ var (

// MetadatakeyEventType is the key used for the eventtype in the metadata map of the event
MetadatakeyEventType = "eventtype"

// MetadatakeyEventID is the key used for the eventID in the metadata map of the event
MetadatakeyEventID = "eventid"
)

type (
Expand All @@ -57,12 +61,19 @@ type (
Publish(string, interface{}, ...events.PublishOption) error
Consume(string, ...events.ConsumeOption) (<-chan events.Event, error)
}

// Event is the envelope for events
Event struct {
Type string
ID string
Event interface{}
}
)

// Consume returns a channel that will get all events that match the given evs
// group defines the service type: One group will get exactly one copy of a event that is emitted
// NOTE: uses reflect on initialization
func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan interface{}, error) {
func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan Event, error) {
c, err := s.Consume(MainQueueName, events.WithGroup(group))
if err != nil {
return nil, err
Expand All @@ -74,7 +85,7 @@ func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan interface{},
registeredEvents[typ.String()] = e
}

outchan := make(chan interface{})
outchan := make(chan Event)
go func() {
for {
e := <-c
Expand All @@ -90,7 +101,32 @@ func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan interface{},
continue
}

outchan <- event
outchan <- Event{
Type: et,
ID: e.Metadata[MetadatakeyEventID],
Event: event,
}
}
}()
return outchan, nil
}

// ConsumeAll allows consuming all events. Note that unmarshalling must be done manually in this case, therefore Event.Event will always be of type []byte
func ConsumeAll(s Consumer, group string) (<-chan Event, error) {
c, err := s.Consume(MainQueueName, events.WithGroup(group))
if err != nil {
return nil, err
}

outchan := make(chan Event)
go func() {
for {
e := <-c
outchan <- Event{
Type: e.Metadata[MetadatakeyEventType],
ID: e.Metadata[MetadatakeyEventID],
Event: e.Payload,
}
}
}()
return outchan, nil
Expand All @@ -102,5 +138,6 @@ func Publish(s Publisher, ev interface{}) error {
evName := reflect.TypeOf(ev).String()
return s.Publish(MainQueueName, ev, events.WithMetadata(map[string]string{
MetadatakeyEventType: evName,
MetadatakeyEventID: uuid.New().String(),
}))
}
2 changes: 1 addition & 1 deletion pkg/events/example/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func Example(c events.Consumer) {
event := <-evChan

// best to use type switch to differentiate events
switch v := event.(type) {
switch v := event.Event.(type) {
case events.ShareCreated:
fmt.Printf("%s) Share created: %+v\n", group, v)
default:
Expand Down
95 changes: 95 additions & 0 deletions pkg/events/stream/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package stream

import (
"bytes"
"crypto/tls"
"crypto/x509"
"errors"
"io"
"os"
"time"

"github.com/cenkalti/backoff"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/go-micro/plugins/v4/events/natsjs"
)

// NatsConfig is the configuration needed for a NATS event stream
type NatsConfig struct {
Endpoint string // Endpoint of the nats server
Cluster string // CluserID of the nats cluster
TLSInsecure bool // Whether to verify TLS certificates
TLSRootCACertificate string // The root CA certificate used to validate the TLS certificate
EnableTLS bool // Enable TLS
}

// NatsFromConfig returns a nats stream from the given config
func NatsFromConfig(cfg NatsConfig) (events.Stream, error) {
var tlsConf *tls.Config
if cfg.EnableTLS {
var rootCAPool *x509.CertPool
if cfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(cfg.TLSRootCACertificate)
if err != nil {
return nil, err
}

rootCAPool, err = newCertPoolFromPEM(rootCrtFile)
if err != nil {
return nil, err
}
cfg.TLSInsecure = false
}

tlsConf = &tls.Config{
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: cfg.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}
return Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(cfg.Endpoint),
natsjs.ClusterID(cfg.Cluster),
)

}

// Nats returns a nats streaming client
// retries exponentially to connect to a nats server
func Nats(opts ...natsjs.Option) (events.Stream, error) {
b := backoff.NewExponentialBackOff()
var stream events.Stream
o := func() error {
n := b.NextBackOff()
s, err := natsjs.NewStream(opts...)
if err != nil && n > time.Second {
logger.New().Error().Err(err).Msgf("can't connect to nats (jetstream) server, retrying in %s", n)
}
stream = s
return err
}

err := backoff.Retry(o, b)
return stream, err
}

// newCertPoolFromPEM reads certificates from io.Reader and returns a x509.CertPool
// containing those certificates.
func newCertPoolFromPEM(crts ...io.Reader) (*x509.CertPool, error) {
certPool := x509.NewCertPool()

var buf bytes.Buffer
for _, c := range crts {
if _, err := io.Copy(&buf, c); err != nil {
return nil, err
}
if !certPool.AppendCertsFromPEM(buf.Bytes()) {
return nil, errors.New("failed to append cert from PEM")
}
buf.Reset()
}

return certPool, nil
}
24 changes: 0 additions & 24 deletions pkg/events/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,10 @@ package stream
import (
"encoding/json"
"reflect"
"time"

"github.com/cenkalti/backoff"
"github.com/cs3org/reva/v2/pkg/logger"
"go-micro.dev/v4/events"

"github.com/go-micro/plugins/v4/events/natsjs"
)

// Nats returns a nats streaming client
// retries exponentially to connect to a nats server
func Nats(opts ...natsjs.Option) (events.Stream, error) {
b := backoff.NewExponentialBackOff()
var stream events.Stream
o := func() error {
n := b.NextBackOff()
s, err := natsjs.NewStream(opts...)
if err != nil && n > time.Second {
logger.New().Error().Err(err).Msgf("can't connect to nats (jetstream) server, retrying in %s", n)
}
stream = s
return err
}

err := backoff.Retry(o, b)
return stream, err
}

// Chan is a channel based streaming clients
// Useful for tests or in memory applications
type Chan [2]chan interface{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,11 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event
}

// Postprocessing starts the postprocessing result collector
func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) {
func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
ctx := context.TODO()
log := logger.New()
for event := range ch {
switch ev := event.(type) {
switch ev := event.Event.(type) {
case events.PostprocessingFinished:
up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
if err != nil {
Expand Down

0 comments on commit 4a49209

Please sign in to comment.