Skip to content

01 KisFlow QuickStart With Config

刘丹冰 edited this page Apr 16, 2024 · 2 revisions

Case Source Code: https://github.com/aceld/kis-flow-usage/tree/main/2-quick_start_with_config

Project

├── Makefile
├── conf
│   ├── flow-CalStuAvgScore.yml
│   ├── func-AvgStuScore.yml
│   └── func-PrintStuAvgScore.yml
├── faas_stu_score_avg.go
├── faas_stu_score_avg_print.go
└── main.go

Flow

Config

(1) Flow Config

conf/flow-CalStuAvgScore.yml

kistype: flow
status: 1
flow_name: CalStuAvgScore
flows:
    - fname: AvgStuScore
    - fname: PrintStuAvgScore

(2) Function1 Config

conf/func-AvgStuScore.yml

kistype: func
fname: AvgStuScore
fmode: Calculate
source:
    name: StudentScore
    must:
        - stu_id

(3) Function2(Slink) Config

conf/func-PrintStuAvgScore.yml

kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
    name: StudentScore
    must:
        - stu_id

Main

main.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
)

func main() {
	ctx := context.Background()

	// Load Configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	// Get the flow
	flow1 := kis.Pool().GetFlow("CalStuAvgScore")
	if flow1 == nil {
		panic("flow1 is nil")
	}

	// Submit a string
	_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
	// Submit a string
	_ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)

	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	return
}

Function1

faas_stu_score_avg.go

package main

import (
	"context"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn struct {
	serialize.DefaultSerialize
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

type AvgStuScoreOut struct {
	serialize.DefaultSerialize
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

// AvgStuScore(FaaS) calculates the average score of students
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
	for _, row := range rows {

		out := AvgStuScoreOut{
			StuId:    row.StuId,
			AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
		}

		// Submit the result data
		_ = flow.CommitRow(out)
	}

	return nil
}

Function2

faas_stu_score_avg_print.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
	serialize.DefaultSerialize
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

type PrintStuAvgScoreOut struct {
	serialize.DefaultSerialize
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {

	for _, row := range rows {
		fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
	}

	return nil
}

OutPut

Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Add FlowRouter FlowName=CalStuAvgScore
stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]