Skip to content

Commit

Permalink
fix: panic when running invalid concurrent pipe/sink
Browse files Browse the repository at this point in the history
  • Loading branch information
hiroara committed Aug 9, 2023
1 parent 6d9f15f commit 8cdf54b
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 36 deletions.
5 changes: 5 additions & 0 deletions internal/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package channel

import (
"context"
"fmt"
"reflect"

"github.com/hiroara/carbo/task"
)

func DuplicateOutChan[T any](out chan<- T, n int) ([]chan<- T, func(context.Context) error) {
if n <= 0 {
panic(fmt.Sprintf("argument n must be a positive value but received %d", n))
}

outs := make([]chan<- T, n)
cases := make([]reflect.SelectCase, n)
for i := range outs {
Expand Down
11 changes: 11 additions & 0 deletions internal/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,15 @@ func TestDuplicateOutChan(t *testing.T) {

require.Error(t, grp.Wait(), context.Canceled)
})

t.Run("ArgN=Zero", func(t *testing.T) {
t.Parallel()

out := make(chan string, 2)
assert.PanicsWithValue(
t,
"argument n must be a positive value but received 0",
func() { channel.DuplicateOutChan(out, 0) },
)
})
}
7 changes: 7 additions & 0 deletions pipe/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func (op *concurrency[S, T]) Concurrent(concurrency int, opts ...task.Option) Pi
// Create a Pipe from multiple Pipes.
// The passed Pipes will run concurrently, and those outputs will be merged as outputs of the created Pipe.
func Concurrent[S, T any](ps []Pipe[S, T], opts ...task.Option) Pipe[S, T] {
if len(ps) == 0 {
panic("at least 1 concurrent pipe is required")
}

return FromFn(func(ctx context.Context, in <-chan S, out chan<- T) error {
grp, ctx := errgroup.WithContext(ctx)
outs, agg := channel.DuplicateOutChan(out, len(ps))
Expand All @@ -39,6 +43,9 @@ func Concurrent[S, T any](ps []Pipe[S, T], opts ...task.Option) Pipe[S, T] {
// Create a Pipe to run the provided PipeFn concurrently.
// This is a shorthand to create a concurrent Pipe from Pipes with the same function.
func ConcurrentFromFn[S, T any](fn PipeFn[S, T], concurrency int, opts ...task.Option) Pipe[S, T] {
if concurrency < 0 {
concurrency = 0
}
ps := make([]Pipe[S, T], concurrency)
for i := range ps {
ps[i] = FromFn(fn, opts...)
Expand Down
78 changes: 57 additions & 21 deletions pipe/concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,27 @@ import (
"github.com/hiroara/carbo/pipe"
)

func TestConcurrentPipe(t *testing.T) {
t.Parallel()
func assertConcurrentPipe(t *testing.T, p pipe.Pipe[string, string]) {
in := make(chan string, 2)
out := make(chan string, 2)
in <- "item1"
in <- "item2"
close(in)

err := p.Run(context.Background(), in, out)
require.NoError(t, err)

assertConcurrentPipe := func(p pipe.Pipe[string, string]) {
in := make(chan string, 2)
out := make(chan string, 2)
in <- "item1"
in <- "item2"
close(in)

err := p.Run(context.Background(), in, out)
require.NoError(t, err)

outputs := make([]string, 0)
for item := range out {
outputs = append(outputs, item)
}
assert.ElementsMatch(t, []string{"item1item1", "item2item2"}, outputs)
outputs := make([]string, 0)
for item := range out {
outputs = append(outputs, item)
}
assert.ElementsMatch(t, []string{"item1item1", "item2item2"}, outputs)
}

func TestConcurrent(t *testing.T) {
t.Parallel()

t.Run("Concurrent", func(t *testing.T) {
t.Run("NormalCase", func(t *testing.T) {
t.Parallel()

pipeFn1, called1 := createPipeFn(double)
Expand All @@ -41,22 +41,58 @@ func TestConcurrentPipe(t *testing.T) {
pipe.FromFn(pipeFn2),
})

assertConcurrentPipe(p)
assertConcurrentPipe(t, p)
close(called1)
close(called2)

assert.Len(t, testutils.ReadItems(called1), 1)
assert.Len(t, testutils.ReadItems(called2), 1)
})

t.Run("ConcurrentFromFn", func(t *testing.T) {
t.Run("NoConcurrentPipesCase", func(t *testing.T) {
t.Parallel()

assert.PanicsWithValue(
t,
"at least 1 concurrent pipe is required",
func() { pipe.Concurrent([]pipe.Pipe[string, string]{}) },
)
})
}

func TestConcurrentFromFn(t *testing.T) {
t.Run("NormalCase", func(t *testing.T) {
t.Parallel()

fn, called := createPipeFn(double)
p := pipe.ConcurrentFromFn(fn, 2)

assertConcurrentPipe(p)
assertConcurrentPipe(t, p)
close(called)
assert.Len(t, testutils.ReadItems(called), 2)
})

t.Run("ZeroConcurrencyCase", func(t *testing.T) {
t.Parallel()

fn, _ := createPipeFn(double)

assert.PanicsWithValue(
t,
"at least 1 concurrent pipe is required",
func() { pipe.ConcurrentFromFn(fn, 0) },
)
})

t.Run("NegativeConcurrencyCase", func(t *testing.T) {
t.Parallel()

fn, _ := createPipeFn(double)

assert.PanicsWithValue(
t,
"at least 1 concurrent pipe is required",
func() { pipe.ConcurrentFromFn(fn, -1) },
)
})
}
7 changes: 7 additions & 0 deletions sink/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
// Create a Sink from multiple Sinks.
// The passed Sinks will run concurrently.
func Concurrent[S any](ss []Sink[S], opts ...task.Option) Sink[S] {
if len(ss) == 0 {
panic("at least 1 concurrent sink is required")
}

return FromFn(func(ctx context.Context, in <-chan S) error {
grp, ctx := errgroup.WithContext(ctx)
for _, s := range ss {
Expand All @@ -26,6 +30,9 @@ func Concurrent[S any](ss []Sink[S], opts ...task.Option) Sink[S] {
// Create a Sink to run the provided SinkFn concurrently.
// This is a shorthand to create a concurrent Sink from Sinks with the same function.
func ConcurrentFromFn[S any](fn SinkFn[S], concurrency int, opts ...task.Option) Sink[S] {
if concurrency < 0 {
concurrency = 0
}
ss := make([]Sink[S], concurrency)
for i := range ss {
ss[i] = FromFn(fn, opts...)
Expand Down
66 changes: 51 additions & 15 deletions sink/concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ import (
"github.com/hiroara/carbo/sink"
)

func TestConcurrentSink(t *testing.T) {
t.Parallel()

runSink := func(s sink.Sink[string]) error {
in := make(chan string, 4)
out := make(chan struct{})
in <- "item1"
in <- "item2"
in <- "item3"
in <- "item4"
close(in)
func runSink(s sink.Sink[string]) error {
in := make(chan string, 4)
out := make(chan struct{})
in <- "item1"
in <- "item2"
in <- "item3"
in <- "item4"
close(in)

return s.Run(context.Background(), in, out)
}

return s.Run(context.Background(), in, out)
}
func TestConcurrent(t *testing.T) {
t.Parallel()

t.Run("Concurrent", func(t *testing.T) {
t.Run("NormalCase", func(t *testing.T) {
t.Parallel()

sinkFn1, items1, called1 := createArraySink()
Expand All @@ -50,7 +50,19 @@ func TestConcurrentSink(t *testing.T) {
assert.ElementsMatch(t, []string{"item1", "item2", "item3", "item4"}, items)
})

t.Run("ConcurrentFromFn", func(t *testing.T) {
t.Run("NoConcurrentSinksCase", func(t *testing.T) {
t.Parallel()

assert.PanicsWithValue(
t,
"at least 1 concurrent sink is required",
func() { sink.Concurrent([]sink.Sink[string]{}) },
)
})
}

func TestConcurrentFromFn(t *testing.T) {
t.Run("NormalCase", func(t *testing.T) {
t.Parallel()

sinkFn, items, called := createArraySink()
Expand All @@ -65,4 +77,28 @@ func TestConcurrentSink(t *testing.T) {
assert.Len(t, testutils.ReadItems(called), 2)
assert.ElementsMatch(t, []string{"item1", "item2", "item3", "item4"}, testutils.ReadItems(items))
})

t.Run("ZeroConcurrencyCase", func(t *testing.T) {
t.Parallel()

fn, _, _ := createArraySink()

assert.PanicsWithValue(
t,
"at least 1 concurrent sink is required",
func() { sink.ConcurrentFromFn(fn, 0) },
)
})

t.Run("NegativeConcurrencyCase", func(t *testing.T) {
t.Parallel()

fn, _, _ := createArraySink()

assert.PanicsWithValue(
t,
"at least 1 concurrent sink is required",
func() { sink.ConcurrentFromFn(fn, -1) },
)
})
}

0 comments on commit 8cdf54b

Please sign in to comment.