Skip to content

Commit

Permalink
feat: add *MapOp.ConcurrentPreservingOrder
Browse files Browse the repository at this point in the history
  • Loading branch information
hiroara committed Oct 1, 2023
1 parent 66bf3a8 commit 3f0532c
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 16 deletions.
6 changes: 6 additions & 0 deletions pipe/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ type concurrency[S, T any] struct {
}

// Concurrent builds a concurrent Pipe out of multiple operators that all share
// this operator's behavior.
//
// The receiver's run function, the requested concurrency and the given task
// options are handed to ConcurrentFromFn, which constructs the Pipe.
//
// Inputs are processed as soon as it is possible to do so, which means the
// resulting Pipe does not preserve the order of its elements.
func (op *concurrency[S, T]) Concurrent(concurrency int, opts ...task.Option) Pipe[S, T] {
	concurrentPipe := ConcurrentFromFn(op.run, concurrency, opts...)
	return concurrentPipe
}

// Create a Pipe from multiple Pipes.
// The passed Pipes will run concurrently, and their outputs will be merged into the output of the created Pipe.
//
// Each input is processed whenever it is possible.
// For this reason, the concurrent Pipe doesn't preserve the order.
func Concurrent[S, T any](ps []Pipe[S, T], opts ...task.Option) Pipe[S, T] {
if len(ps) == 0 {
panic("at least 1 concurrent pipe is required")
Expand Down
82 changes: 82 additions & 0 deletions pipe/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package pipe

import (
"context"
"fmt"

"golang.org/x/sync/errgroup"

"github.com/hiroara/carbo/cache"
"github.com/hiroara/carbo/task"
Expand Down Expand Up @@ -45,3 +48,82 @@ func MapWithCache[S, T, K, V any](fn MapFn[S, T], sp cache.Spec[S, T, K, V]) *Ma
return cache.Run(ctx, sp, el, cache.CacheableFn[S, T](fn))
})
}

// ConcurrentPreservingOrder creates a concurrent Pipe that applies the map
// operator with the given number of workers.
//
// Unlike a Pipe created with Concurrent, the Pipe created here preserves the
// order of elements: outputs are emitted in the same order as their inputs.
//
// It panics when concurrency is not a positive number.
func (op *MapOp[S, T]) ConcurrentPreservingOrder(concurrency int, opts ...task.Option) Pipe[S, T] {
	if concurrency < 1 {
		panic("at least 1 concurrency is required")
	}

	// run wires the workers and the ordering aggregator together under a
	// single errgroup, so that the first failure cancels all the others.
	run := func(ctx context.Context, in <-chan S, out chan<- T) error {
		workers, ctx := errgroup.WithContext(ctx)
		workerIns, workerOuts, aggregate := duplicateOutChanPreservingOrder(in, out, concurrency)

		for n := range workerIns {
			// Bind per-iteration copies so each goroutine uses its own channel pair.
			src, dst := workerIns[n], workerOuts[n]
			workers.Go(func() error {
				// The worker owns its output channel and closes it once it is done,
				// which lets the aggregator notice that no more results will come.
				defer close(dst)
				return op.run(ctx, src, dst)
			})
		}

		workers.Go(func() error {
			return aggregate(ctx)
		})

		return workers.Wait()
	}

	return FromFn(run, opts...)
}

