From 33e89235250a48f55192b00fd0c428f29baf000e Mon Sep 17 00:00:00 2001 From: Ali Najafizadeh Date: Thu, 9 Jan 2025 14:55:32 -0500 Subject: [PATCH] Add namespaces feature --- README.md | 27 ++++++++++++++++++++-- bus_test.go | 20 +++++++++------- cmd/bus/action/debug.go | 19 +++++++++++---- cmd/bus/action/server.go | 19 ++++++++++++--- docker-compose.yml | 3 +++ examples/confirm/main.go | 5 ++++ examples/pub-sub/main.go | 5 ++++ examples/request-reply/main.go | 5 ++++ gen.go | 2 +- go.mod | 2 +- go.sum | 4 ++-- http.go | 39 +++++++++++++++++++++++++++---- matcher.go | 10 ++++++-- matcher_test.go | 42 ++++++++++++++++++++++++++++++++++ 14 files changed, 174 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 8b315f1..92be28e 100644 --- a/README.md +++ b/README.md @@ -28,13 +28,13 @@ To install `bus`, use: ```shell -go get ella.to/bus@v0.3.0 +go get ella.to/bus@v0.3.3 ``` to install a cli, run the following ```shell -go install ella.to/bus/cmd/bus@v0.3.0 +go install ella.to/bus/cmd/bus@v0.3.3 ``` and to run the server using docker, simply use the provided docker-compose and run it @@ -43,6 +43,29 @@ and to run the server using docker, simply use the provided docker-compose and r docker-compose up ``` +## Namespaces + +Namespaces have been introduced to efficiently organize events by ensuring that not all events are saved in a single file. Each namespace has its own dedicated file. All namespaces must be defined when starting the Bus server by using the `--namespaces` flag. + +### What Are Namespaces? + +Namespaces are essentially the first segment of a topic. For example, in the topic a.b.c, the namespace is a. + +The Bus server also includes a special namespace called `_bus_`, reserved for internal bus operations. It is strongly recommended not to consume events from the `_bus_` namespace. + +### Best Practices for Namespaces + +When defining namespaces, consider your business logic and choose meaningful names that clearly represent their purpose. For instance: + +- If the Bus is used to handle RPC calls, a good namespace might be `rpc`. +- For user-related operations, you might use user. + +### Key Features and Limitations + +Event Sequencing Within Namespaces: The Bus guarantees the sequence of events stored within a single namespace. +No Cross-Namespace Sequencing Guarantee: The Bus does not guarantee the sequence of messages stored across different namespaces. +By following these guidelines, you can keep your Bus server organized and aligned with your application's goals. + ## Basic Example At its core, bus is a pub/sub library, enabling asynchronous communication between publishers and subscribers. Here’s how to publish an event after creating a client diff --git a/bus_test.go b/bus_test.go index d190cca..ca3e726 100644 --- a/bus_test.go +++ b/bus_test.go @@ -16,10 +16,14 @@ import ( "ella.to/task" ) -func createBusServer(t *testing.T, eventLogs string) *bus.Client { - os.Remove(eventLogs) +func createBusServer(t *testing.T, eventLogsDir string) *bus.Client { + os.RemoveAll(eventLogsDir) - storage, err := immuta.New(eventLogs, 10, true) + storage, err := immuta.New( + immuta.WithLogsDirPath(eventLogsDir), + immuta.WithNamespaces("a"), + immuta.WithFastWrite(true), + ) if err != nil { t.Fatal(err) } @@ -31,14 +35,14 @@ func createBusServer(t *testing.T, eventLogs string) *bus.Client { t.Cleanup(func() { storage.Close() server.Close() - os.Remove(eventLogs) + os.RemoveAll(eventLogsDir) }) return bus.NewClient(server.URL) } func TestBasicPutUsage(t *testing.T) { - client := createBusServer(t, "TestBasicUsage.log") + client := createBusServer(t, "TestBasicUsage") resp := client.Put(context.Background(), bus.WithSubject("a.b.c"), bus.WithData("hello world")) if resp.Error() != nil { @@ -49,7 +53,7 @@ func TestBasicPutUsage(t *testing.T) { } func TestBasicPutGetUsage(t *testing.T) { - client := createBusServer(t, "TestBasicPutGetUsage.log") + client := createBusServer(t, "TestBasicPutGetUsage") resp := client.Put(context.Background(), bus.WithSubject("a.b.c"), bus.WithData("hello world")) if resp.Error() != nil { @@ -80,7 +84,7 @@ func TestBasicPutGetUsage(t *testing.T) { } func TestSinglePutMultipleGet(t *testing.T) { - client := createBusServer(t, "TestBasicUsage.log") + client := createBusServer(t, "TestBasicUsage") n := 1_00 p := 10 @@ -144,7 +148,7 @@ func TestSinglePutMultipleGet(t *testing.T) { } func TestPullClose(t *testing.T) { - client := createBusServer(t, "TestPull.log") + client := createBusServer(t, "TestPull") ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/bus/action/debug.go b/cmd/bus/action/debug.go index c390fd1..accc6b4 100644 --- a/cmd/bus/action/debug.go +++ b/cmd/bus/action/debug.go @@ -25,13 +25,22 @@ func DebugCommand() *cli.Command { Usage: "directory to store events and consumers information", Value: "./bus_data", }, + &cli.StringFlag{ + Name: "namespace", + Usage: "namespace to filter events", + Required: true, + }, }, Action: func(c *cli.Context) error { dir := c.String("dir") - - filepath := fmt.Sprintf("%s/events.log", dir) - - im, err := immuta.New(filepath, 2, true) + namespace := c.String("namespace") + + im, err := immuta.New( + immuta.WithLogsDirPath(dir), + immuta.WithNamespaces(namespace), + immuta.WithFastWrite(true), + immuta.WithReaderCount(2), + ) if err != nil { log.Fatal(err) } @@ -41,7 +50,7 @@ func DebugCommand() *cli.Command { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - stream := im.Stream(ctx, 0) + stream := im.Stream(ctx, namespace, 0) defer stream.Done() var count int64 diff --git a/cmd/bus/action/server.go b/cmd/bus/action/server.go index bab4f40..1bb2cc3 100644 --- a/cmd/bus/action/server.go +++ b/cmd/bus/action/server.go @@ -39,14 +39,20 @@ func ServerCommand() *cli.Command { }, &cli.StringFlag{ Name: "path", - Usage: "path to events log file", - Value: "./bus_data/events.log", + Usage: "dir path to events log files", + Value: "./bus_data", + }, + &cli.StringFlag{ + Name: "namespaces", + Usage: "list of namespaces separated by comma", + Required: true, }, }, Action: func(c *cli.Context) error { logLevel := getLogLevel(getValue(os.Getenv("BUS_LOG_LEVEL"), "INFO")) addr := getValue(os.Getenv("BUS_ADDR"), c.String("addr")) path := getValue(os.Getenv("BUS_PATH"), c.String("path")) + namespaces := getSliceValues(os.Getenv("BUS_NAMESPACES"), c.String("namespaces"), ",") slog.SetLogLoggerLevel(logLevel) @@ -54,7 +60,7 @@ func ServerCommand() *cli.Command { return err } - server, err := bus.NewServer(addr, path) + server, err := bus.NewServer(addr, path, namespaces) if err != nil { return err } @@ -87,6 +93,13 @@ func ServerCommand() *cli.Command { } } +func getSliceValues(a string, b string, split string) []string { + if a != "" { + return strings.Split(a, split) + } + return strings.Split(b, split) +} + func getValue(seq ...string) string { for _, s := range seq { if s != "" { diff --git a/docker-compose.yml b/docker-compose.yml index c18b812..83e0c7d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,6 +6,9 @@ services: - BUS_LOG_LEVEL=INFO - BUS_PATH=/storage/events.log - BUS_ADDR=0.0.0.0:2021 + # each application requires a different namespaces + # list them separated by comma here + - BUS_NAMESPACES=namespace1,namespace2 # if ports is removed, then the bus service can # only be accessed from within the network bus diff --git a/examples/confirm/main.go b/examples/confirm/main.go index 4b1e062..9816139 100644 --- a/examples/confirm/main.go +++ b/examples/confirm/main.go @@ -6,6 +6,11 @@ import ( "ella.to/bus" ) +// NOTE: +// make sure to run provide namepaces "a" before running the server +// for example +// go run cmd/bus/main.go server --namespaces a + func main() { client := bus.NewClient("http://localhost:2021") diff --git a/examples/pub-sub/main.go b/examples/pub-sub/main.go index 523e472..9077b67 100644 --- a/examples/pub-sub/main.go +++ b/examples/pub-sub/main.go @@ -6,6 +6,11 @@ import ( "ella.to/bus" ) +// NOTE: +// make sure to run provide namepaces "a" before running the server +// for example +// go run cmd/bus/main.go server --namespaces a + func main() { client := bus.NewClient("http://localhost:2021") diff --git a/examples/request-reply/main.go b/examples/request-reply/main.go index e37fea7..2d8f7c2 100644 --- a/examples/request-reply/main.go +++ b/examples/request-reply/main.go @@ -7,6 +7,11 @@ import ( "ella.to/bus" ) +// NOTE: +// make sure to run provide namepaces "math" before running the server +// for example +// go run cmd/bus/main.go server --namespaces math + type Request struct { A int B int diff --git a/gen.go b/gen.go index 155002a..aed6c81 100644 --- a/gen.go +++ b/gen.go @@ -5,7 +5,7 @@ import ( ) func newInboxSubject() string { - return newId("i.") + return newId("_bus_.") } func newEventId() string { diff --git a/go.mod b/go.mod index 8a5e647..e4d4013 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module ella.to/bus go 1.23.1 require ( - ella.to/immuta v0.0.3 + ella.to/immuta v0.0.4 ella.to/sse v0.0.6 ella.to/task v0.0.5 github.com/rs/xid v1.6.0 diff --git a/go.sum b/go.sum index b3b6305..25a57b3 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -ella.to/immuta v0.0.3 h1:QCtTvF7Lr5c807KUvPBJVCZLfQt7Rb+LRE6n+XIyLe4= -ella.to/immuta v0.0.3/go.mod h1:0lKIf8Nu+irCQz5pX7lqrQaepMyfNM3zl4kRzK/oLNw= +ella.to/immuta v0.0.4 h1:tsOvWm0EJkqgnpR7aXU2GylFOe8bdAtKAJrCxyypsr0= +ella.to/immuta v0.0.4/go.mod h1:0lKIf8Nu+irCQz5pX7lqrQaepMyfNM3zl4kRzK/oLNw= ella.to/solid v0.0.2 h1:RSxqe/HEbMZ/vcwwDD9xMQV643Q512IcS9IF5KHcp9w= ella.to/solid v0.0.2/go.mod h1:0+qlRn4069za08wDegNAFUICaKAZCZHKUjav8UEZml0= ella.to/sse v0.0.6 h1:3zJui8y5iOfojWku3QZMFKKSinRtAtKivD8u3TrtDU4= diff --git a/http.go b/http.go index a707d72..1d7709c 100644 --- a/http.go +++ b/http.go @@ -63,13 +63,15 @@ func (h *Handler) Put(w http.ResponseWriter, r *http.Request) { var eventIndex int64 + namespaceIdx := strings.Index(event.Subject, ".") + err := h.runner.Submit(ctx, func(ctx context.Context) error { err := event.encode(&h.buffer) if err != nil { return err } - eventIndex, _, err = h.eventsLog.Append(ctx, &h.buffer) + eventIndex, _, err = h.eventsLog.Append(ctx, event.Subject[:namespaceIdx], &h.buffer) if err != nil { return err } @@ -137,7 +139,9 @@ func (h *Handler) Get(w http.ResponseWriter, r *http.Request) { startPos = 0 } - stream := h.eventsLog.Stream(ctx, startPos) + namespaceIdx := strings.Index(subject, ".") + + stream := h.eventsLog.Stream(ctx, subject[:namespaceIdx], startPos) defer stream.Done() for { @@ -283,8 +287,35 @@ func NewHandler(eventLogs *immuta.Storage, runner task.Runner) *Handler { return h } -func NewServer(addr string, eventLogs string) (*http.Server, error) { - eventStorage, err := immuta.New(eventLogs, 10, true) +func NewServer(addr string, logsDirPath string, namespaces []string) (*http.Server, error) { + { + // This block is used to validate the namespaces + // and make sure there is no reserved and duplicate namespaces + + namespacesSet := make(map[string]struct{}) + for _, ns := range namespaces { + namespacesSet[ns] = struct{}{} + } + + // NOTE: currently bus has an internal namespace "_bus_" which was used to store + // RPC and Confirm events. This namespace should not be consumed by the user. + if _, ok := namespacesSet["_bus_"]; ok { + return nil, errors.New("namespace _bus_ is reserved") + } + + namespaces = make([]string, 0, len(namespacesSet)+1) + namespaces = append(namespaces, "_bus_") + for ns := range namespacesSet { + namespaces = append(namespaces, ns) + } + } + + eventStorage, err := immuta.New( + immuta.WithLogsDirPath(logsDirPath), + immuta.WithReaderCount(5), + immuta.WithFastWrite(true), + immuta.WithNamespaces(namespaces...), + ) if err != nil { return nil, err } diff --git a/matcher.go b/matcher.go index 65b9e81..dd0becb 100644 --- a/matcher.go +++ b/matcher.go @@ -65,8 +65,14 @@ func ValidateSubject(subject string) error { } for i, c := range subject { - if i == 0 && c == '.' { - return errors.New("subject should not starts with .") + if i == 0 { + if c == '.' { + return errors.New("subject should not starts with .") + } else if c == '*' { + return errors.New("subject should not starts with *") + } else if c == '>' { + return errors.New("subject should not starts with >") + } } if i == len(subject)-1 && c == '.' { diff --git a/matcher_test.go b/matcher_test.go index 2d50090..1130bfa 100644 --- a/matcher_test.go +++ b/matcher_test.go @@ -71,3 +71,45 @@ func BenchmarkMatchSubject(b *testing.B) { bus.MatchSubject("a.b.c", "a.*.c") } } + +func TestValidateSubject(t *testing.T) { + tests := []struct { + name string + subject string + wantErr bool + errMsg string + }{ + // Valid subjects + {"Valid simple subject", "a.b.c", false, ""}, + {"Valid subject with *", "a.b.*", false, ""}, + {"Valid subject with >", "a.b.>", false, ""}, + + // Invalid subjects + {"Should always start with alphanumeric 1", ".a.b", true, "subject should not starts with ."}, + {"Should always start with alphanumeric 2", "*", true, "subject should not starts with *"}, + {"Should always start with alphanumeric 3", ">", true, "subject should not starts with >"}, + {"Empty subject", "", true, "subject is empty"}, + {"Starts with .", ".a.b", true, "subject should not starts with ."}, + {"Ends with .", "a.b.", true, "subject should not ends with ."}, + {"Contains spaces", "a. b.c", true, "subject should not have spaces"}, + {"Series of dots", "a..b.c", true, "subject should not have series of dots one after another"}, + {"Invalid character", "a.b.c$", true, "subject should have only consists of alphanumerics, dots, *, > and _"}, + {"Middle >", "a.b>.c", true, "subject should not have anything after >"}, + {"No dot before >", "a.b>", true, "subject should have a dot before >"}, + {"No dot before *", "a.b*", true, "subject should have a dot before *"}, + {"No dot after *", "a.*b", true, "subject should have a dot after *"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := bus.ValidateSubject(tt.subject) + if (err != nil) != tt.wantErr { + t.Errorf("expected error: %v, got: %v", tt.wantErr, err) + } + + if tt.wantErr && err.Error() != tt.errMsg { + t.Errorf("expected error message: %v, got: %v", tt.errMsg, err.Error()) + } + }) + } +}