-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcarbo_test.go
110 lines (101 loc) · 2.4 KB
/
carbo_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package carbo_test
import (
"context"
"fmt"
"log"
"github.com/hiroara/carbo/flow"
"github.com/hiroara/carbo/pipe"
"github.com/hiroara/carbo/registry"
"github.com/hiroara/carbo/sink"
"github.com/hiroara/carbo/source"
"github.com/hiroara/carbo/task"
)
type MyConfig struct {
StringField string `yaml:"string_field"`
IntField int `yaml:"int_field"`
}
// Build a flow and directly run it.
func Example_flow() {
ss := source.FromSlice([]string{"a", "b", "c"})
ds := task.Connect(
ss.AsTask(),
pipe.Map(func(ctx context.Context, s string) (string, error) {
return s + s, nil
}).AsTask(),
1,
)
pr := task.Connect(
ds,
sink.ElementWise(func(ctx context.Context, s string) error {
fmt.Println(s)
return nil
}).AsTask(),
1,
)
err := flow.FromTask(pr).Run(context.Background())
if err != nil {
log.Fatal(err)
}
// Output:
// aa
// bb
// cc
}
// Define a flow factory function to build a flow with a config struct, and run the flow.
func Example_flowFactory() {
fac := func(cfg *MyConfig) (*flow.Flow, error) {
ss := source.FromSlice([]string{cfg.StringField})
pr := task.Connect(
ss.AsTask(),
sink.ElementWise(func(ctx context.Context, s string) error {
fmt.Println(s)
return nil
}).AsTask(),
1,
)
return flow.FromTask(pr), nil
}
err := flow.RunWithConfig(context.Background(), fac, "testdata/config.yaml")
if err != nil {
log.Fatal(err)
}
// Output:
// value-from-string-field
}
// Define multiple flow factories, register them to a registry, and run a flow.
// This is useful to make an executable that takes a subcommand.
func Example_registry() {
fac1 := func() (*flow.Flow, error) {
ss := source.FromSlice([]string{"item1"})
pr := task.Connect(
ss.AsTask(),
sink.ElementWise(func(ctx context.Context, s string) error {
fmt.Println(s)
return nil
}).AsTask(),
1,
)
return flow.FromTask(pr), nil
}
fac2 := func(cfg *MyConfig) (*flow.Flow, error) {
ss := source.FromSlice([]int{cfg.IntField})
pr := task.Connect(
ss.AsTask(),
sink.ElementWise(func(ctx context.Context, i int) error {
fmt.Println(i)
return nil
}).AsTask(),
1,
)
return flow.FromTask(pr), nil
}
r := registry.New()
r.Register("flow1", flow.NewFactory(fac1))
r.Register("flow2", flow.NewFactoryWithConfig(fac2, "testdata/config.yaml"))
err := r.Run(context.Background(), "flow2")
if err != nil {
log.Fatal(err)
}
// Output:
// 100
}