// duplicateOutChanPreservingOrder splits one input/output channel pair into n
// per-worker channel pairs in a way that keeps the output order equal to the
// input order.
//
// It returns, in this order:
//   - n receive-only input channels, one for each worker,
//   - n send-only output channels, one for each worker,
//   - an aggregator function that must run concurrently with the workers.
//
// The aggregator runs two goroutines. The distributor reads elements from in
// and hands them to the worker inputs in round-robin order. The collector
// takes one element from each worker output in the same round-robin order and
// emits it to out. The ordering therefore relies on every worker producing
// exactly one output per input it receives.
//
// The worker input channels are closed by the aggregator when the distributor
// stops. The worker output channels are never closed here; the collector stops
// as soon as it finds one of them closed, so closing them is left to the caller.
//
// It panics when n is not a positive value.
func duplicateOutChanPreservingOrder[S, T any](
	in <-chan S, out chan<- T, n int,
) ([]<-chan S, []chan<- T, func(context.Context) error) {
	if n <= 0 {
		panic(fmt.Sprintf("argument n must be a positive value but received %d", n))
	}

	// Keep bidirectional handles for internal use and hand out directional
	// views so that callers can only receive from inputs and send to outputs.
	ins := make([]chan S, n)
	outs := make([]chan T, n)
	insRet := make([]<-chan S, n)
	outsRet := make([]chan<- T, n)
	for idx := 0; idx < n; idx++ {
		i := make(chan S)
		o := make(chan T)
		ins[idx] = i
		insRet[idx] = i
		outs[idx] = o
		outsRet[idx] = o
	}

	return insRet, outsRet, func(ctx context.Context) error {
		grp, ctx := errgroup.WithContext(ctx)

		// Distributor: feed the workers in round-robin order.
		grp.Go(func() error {
			// Closing every worker input signals the workers that no more
			// elements will arrive, on both the success and the failure path.
			defer func() {
				for _, i := range ins {
					close(i)
				}
			}()
			idx := 0
			for el := range in {
				// A worker that has already failed never receives again, so a bare
				// send would block forever and the whole group would hang. Give up
				// as soon as the context is cancelled instead.
				select {
				case ins[idx] <- el:
				case <-ctx.Done():
					return ctx.Err()
				}
				idx = (idx + 1) % n
			}
			return nil
		})

		// Collector: drain the workers in the same round-robin order.
		grp.Go(func() error {
			for {
				for _, o := range outs {
					el, ok := <-o
					if !ok {
						// The worker whose turn it is has closed its output, so no
						// further element can be emitted in order.
						return nil
					}
					if err := task.Emit(ctx, out, el); err != nil {
						return err
					}
				}
			}
		})

		return grp.Wait()
	}
}
37 changes: 21 additions & 16 deletions pipe/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,24 @@ import (

"github.com/hiroara/carbo/cache"
"github.com/hiroara/carbo/cache/store"
"github.com/hiroara/carbo/flow"
"github.com/hiroara/carbo/pipe"
"github.com/hiroara/carbo/sink"
"github.com/hiroara/carbo/source"
"github.com/hiroara/carbo/task"
"github.com/hiroara/carbo/taskfn"
)

func TestMap(t *testing.T) {
t.Parallel()

src := source.FromSlice([]string{"item1", "item2", "item2"})
els := []string{"item1", "item2", "item2"}

fn := func(ctx context.Context, s string) (string, error) {
return s + s, nil
}
m := pipe.Map(fn)

runFlowWithMap := func(mappingTask task.Task[string, string]) ([]string, error) {
out := make([]string, 0)
sin := sink.ToSlice(&out)

mapped := task.Connect(src.AsTask(), mappingTask, 0)
toSlice := task.Connect(mapped, sin.AsTask(), 2)

err := flow.FromTask(toSlice).Run(context.Background())
if err != nil {
return nil, err
}

return out, nil
tfn := taskfn.SliceToSlice(mappingTask)
return tfn(context.Background(), els)
}

t.Run("ErrorCase", func(t *testing.T) {
Expand Down Expand Up @@ -70,6 +58,23 @@ func TestMap(t *testing.T) {
assert.ElementsMatch(t, []string{"item1item1", "item2item2", "item2item2"}, out)
})

t.Run("ConcurrentPreservingOrder", func(t *testing.T) {
t.Parallel()

m := pipe.Map(fn)
tfn := taskfn.SliceToSlice(m.ConcurrentPreservingOrder(2).AsTask())
els := []string{"item1", "item2", "item3", "item4", "item5", "item6", "item7", "item8", "item9", "item10"}
out, err := tfn(context.Background(), els)
require.NoError(t, err)

expected := make([]string, len(els))
for i, el := range els {
expected[i] = el + el
}

assert.Equal(t, expected, out)
})

t.Run("Cache", func(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 3f0532c

Please sign in to comment.