Skip to content

Commit

Permalink
Add Generate() function for ergonomic stream creation (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
destel authored Nov 24, 2024
1 parent 514ea1c commit 3d0ba14
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 95 deletions.
76 changes: 36 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ Pipelines typically consist of a sequence of non-blocking channel transformation
The general rule is: any error occurring anywhere in a pipeline is propagated down to the final stage,
where it's caught by some blocking function and returned to the caller.

Rill provides a wide selection of blocking functions. Some of them are:
Rill provides a wide selection of blocking functions. Here are some commonly used ones:

- **ForEach:** Concurrently applies a user function to each item in the stream.
[Example](https://pkg.go.dev/github.com/destel/rill#example-ForEach)
Expand All @@ -233,6 +233,7 @@ all goroutines feeding the stream are allowed to complete.
Rill is context-agnostic, meaning that it does not enforce any specific context usage.
However, it's recommended to make user-defined pipeline stages context-aware.
This is especially important for the initial stage, as it stops new items from being fed into the pipeline once the context is canceled.
In practice, the first stage is often naturally context-aware through Go's standard APIs for databases, HTTP clients, and other external sources.

In the example below the `CheckAllUsersExist` function uses several concurrent workers to check if all users
from the given list exist. When an error occurs (like a non-existent user), the function returns that error
Expand All @@ -244,7 +245,7 @@ func main() {
ctx := context.Background()

// ID 999 doesn't exist, so fetching will stop after hitting it.
err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10})
err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10, 11, 12, 13, 14, 15})
fmt.Printf("Check result: %v\n", err)
}

Expand Down Expand Up @@ -273,6 +274,22 @@ func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error {
}
```

In the example above only the second stage (`mockapi.GetUser`) of the pipeline is context-aware.
**FromSlice** works well here since the input is small, iteration is fast, and context cancellation prevents expensive API calls regardless.
The following code demonstrates how to replace **FromSlice** with **Generate** when full context awareness becomes important.

```go
// Feed the IDs into the stream one by one; the context is checked before
// every send, so no further IDs are sent once it has been canceled.
idsStream := rill.Generate(func(send func(int), sendErr func(error)) {
	for i := 0; i < len(ids) && ctx.Err() == nil; i++ {
		send(ids[i])
	}
})
```



## Order Preservation (Ordered Fan-In)
Concurrent processing can boost performance, but since tasks take different amounts of time to complete,
Expand All @@ -299,12 +316,12 @@ func main() {
// The string to search for in the downloaded files
needle := []byte("26")

// Start with a stream of numbers from 0 to 999
fileIDs := streamNumbers(ctx, 0, 1000)

// Generate a stream of URLs from http://example.com/file-0.txt to http://example.com/file-999.txt
urls := rill.OrderedMap(fileIDs, 1, func(id int) (string, error) {
return fmt.Sprintf("https://example.com/file-%d.txt", id), nil
// Generate a stream of URLs from https://example.com/file-0.txt
// to https://example.com/file-999.txt
urls := rill.Generate(func(send func(string), sendErr func(error)) {
for i := 0; i < 1000 && ctx.Err() == nil; i++ {
send(fmt.Sprintf("https://example.com/file-%d.txt", i))
}
})

// Download and process the files
Expand Down Expand Up @@ -335,22 +352,6 @@ func main() {
fmt.Println("Not found")
}
}

// streamNumbers is a helper that returns a stream yielding the integers in
// [start, end) in increasing order. The producing goroutine closes the stream
// once the range is exhausted or as soon as ctx is done, whichever comes first.
func streamNumbers(ctx context.Context, start, end int) <-chan rill.Try[int] {
	stream := make(chan rill.Try[int])

	produce := func() {
		defer close(stream)

		done := ctx.Done()
		for n := start; n < end; n++ {
			item := rill.Try[int]{Value: n}

			// Either hand the item to a reader or give up on cancellation.
			select {
			case stream <- item:
			case <-done:
				return
			}
		}
	}
	go produce()

	return stream
}
```


Expand Down Expand Up @@ -389,24 +390,21 @@ func main() {
}

// StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function.
// It iterates through all listing pages and returns a stream of users.
// It iterates through all listing pages and uses [Generate] to simplify sending users and errors to the resulting stream.
// This function is useful both on its own and as part of larger pipelines.
func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] {
res := make(chan rill.Try[*mockapi.User])

if query == nil {
query = &mockapi.UserQuery{}
}

go func() {
defer close(res)
return rill.Generate(func(send func(*mockapi.User), sendErr func(error)) {
var currentQuery mockapi.UserQuery
if query != nil {
currentQuery = *query
}

for page := 0; ; page++ {
query.Page = page
currentQuery.Page = page

users, err := mockapi.ListUsers(ctx, query)
users, err := mockapi.ListUsers(ctx, &currentQuery)
if err != nil {
res <- rill.Wrap[*mockapi.User](nil, err)
sendErr(err)
return
}

Expand All @@ -415,12 +413,10 @@ func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[
}

for _, user := range users {
res <- rill.Wrap(user, nil)
send(user)
}
}
}()

return res
})
}
```

Expand Down
94 changes: 54 additions & 40 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,12 @@ func Example_ordering() {
// The string to search for in the downloaded files
needle := []byte("26")

// Start with a stream of numbers from 0 to 999
fileIDs := streamNumbers(ctx, 0, 1000)

// Generate a stream of URLs from http://example.com/file-0.txt to http://example.com/file-999.txt
urls := rill.OrderedMap(fileIDs, 1, func(id int) (string, error) {
return fmt.Sprintf("https://example.com/file-%d.txt", id), nil
// Generate a stream of URLs from https://example.com/file-0.txt
// to https://example.com/file-999.txt
urls := rill.Generate(func(send func(string), sendErr func(error)) {
for i := 0; i < 1000 && ctx.Err() == nil; i++ {
send(fmt.Sprintf("https://example.com/file-%d.txt", i))
}
})

// Download and process the files
Expand Down Expand Up @@ -267,24 +267,21 @@ func Example_flatMap() {
}

// StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function.
// It iterates through all listing pages and returns a stream of users.
// It iterates through all listing pages and uses [Generate] to simplify sending users and errors to the resulting stream.
// This function is useful both on its own and as part of larger pipelines.
func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] {
res := make(chan rill.Try[*mockapi.User])

if query == nil {
query = &mockapi.UserQuery{}
}

go func() {
defer close(res)
return rill.Generate(func(send func(*mockapi.User), sendErr func(error)) {
var currentQuery mockapi.UserQuery
if query != nil {
currentQuery = *query
}

for page := 0; ; page++ {
query.Page = page
currentQuery.Page = page

users, err := mockapi.ListUsers(ctx, query)
users, err := mockapi.ListUsers(ctx, &currentQuery)
if err != nil {
res <- rill.Wrap[*mockapi.User](nil, err)
sendErr(err)
return
}

Expand All @@ -293,12 +290,10 @@ func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[
}

for _, user := range users {
res <- rill.Wrap(user, nil)
send(user)
}
}
}()

return res
})
}

// This example demonstrates how to gracefully stop a pipeline on the first error.
Expand All @@ -308,7 +303,7 @@ func Example_context() {
ctx := context.Background()

// ID 999 doesn't exist, so fetching will stop after hitting it.
err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10})
err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10, 11, 12, 13, 14, 15})
fmt.Printf("Check result: %v\n", err)
}

Expand All @@ -319,7 +314,15 @@ func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error {
defer cancel()

// Convert the slice into a stream
idsStream := rill.FromSlice(ids, nil)
// Use Generate instead of FromSlice to make the first stage context-aware
idsStream := rill.Generate(func(send func(int), sendErr func(error)) {
for _, id := range ids {
if ctx.Err() != nil {
return
}
send(id)
}
})

// Fetch users concurrently.
users := rill.Map(idsStream, concurrency, func(id int) (*mockapi.User, error) {
Expand Down Expand Up @@ -615,6 +618,33 @@ func ExampleForEach_ordered() {
fmt.Println("Error:", err)
}

// This example uses Generate to produce a stream of ten URLs,
// from https://example.com/file-0.txt to https://example.com/file-9.txt,
// and then prints the stream.
func ExampleGenerate() {
	const fileCount = 10

	urls := rill.Generate(func(send func(string), sendErr func(error)) {
		for id := 0; id < fileCount; id++ {
			url := fmt.Sprintf("https://example.com/file-%d.txt", id)
			send(url)
		}
	})

	printStream(urls)
}

// This example uses Generate to produce an infinite stream of natural numbers (1, 2, 3, ...).
// A new number is sent to the stream every 500ms until the context is canceled.
func ExampleGenerate_context() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	numbers := rill.Generate(func(send func(int), sendErr func(error)) {
		for i := 1; ctx.Err() == nil; i++ {
			send(i)

			// Wait for the next tick, but wake up immediately on cancellation
			// instead of sleeping through it: a plain time.Sleep would keep the
			// generator alive for up to 500ms after the context is done.
			select {
			case <-ctx.Done():
				return
			case <-time.After(500 * time.Millisecond):
			}
		}
	})

	printStream(numbers)
}

func ExampleMap() {
// Convert a slice of numbers into a stream
numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
Expand Down Expand Up @@ -746,22 +776,6 @@ func square(x int) int {
return x * x
}

// streamNumbers is a helper that returns a stream yielding the integers in
// [start, end) in increasing order. The producing goroutine closes the stream
// once the range is exhausted or as soon as ctx is done, whichever comes first.
func streamNumbers(ctx context.Context, start, end int) <-chan rill.Try[int] {
	stream := make(chan rill.Try[int])

	produce := func() {
		defer close(stream)

		done := ctx.Done()
		for n := start; n < end; n++ {
			item := rill.Try[int]{Value: n}

			// Either hand the item to a reader or give up on cancellation.
			select {
			case stream <- item:
			case <-done:
				return
			}
		}
	}
	go produce()

	return stream
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
fmt.Println("Result:")
Expand Down
Loading

0 comments on commit 3d0ba14

Please sign in to comment.