Skip to content

Commit

Permalink
Refactor with task.Feed
Browse files Browse the repository at this point in the history
To avoid getting stuck when an error occurs in a downstream
  • Loading branch information
hiroara committed Jul 26, 2023
1 parent 3ed9f01 commit 0b1f13f
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 7 deletions.
3 changes: 1 addition & 2 deletions pipe/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ func FanoutWithMap[S, I, T any](mapFn FanoutMapFn[I, T]) *FanoutOp[S, I, T] {
if err != nil {
return err
}
out <- o
return nil
return task.Feed(ctx, out, o)
}
return &FanoutOp[S, I, T]{aggregate: aggFn}
}
Expand Down
4 changes: 3 additions & 1 deletion pipe/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func (op *MapOp[S, T]) pipeFn() PipeFn[S, T] {
if err != nil {
return err
}
out <- mapped
if err := task.Feed(ctx, out, mapped); err != nil {
return err
}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pipe/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestMap(t *testing.T) {
out := make([]string, 0)
sin := sink.ToSlice(&out)

mapped := task.Connect(src.AsTask(), mappingTask, 2)
mapped := task.Connect(src.AsTask(), mappingTask, 0)
toSlice := task.Connect(mapped, sin.AsTask(), 2)

err := flow.FromTask(toSlice).Run(context.Background())
Expand Down
6 changes: 4 additions & 2 deletions source/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ func (op *PullOp[T]) AsSource() Source[T] {
for _, msg := range resp.Messages {
el, err := op.marshalSpec.Unmarshal(msg.Value)
if err != nil {
return nil
return err
}
if err := task.Feed(ctx, out, el); err != nil {
return err
}
out <- el
}
}
return nil
Expand Down
4 changes: 3 additions & 1 deletion source/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ func FromSlice[T any](items []T) *SliceSourceOp[T] {
return &SliceSourceOp[T]{
run: func(ctx context.Context, out chan<- T) error {
for _, item := range items {
out <- item
if err := task.Feed(ctx, out, item); err != nil {
return err
}
}
return nil
},
Expand Down
12 changes: 12 additions & 0 deletions task/feed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package task

import "context"

func Feed[T any](ctx context.Context, out chan<- T, el T) error {
select {
case out <- el:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

0 comments on commit 0b1f13f

Please sign in to comment.