11 KisFlow Prometheus Metrics

刘丹冰 edited this page Apr 17, 2024 · 1 revision

Case source code: https://github.com/aceld/kis-flow-usage/tree/main/10-metrics

11.1 Global Config

You can configure the Prometheus metrics service for KisFlow in a configuration file whose kistype is global, as follows:

conf/kis-flow.yml

# kistype Global is the global configuration for KisFlow
kistype: global
# Whether to enable Prometheus monitoring
prometheus_enable: true
# Whether KisFlow needs to start a separate port for listening
prometheus_listen: true
# Prometheus scraping address
prometheus_serve: 0.0.0.0:20004

  • kistype: global marks this file as the global configuration for KisFlow.
  • prometheus_enable: Whether to enable Prometheus metrics collection. If set to true, metrics are collected while KisFlow runs; otherwise, no metrics are recorded.
  • prometheus_listen: Whether KisFlow starts its own HTTP service, on a separate port, for Prometheus to scrape.
  • prometheus_serve: Address and port on which that metrics HTTP service listens.
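
To make the roles of the three keys concrete, here is a minimal sketch in Go that reads them from flat "key: value" text. It is not KisFlow's own loader (that is file.ConfigImportYaml, used later on this page). The MetricsConfig type, ParseMetricsConfig, and ServesHTTP are hypothetical names introduced only for illustration, and the rule that the HTTP service runs only when collection is also enabled is an assumption rather than documented behaviour.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// MetricsConfig mirrors the three Prometheus keys from conf/kis-flow.yml.
// The type and its field names are hypothetical, not part of KisFlow.
type MetricsConfig struct {
	Enable bool   // prometheus_enable
	Listen bool   // prometheus_listen
	Serve  string // prometheus_serve
}

// ParseMetricsConfig reads flat "key: value" lines, skipping blank lines and
// "#" comments. It handles only this simple layout, not general YAML.
func ParseMetricsConfig(text string) (MetricsConfig, error) {
	var cfg MetricsConfig
	sc := bufio.NewScanner(strings.NewReader(text))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		// Cut at the first colon so "0.0.0.0:20004" stays intact in the value.
		key, value, ok := strings.Cut(line, ":")
		if !ok {
			return cfg, fmt.Errorf("malformed line: %q", line)
		}
		value = strings.TrimSpace(value)
		switch strings.TrimSpace(key) {
		case "prometheus_enable":
			cfg.Enable = value == "true"
		case "prometheus_listen":
			cfg.Listen = value == "true"
		case "prometheus_serve":
			cfg.Serve = value
		}
	}
	return cfg, sc.Err()
}

// ServesHTTP reports whether a scrape endpoint would be started.
// Assumption: listening is only useful when collection is enabled too.
func (c MetricsConfig) ServesHTTP() bool {
	return c.Enable && c.Listen
}

func main() {
	const sample = `# kistype Global is the global configuration for KisFlow
kistype: global
prometheus_enable: true
prometheus_listen: true
prometheus_serve: 0.0.0.0:20004`

	cfg, err := ParseMetricsConfig(sample)
	if err != nil {
		panic(err)
	}
	fmt.Printf("collect=%v serve=%v addr=%s\n", cfg.Enable, cfg.ServesHTTP(), cfg.Serve)
}
```

Setting prometheus_listen to false in the sample leaves collection on but makes ServesHTTP return false. Under the stated assumption, that corresponds to an application that collects metrics without opening a dedicated port.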

11.2 Metrics

KisFlow currently exposes the following metrics, identified by these constants (the GANGE_ prefix, spelled as in the listing, marks the gauge metrics):

const (
	METRICS_ROUTE string = "/metrics"

	LABEL_FLOW_NAME     string = "flow_name"
	LABEL_FLOW_ID       string = "flow_id"
	LABEL_FUNCTION_NAME string = "func_name"
	LABEL_FUNCTION_MODE string = "func_mode"

	COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
	COUNTER_KISFLOW_DATA_TOTAL_HELP string = "Total data volume for all KisFlow flows"

	GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
	GANGE_FLOW_DATA_TOTAL_HELP string = "Total data quantity of data streams for each KisFlow FlowID"

	GANGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
	GANGE_FLOW_SCHE_CNTS_HELP string = "Number of times each KisFlow FlowID has been scheduled"

	GANGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts"
	GANGE_FUNC_SCHE_CNTS_HELP string = "Number of times each KisFlow Function has been scheduled"

	HISTOGRAM_FUNCTION_DURATION_NAME string = "func_run_duration"
	HISTOGRAM_FUNCTION_DURATION_HELP string = "Duration of Function execution"

	HISTOGRAM_FLOW_DURATION_NAME string = "flow_run_duration"
	HISTOGRAM_FLOW_DURATION_HELP string = "Duration of Flow execution"
)
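
The constants above cover three Prometheus metric kinds: a counter, gauges, and histograms. The histogram is the least obvious of the three, because each bucket counts every observation less than or equal to its upper bound ("le"), so the counts are cumulative. The following is a minimal sketch of that behaviour using only the Go standard library. The Histogram type here is a toy model written for illustration, not the Prometheus client library and not KisFlow's implementation, and the bucket bounds are arbitrary.

```go
package main

import (
	"fmt"
	"strconv"
)

// Histogram is a toy cumulative histogram in the Prometheus style.
// It is an illustrative model only, not the real client library type.
type Histogram struct {
	Bounds []float64 // upper bounds ("le"), ascending; +Inf is implicit
	Counts []uint64  // Counts[i] = number of observations <= Bounds[i]
	Sum    float64   // sum of all observed values
	Count  uint64    // total number of observations (the +Inf bucket)
}

// NewHistogram creates a histogram with the given ascending upper bounds.
func NewHistogram(bounds ...float64) *Histogram {
	return &Histogram{Bounds: bounds, Counts: make([]uint64, len(bounds))}
}

// Observe records one value. Every bucket whose bound is >= v is
// incremented, which is what makes the bucket counts cumulative.
func (h *Histogram) Observe(v float64) {
	for i, b := range h.Bounds {
		if v <= b {
			h.Counts[i]++
		}
	}
	h.Sum += v
	h.Count++
}

func main() {
	h := NewHistogram(0.1, 0.5, 1)
	for _, v := range []float64{0.05, 0.3, 0.4, 2} {
		h.Observe(v)
	}

	// Print the samples in the same shape as a /metrics response.
	for i, b := range h.Bounds {
		le := strconv.FormatFloat(b, 'g', -1, 64)
		fmt.Printf("func_run_duration_bucket{le=%q} %d\n", le, h.Counts[i])
	}
	fmt.Printf("func_run_duration_bucket{le=\"+Inf\"} %d\n", h.Count)
	fmt.Printf("func_run_duration_sum %g\n", h.Sum)
	fmt.Printf("func_run_duration_count %d\n", h.Count)
}
```

A counter, by contrast, is a single number that only ever increases (kisflow_data_total), while a gauge is a single number that may be set to any value.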

11.3 Starting a Flow

The following example commits one row of data per second and runs the flow each time, so that the metrics are collected and updated continuously:

main.go

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
)

func main() {
	ctx := context.Background()

	// Load Configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	// Get the flow
	flow1 := kis.Pool().GetFlow("CalStuAvgScore")
	if flow1 == nil {
		panic("flow1 is nil")
	}

	stuId := 100

	for {
		// make 1 row data
		dataStr := fmt.Sprintf(`{"stu_id":%d, "score_1":100, "score_2":90, "score_3":80}`, stuId)

		// Submit a string
		_ = flow1.CommitRow(dataStr)

		// Run the flow
		if err := flow1.Run(ctx); err != nil {
			fmt.Println("err: ", err)
		}

		stuId++
		time.Sleep(1 * time.Second)
	}
}

func init() {
	// Register functions; their implementations live in other files of this package (not shown)
	kis.Pool().FaaS("VerifyStu", VerifyStu)
	kis.Pool().FaaS("AvgStuScore", AvgStuScore)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

11.4 Viewing the Prometheus Metrics Service

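With the KisFlow application above running, open another terminal and run the following command (if your system does not accept 0.0.0.0 as a destination address, use 127.0.0.1 instead):

$ curl http://0.0.0.0:20004/metrics

The output will look similar to the following; the exact values depend on how long the flow has been running: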

# HELP flow_data_total Total data quantity of data streams for each KisFlow FlowID
# TYPE flow_data_total gauge
flow_data_total{flow_name="CalStuAvgScore"} 89
# HELP flow_run_duration Duration of Flow execution
# TYPE flow_run_duration histogram
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="0.005"} 0
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="0.01"} 0
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="0.03"} 0
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="0.08"} 0
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="0.1"} 0
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="0.5"} 88
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="1"} 89
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="5"} 89
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="10"} 89
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="100"} 89
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="1000"} 89
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="5000"} 89
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="30000"} 89
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="60000"} 89
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="+Inf"} 89
flow_run_duration_sum{flow_name="CalStuAvgScore"} 19.597656
flow_run_duration_count{flow_name="CalStuAvgScore"} 89
# HELP flow_schedule_cnts Number of times each KisFlow FlowID has been scheduled
# TYPE flow_schedule_cnts gauge
flow_schedule_cnts{flow_name="CalStuAvgScore"} 89
# HELP func_run_duration Duration of Function execution
# TYPE func_run_duration histogram
func_run_duration_bucket{func_mode="Calculate",func_name="AvgStuScore",le="0.005"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="AvgStuScore",le="0.01"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="AvgStuScore",le="0.03"} 1
func_run_duration_bucket{func_mode="Calculate",func_name="AvgStuScore",le="0.08"} 82
func_run_duration_bucket{func_mode="Calculate",func_name="AvgStuScore",le="0.1"} 83
func_run_duration_bucket{func_mode="Calculate",func_name="AvgStuScore",le="0.5"} 89
func_run_duration_bucket{func_mode="Calculate",func_name="AvgStuScore",le="1"} 89
func_run_duration_bucket{func_mode="Calculate",func_name="AvgStuScore",le="5"} 89
func_run_duration_bucket{func_mode="Calculate",func_name="AvgStuScore",le="10"} 89
func_run_duration_bucket{func_mode="Calculate",func_name="AvgStuScore",le="100"} 89
func_run_duration_bucket{func_mode="Calculate",func_name="AvgStuScore",le="1000"} 89
func_run_duration_bucket{func_mode="Calculate",func_name="AvgStuScore",le="5000"} 89
func_run_duration_bucket{func_mode="Calculate",func_name="AvgStuScore",le="30000"} 89
func_run_duration_bucket{func_mode="Calculate",func_name="AvgStuScore",le="+Inf"} 89
func_run_duration_sum{func_mode="Calculate",func_name="AvgStuScore"} 4.499301999999999
func_run_duration_count{func_mode="Calculate",func_name="AvgStuScore"} 89
func_run_duration_bucket{func_mode="Expand",func_name="PrintStuAvgScore",le="0.005"} 0
func_run_duration_bucket{func_mode="Expand",func_name="PrintStuAvgScore",le="0.01"} 0
func_run_duration_bucket{func_mode="Expand",func_name="PrintStuAvgScore",le="0.03"} 8
func_run_duration_bucket{func_mode="Expand",func_name="PrintStuAvgScore",le="0.08"} 83
func_run_duration_bucket{func_mode="Expand",func_name="PrintStuAvgScore",le="0.1"} 85
func_run_duration_bucket{func_mode="Expand",func_name="PrintStuAvgScore",le="0.5"} 89
func_run_duration_bucket{func_mode="Expand",func_name="PrintStuAvgScore",le="1"} 89
func_run_duration_bucket{func_mode="Expand",func_name="PrintStuAvgScore",le="5"} 89
func_run_duration_bucket{func_mode="Expand",func_name="PrintStuAvgScore",le="10"} 89
func_run_duration_bucket{func_mode="Expand",func_name="PrintStuAvgScore",le="100"} 89
func_run_duration_bucket{func_mode="Expand",func_name="PrintStuAvgScore",le="1000"} 89
func_run_duration_bucket{func_mode="Expand",func_name="PrintStuAvgScore",le="5000"} 89
func_run_duration_bucket{func_mode="Expand",func_name="PrintStuAvgScore",le="30000"} 89
func_run_duration_bucket{func_mode="Expand",func_name="PrintStuAvgScore",le="+Inf"} 89
func_run_duration_sum{func_mode="Expand",func_name="PrintStuAvgScore"} 4.070979
func_run_duration_count{func_mode="Expand",func_name="PrintStuAvgScore"} 89
func_run_duration_bucket{func_mode="Verify",func_name="VerifyStu",le="0.005"} 0
func_run_duration_bucket{func_mode="Verify",func_name="VerifyStu",le="0.01"} 0
func_run_duration_bucket{func_mode="Verify",func_name="VerifyStu",le="0.03"} 0
func_run_duration_bucket{func_mode="Verify",func_name="VerifyStu",le="0.08"} 26
func_run_duration_bucket{func_mode="Verify",func_name="VerifyStu",le="0.1"} 54
func_run_duration_bucket{func_mode="Verify",func_name="VerifyStu",le="0.5"} 89
func_run_duration_bucket{func_mode="Verify",func_name="VerifyStu",le="1"} 89
func_run_duration_bucket{func_mode="Verify",func_name="VerifyStu",le="5"} 89
func_run_duration_bucket{func_mode="Verify",func_name="VerifyStu",le="10"} 89
func_run_duration_bucket{func_mode="Verify",func_name="VerifyStu",le="100"} 89
func_run_duration_bucket{func_mode="Verify",func_name="VerifyStu",le="1000"} 89
func_run_duration_bucket{func_mode="Verify",func_name="VerifyStu",le="5000"} 89
func_run_duration_bucket{func_mode="Verify",func_name="VerifyStu",le="30000"} 89
func_run_duration_bucket{func_mode="Verify",func_name="VerifyStu",le="+Inf"} 89
func_run_duration_sum{func_mode="Verify",func_name="VerifyStu"} 8.789971000000003
func_run_duration_count{func_mode="Verify",func_name="VerifyStu"} 89
# HELP func_schedule_cnts Number of times each KisFlow Function has been scheduled
# TYPE func_schedule_cnts gauge
func_schedule_cnts{func_mode="Calculate",func_name="AvgStuScore"} 89
func_schedule_cnts{func_mode="Expand",func_name="PrintStuAvgScore"} 89
func_schedule_cnts{func_mode="Verify",func_name="VerifyStu"} 89
# HELP kisflow_data_total Total data volume for all KisFlow flows
# TYPE kisflow_data_total counter
kisflow_data_total 89
# HELP promhttp_metric_handler_requests_in_flight Current number of scrapes being served.
# TYPE promhttp_metric_handler_requests_in_flight gauge
promhttp_metric_handler_requests_in_flight 1
# HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code.
# TYPE promhttp_metric_handler_requests_total counter
promhttp_metric_handler_requests_total{code="200"} 1
promhttp_metric_handler_requests_total{code="500"} 0
promhttp_metric_handler_requests_total{code="503"} 0
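
To show how these samples can be read programmatically, here is a minimal sketch in Go that parses a few lines of the output above and derives two figures from the flow histogram: the mean run duration (sum divided by count) and the number of runs that fell between two adjacent buckets. ParseSamples and Mean are hypothetical helper names. The parser assumes each sample line ends with its value and carries no trailing timestamp, which holds for the output shown here but not for every Prometheus exposition.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// ParseSamples maps each series (metric name plus label set, exactly as
// printed) to its value. "# HELP" and "# TYPE" lines are skipped.
// Simplification: the value is taken to be the last space-separated field.
func ParseSamples(text string) (map[string]float64, error) {
	samples := make(map[string]float64)
	sc := bufio.NewScanner(strings.NewReader(text))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		i := strings.LastIndex(line, " ")
		if i < 0 {
			return nil, fmt.Errorf("malformed sample: %q", line)
		}
		v, err := strconv.ParseFloat(line[i+1:], 64)
		if err != nil {
			return nil, fmt.Errorf("bad value in %q: %w", line, err)
		}
		samples[line[:i]] = v
	}
	return samples, sc.Err()
}

// Mean returns <metric>_sum / <metric>_count for one label set.
// The labels string must match the printed form, e.g. `flow_name="X"`.
func Mean(samples map[string]float64, metric, labels string) (float64, bool) {
	sum, okSum := samples[metric+"_sum{"+labels+"}"]
	count, okCount := samples[metric+"_count{"+labels+"}"]
	if !okSum || !okCount || count == 0 {
		return 0, false
	}
	return sum / count, true
}

func main() {
	// A few lines copied from the output above.
	const scrape = `# HELP flow_run_duration Duration of Flow execution
# TYPE flow_run_duration histogram
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="0.5"} 88
flow_run_duration_bucket{flow_name="CalStuAvgScore",le="1"} 89
flow_run_duration_sum{flow_name="CalStuAvgScore"} 19.597656
flow_run_duration_count{flow_name="CalStuAvgScore"} 89`

	samples, err := ParseSamples(scrape)
	if err != nil {
		panic(err)
	}

	if mean, ok := Mean(samples, "flow_run_duration", `flow_name="CalStuAvgScore"`); ok {
		fmt.Printf("mean flow duration: %.4f\n", mean)
	}

	// Buckets are cumulative, so subtracting neighbours gives the number
	// of runs that landed strictly between the two bounds.
	upper := samples[`flow_run_duration_bucket{flow_name="CalStuAvgScore",le="1"}`]
	lower := samples[`flow_run_duration_bucket{flow_name="CalStuAvgScore",le="0.5"}`]
	fmt.Printf("runs in (0.5, 1]: %.0f\n", upper-lower)
}
```

For the values shown above, the mean is 19.597656 / 89, about 0.2202 in the histogram's own unit, and exactly one of the 89 runs fell between the 0.5 and 1 buckets. In a real deployment the text would come from an HTTP GET on the /metrics route rather than from a constant.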

11.5 Prometheus Metrics Grafana Dashboard

Now that the Prometheus metrics are available, you can pair a KisFlow stream-computing program with Grafana to display them on a dashboard. Because the metrics of interest and the dashboard requirements differ from project to project, this article does not provide Grafana dashboard configuration files. Below is an example KisFlow project dashboard for reference:

[Image: kisflow-metrics]

[Image: kis-flow-2]

[Image: kis-flow-3]