Skip to content

Commit

Permalink
Add namespaces feature
Browse files Browse the repository at this point in the history
  • Loading branch information
alinz committed Jan 9, 2025
1 parent 6bbadb0 commit 33e8923
Show file tree
Hide file tree
Showing 14 changed files with 174 additions and 28 deletions.
27 changes: 25 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
To install `bus`, use:

```shell
go get ella.to/bus@v0.3.0
go get ella.to/bus@v0.3.3
```

to install a cli, run the following

```shell
go install ella.to/bus/cmd/bus@v0.3.0
go install ella.to/bus/cmd/bus@v0.3.3
```

and to run the server using docker, simply use the provided docker-compose and run it
Expand All @@ -43,6 +43,29 @@ and to run the server using docker, simply use the provided docker-compose and r
docker-compose up
```

## Namespaces

Namespaces have been introduced to efficiently organize events by ensuring that not all events are saved in a single file. Each namespace has its own dedicated file. All namespaces must be defined when starting the Bus server by using the `--namespaces` flag.

### What Are Namespaces?

Namespaces are essentially the first segment of a topic. For example, in the topic a.b.c, the namespace is a.

The Bus server also includes a special namespace called `_bus_`, reserved for internal bus operations. It is strongly recommended not to consume events from the `_bus_` namespace.

### Best Practices for Namespaces

When defining namespaces, consider your business logic and choose meaningful names that clearly represent their purpose. For instance:

- If the Bus is used to handle RPC calls, a good namespace might be `rpc`.
- For user-related operations, you might use user.

### Key Features and Limitations

Event Sequencing Within Namespaces: The Bus guarantees the sequence of events stored within a single namespace.
No Cross-Namespace Sequencing Guarantee: The Bus does not guarantee the sequence of messages stored across different namespaces.
By following these guidelines, you can keep your Bus server organized and aligned with your application's goals.

## Basic Example

At its core, bus is a pub/sub library, enabling asynchronous communication between publishers and subscribers. Here’s how to publish an event after creating a client
Expand Down
20 changes: 12 additions & 8 deletions bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ import (
"ella.to/task"
)

func createBusServer(t *testing.T, eventLogs string) *bus.Client {
os.Remove(eventLogs)
func createBusServer(t *testing.T, eventLogsDir string) *bus.Client {
os.RemoveAll(eventLogsDir)

storage, err := immuta.New(eventLogs, 10, true)
storage, err := immuta.New(
immuta.WithLogsDirPath(eventLogsDir),
immuta.WithNamespaces("a"),
immuta.WithFastWrite(true),
)
if err != nil {
t.Fatal(err)
}
Expand All @@ -31,14 +35,14 @@ func createBusServer(t *testing.T, eventLogs string) *bus.Client {
t.Cleanup(func() {
storage.Close()
server.Close()
os.Remove(eventLogs)
os.RemoveAll(eventLogsDir)
})

return bus.NewClient(server.URL)
}

func TestBasicPutUsage(t *testing.T) {
client := createBusServer(t, "TestBasicUsage.log")
client := createBusServer(t, "TestBasicUsage")

resp := client.Put(context.Background(), bus.WithSubject("a.b.c"), bus.WithData("hello world"))
if resp.Error() != nil {
Expand All @@ -49,7 +53,7 @@ func TestBasicPutUsage(t *testing.T) {
}

func TestBasicPutGetUsage(t *testing.T) {
client := createBusServer(t, "TestBasicPutGetUsage.log")
client := createBusServer(t, "TestBasicPutGetUsage")

resp := client.Put(context.Background(), bus.WithSubject("a.b.c"), bus.WithData("hello world"))
if resp.Error() != nil {
Expand Down Expand Up @@ -80,7 +84,7 @@ func TestBasicPutGetUsage(t *testing.T) {
}

func TestSinglePutMultipleGet(t *testing.T) {
client := createBusServer(t, "TestBasicUsage.log")
client := createBusServer(t, "TestBasicUsage")

n := 1_00
p := 10
Expand Down Expand Up @@ -144,7 +148,7 @@ func TestSinglePutMultipleGet(t *testing.T) {
}

func TestPullClose(t *testing.T) {
client := createBusServer(t, "TestPull.log")
client := createBusServer(t, "TestPull")

ctx, cancel := context.WithCancel(context.Background())

Expand Down
19 changes: 14 additions & 5 deletions cmd/bus/action/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,22 @@ func DebugCommand() *cli.Command {
Usage: "directory to store events and consumers information",
Value: "./bus_data",
},
&cli.StringFlag{
Name: "namespace",
Usage: "namespace to filter events",
Required: true,
},
},
Action: func(c *cli.Context) error {
dir := c.String("dir")

filepath := fmt.Sprintf("%s/events.log", dir)

im, err := immuta.New(filepath, 2, true)
namespace := c.String("namespace")

im, err := immuta.New(
immuta.WithLogsDirPath(dir),
immuta.WithNamespaces(namespace),
immuta.WithFastWrite(true),
immuta.WithReaderCount(2),
)
if err != nil {
log.Fatal(err)
}
Expand All @@ -41,7 +50,7 @@ func DebugCommand() *cli.Command {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

stream := im.Stream(ctx, 0)
stream := im.Stream(ctx, namespace, 0)
defer stream.Done()

var count int64
Expand Down
19 changes: 16 additions & 3 deletions cmd/bus/action/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,28 @@ func ServerCommand() *cli.Command {
},
&cli.StringFlag{
Name: "path",
Usage: "path to events log file",
Value: "./bus_data/events.log",
Usage: "dir path to events log files",
Value: "./bus_data",
},
&cli.StringFlag{
Name: "namespaces",
Usage: "list of namespaces separated by comma",
Required: true,
},
},
Action: func(c *cli.Context) error {
logLevel := getLogLevel(getValue(os.Getenv("BUS_LOG_LEVEL"), "INFO"))
addr := getValue(os.Getenv("BUS_ADDR"), c.String("addr"))
path := getValue(os.Getenv("BUS_PATH"), c.String("path"))
namespaces := getSliceValues(os.Getenv("BUS_NAMESPACES"), c.String("namespaces"), ",")

slog.SetLogLoggerLevel(logLevel)

if err := os.MkdirAll(filepath.Base(path), os.ModePerm); err != nil {
return err
}

server, err := bus.NewServer(addr, path)
server, err := bus.NewServer(addr, path, namespaces)
if err != nil {
return err
}
Expand Down Expand Up @@ -87,6 +93,13 @@ func ServerCommand() *cli.Command {
}
}

func getSliceValues(a string, b string, split string) []string {
if a != "" {
return strings.Split(a, split)
}
return strings.Split(b, split)
}

func getValue(seq ...string) string {
for _, s := range seq {
if s != "" {
Expand Down
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ services:
- BUS_LOG_LEVEL=INFO
- BUS_PATH=/storage/events.log
- BUS_ADDR=0.0.0.0:2021
# each application requires a different namespaces
# list them separated by comma here
- BUS_NAMESPACES=namespace1,namespace2

# if ports is removed, then the bus service can
# only be accessed from within the network bus
Expand Down
5 changes: 5 additions & 0 deletions examples/confirm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"ella.to/bus"
)

// NOTE:
// make sure to run provide namepaces "a" before running the server
// for example
// go run cmd/bus/main.go server --namespaces a

func main() {
client := bus.NewClient("http://localhost:2021")

Expand Down
5 changes: 5 additions & 0 deletions examples/pub-sub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"ella.to/bus"
)

// NOTE:
// make sure to run provide namepaces "a" before running the server
// for example
// go run cmd/bus/main.go server --namespaces a

func main() {
client := bus.NewClient("http://localhost:2021")

Expand Down
5 changes: 5 additions & 0 deletions examples/request-reply/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
"ella.to/bus"
)

// NOTE:
// make sure to run provide namepaces "math" before running the server
// for example
// go run cmd/bus/main.go server --namespaces math

type Request struct {
A int
B int
Expand Down
2 changes: 1 addition & 1 deletion gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

func newInboxSubject() string {
return newId("i.")
return newId("_bus_.")
}

func newEventId() string {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module ella.to/bus
go 1.23.1

require (
ella.to/immuta v0.0.3
ella.to/immuta v0.0.4
ella.to/sse v0.0.6
ella.to/task v0.0.5
github.com/rs/xid v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ella.to/immuta v0.0.3 h1:QCtTvF7Lr5c807KUvPBJVCZLfQt7Rb+LRE6n+XIyLe4=
ella.to/immuta v0.0.3/go.mod h1:0lKIf8Nu+irCQz5pX7lqrQaepMyfNM3zl4kRzK/oLNw=
ella.to/immuta v0.0.4 h1:tsOvWm0EJkqgnpR7aXU2GylFOe8bdAtKAJrCxyypsr0=
ella.to/immuta v0.0.4/go.mod h1:0lKIf8Nu+irCQz5pX7lqrQaepMyfNM3zl4kRzK/oLNw=
ella.to/solid v0.0.2 h1:RSxqe/HEbMZ/vcwwDD9xMQV643Q512IcS9IF5KHcp9w=
ella.to/solid v0.0.2/go.mod h1:0+qlRn4069za08wDegNAFUICaKAZCZHKUjav8UEZml0=
ella.to/sse v0.0.6 h1:3zJui8y5iOfojWku3QZMFKKSinRtAtKivD8u3TrtDU4=
Expand Down
39 changes: 35 additions & 4 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@ func (h *Handler) Put(w http.ResponseWriter, r *http.Request) {

var eventIndex int64

namespaceIdx := strings.Index(event.Subject, ".")

err := h.runner.Submit(ctx, func(ctx context.Context) error {
err := event.encode(&h.buffer)
if err != nil {
return err
}

eventIndex, _, err = h.eventsLog.Append(ctx, &h.buffer)
eventIndex, _, err = h.eventsLog.Append(ctx, event.Subject[:namespaceIdx], &h.buffer)
if err != nil {
return err
}
Expand Down Expand Up @@ -137,7 +139,9 @@ func (h *Handler) Get(w http.ResponseWriter, r *http.Request) {
startPos = 0
}

stream := h.eventsLog.Stream(ctx, startPos)
namespaceIdx := strings.Index(subject, ".")

stream := h.eventsLog.Stream(ctx, subject[:namespaceIdx], startPos)
defer stream.Done()

for {
Expand Down Expand Up @@ -283,8 +287,35 @@ func NewHandler(eventLogs *immuta.Storage, runner task.Runner) *Handler {
return h
}

func NewServer(addr string, eventLogs string) (*http.Server, error) {
eventStorage, err := immuta.New(eventLogs, 10, true)
func NewServer(addr string, logsDirPath string, namespaces []string) (*http.Server, error) {
{
// This block is used to validate the namespaces
// and make sure there is no reserved and duplicate namespaces

namespacesSet := make(map[string]struct{})
for _, ns := range namespaces {
namespacesSet[ns] = struct{}{}
}

// NOTE: currently bus has an internal namespace "_bus_" which was used to store
// RPC and Confirm events. This namespace should not be consumed by the user.
if _, ok := namespacesSet["_bus_"]; ok {
return nil, errors.New("namespace _bus_ is reserved")
}

namespaces = make([]string, 0, len(namespacesSet)+1)
namespaces = append(namespaces, "_bus_")
for ns := range namespacesSet {
namespaces = append(namespaces, ns)
}
}

eventStorage, err := immuta.New(
immuta.WithLogsDirPath(logsDirPath),
immuta.WithReaderCount(5),
immuta.WithFastWrite(true),
immuta.WithNamespaces(namespaces...),
)
if err != nil {
return nil, err
}
Expand Down
10 changes: 8 additions & 2 deletions matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,14 @@ func ValidateSubject(subject string) error {
}

for i, c := range subject {
if i == 0 && c == '.' {
return errors.New("subject should not starts with .")
if i == 0 {
if c == '.' {
return errors.New("subject should not starts with .")
} else if c == '*' {
return errors.New("subject should not starts with *")
} else if c == '>' {
return errors.New("subject should not starts with >")
}
}

if i == len(subject)-1 && c == '.' {
Expand Down
42 changes: 42 additions & 0 deletions matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,45 @@ func BenchmarkMatchSubject(b *testing.B) {
bus.MatchSubject("a.b.c", "a.*.c")
}
}

func TestValidateSubject(t *testing.T) {
tests := []struct {
name string
subject string
wantErr bool
errMsg string
}{
// Valid subjects
{"Valid simple subject", "a.b.c", false, ""},
{"Valid subject with *", "a.b.*", false, ""},
{"Valid subject with >", "a.b.>", false, ""},

// Invalid subjects
{"Should always start with alphanumeric 1", ".a.b", true, "subject should not starts with ."},
{"Should always start with alphanumeric 2", "*", true, "subject should not starts with *"},
{"Should always start with alphanumeric 3", ">", true, "subject should not starts with >"},
{"Empty subject", "", true, "subject is empty"},
{"Starts with .", ".a.b", true, "subject should not starts with ."},
{"Ends with .", "a.b.", true, "subject should not ends with ."},
{"Contains spaces", "a. b.c", true, "subject should not have spaces"},
{"Series of dots", "a..b.c", true, "subject should not have series of dots one after another"},
{"Invalid character", "a.b.c$", true, "subject should have only consists of alphanumerics, dots, *, > and _"},
{"Middle >", "a.b>.c", true, "subject should not have anything after >"},
{"No dot before >", "a.b>", true, "subject should have a dot before >"},
{"No dot before *", "a.b*", true, "subject should have a dot before *"},
{"No dot after *", "a.*b", true, "subject should have a dot after *"},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := bus.ValidateSubject(tt.subject)
if (err != nil) != tt.wantErr {
t.Errorf("expected error: %v, got: %v", tt.wantErr, err)
}

if tt.wantErr && err.Error() != tt.errMsg {
t.Errorf("expected error message: %v, got: %v", tt.errMsg, err.Error())
}
})
}
}

0 comments on commit 33e8923

Please sign in to comment.