diff --git a/pipe/concurrency.go b/pipe/concurrency.go index f09444f..caf851c 100644 --- a/pipe/concurrency.go +++ b/pipe/concurrency.go @@ -14,12 +14,18 @@ type concurrency[S, T any] struct { } // Create a concurrent Pipe from multiple operators that have the same behavior. +// +// Each input is processed whenever it is possible. +// For this reason, the concurrent Pipe doesn't preserve the order. func (op *concurrency[S, T]) Concurrent(concurrency int, opts ...task.Option) Pipe[S, T] { return ConcurrentFromFn(op.run, concurrency, opts...) } // Create a Pipe from multiple Pipes. // The passed Pipes will run concurrently, and those outputs will be merged as outputs of the created Pipe. +// +// Each input is processed whenever it is possible. +// For this reason, the concurrent Pipe doesn't preserve the order. func Concurrent[S, T any](ps []Pipe[S, T], opts ...task.Option) Pipe[S, T] { if len(ps) == 0 { panic("at least 1 concurrent pipe is required") diff --git a/pipe/map.go b/pipe/map.go index cd81f92..a81fb0e 100644 --- a/pipe/map.go +++ b/pipe/map.go @@ -2,6 +2,9 @@ package pipe import ( "context" + "fmt" + + "golang.org/x/sync/errgroup" "github.com/hiroara/carbo/cache" "github.com/hiroara/carbo/task" @@ -45,3 +48,82 @@ func MapWithCache[S, T, K, V any](fn MapFn[S, T], sp cache.Spec[S, T, K, V]) *Ma return cache.Run(ctx, sp, el, cache.CacheableFn[S, T](fn)) }) } + +// Create a concurrent Pipe to apply the map operator. +// +// Unlike a Pipe created with Concurrent, a concurrent Pipe created with this ConcurrentPreservingOrder, +// preserves the order of elements. +func (op *MapOp[S, T]) ConcurrentPreservingOrder(concurrency int, opts ...task.Option) Pipe[S, T] { + if concurrency <= 0 { + panic("at least 1 concurrency is required") + } + + return FromFn(func(ctx context.Context, in <-chan S, out chan<- T) error { + grp, ctx := errgroup.WithContext(ctx) + ins, outs, agg := duplicateOutChanPreservingOrder(in, out, concurrency) + for idx := 0; idx < concurrency; idx++ { + i := ins[idx] + o := outs[idx] + grp.Go(func() error { + defer close(o) + return op.run(ctx, i, o) + }) + } + grp.Go(func() error { return agg(ctx) }) + return grp.Wait() + }, opts...) +} + +func duplicateOutChanPreservingOrder[S, T any]( + in <-chan S, out chan<- T, n int, +) ([]<-chan S, []chan<- T, func(context.Context) error) { + if n <= 0 { + panic(fmt.Sprintf("argument n must be a positive value but received %d", n)) + } + + ins := make([]chan S, n) + outs := make([]chan T, n) + insRet := make([]<-chan S, n) + outsRet := make([]chan<- T, n) + for idx := 0; idx < n; idx++ { + i := make(chan S) + o := make(chan T) + ins[idx] = i + insRet[idx] = i + outs[idx] = o + outsRet[idx] = o + } + return insRet, outsRet, func(ctx context.Context) error { + grp, ctx := errgroup.WithContext(ctx) + + grp.Go(func() error { + defer func() { + for _, i := range ins { + close(i) + } + }() + idx := 0 + for el := range in { + ins[idx] <- el + idx = (idx + 1) % n + } + return nil + }) + + grp.Go(func() error { + for { + for _, o := range outs { + el, ok := <-o + if !ok { + return nil + } + if err := task.Emit(ctx, out, el); err != nil { + return err + } + } + } + }) + + return grp.Wait() + } +} diff --git a/pipe/map_test.go b/pipe/map_test.go index b05de24..6f17533 100644 --- a/pipe/map_test.go +++ b/pipe/map_test.go @@ -10,17 +10,15 @@ import ( "github.com/hiroara/carbo/cache" "github.com/hiroara/carbo/cache/store" - "github.com/hiroara/carbo/flow" "github.com/hiroara/carbo/pipe" - "github.com/hiroara/carbo/sink" - "github.com/hiroara/carbo/source" "github.com/hiroara/carbo/task" + "github.com/hiroara/carbo/taskfn" ) func TestMap(t *testing.T) { t.Parallel() - src := source.FromSlice([]string{"item1", "item2", "item2"}) + els := []string{"item1", "item2", "item2"} fn := func(ctx context.Context, s string) (string, error) { return s + s, nil @@ -28,18 +26,8 @@ func TestMap(t *testing.T) { m := pipe.Map(fn) runFlowWithMap := func(mappingTask task.Task[string, string]) ([]string, error) { - out := make([]string, 0) - sin := sink.ToSlice(&out) - - mapped := task.Connect(src.AsTask(), mappingTask, 0) - toSlice := task.Connect(mapped, sin.AsTask(), 2) - - err := flow.FromTask(toSlice).Run(context.Background()) - if err != nil { - return nil, err - } - - return out, nil + tfn := taskfn.SliceToSlice(mappingTask) + return tfn(context.Background(), els) } t.Run("ErrorCase", func(t *testing.T) { @@ -70,6 +58,23 @@ func TestMap(t *testing.T) { assert.ElementsMatch(t, []string{"item1item1", "item2item2", "item2item2"}, out) }) + t.Run("ConcurrentPreservingOrder", func(t *testing.T) { + t.Parallel() + + m := pipe.Map(fn) + tfn := taskfn.SliceToSlice(m.ConcurrentPreservingOrder(2).AsTask()) + els := []string{"item1", "item2", "item3", "item4", "item5", "item6", "item7", "item8", "item9", "item10"} + out, err := tfn(context.Background(), els) + require.NoError(t, err) + + expected := make([]string, len(els)) + for i, el := range els { + expected[i] = el + el + } + + assert.Equal(t, expected, out) + }) + t.Run("Cache", func(t *testing.T) { t.Parallel()