Case (II) – KisFlow-Golang Stream Real-Time Computing – Flow Parallel Operation


Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection

Case1-Quick Start
Case2-Flow Parallel Operation

Download KisFlow Source

$go get

KisFlow Developer Documentation

Source Code Example

KisFlow Can Achieve the Combination of Two Flows via Connector

This case combines the two flows below to introduce the Connector's interface and usage.

Data Flow Diagram

Case Introduction

Assume a student has four attributes:

Student ID: stu_id
Credit 1: score_1
Credit 2: score_2
Credit 3: score_3

Define Flow1: CalStuAvgScore12 to calculate the average of a student’s Credit 1 (score_1) and Credit 2 (score_2), giving avg_score_1_2.
Define Flow2: CalStuAvgScore3 to combine Credit 3 (score_3) with avg_score_1_2, giving the average of Credit 1, Credit 2, and Credit 3. The value avg_score_1_2 is provided by Flow1.


Flow1 consists of 4 functions:

V (Function: VerifyStu) to verify the validity of StuId
C (Function: AvgStuScore12) to calculate the average score of Credit 1 and Credit 2
S (Function: SaveScoreAvg12) to store avg_score_1_2 in Redis
E (Function: PrintStuAvgScore) to print the average score of Credit 1 and Credit 2.


Flow2 consists of 4 functions:

V (Function: VerifyStu) to verify the validity of StuId
L (Function: LoadScoreAvg12) to read the current student’s average score of Credit 1 and Credit 2 (avg_score_1_2) calculated by Flow1
C (Function: AvgStuScore3) to calculate the average score of Credit 3 and the average score of Credit 1 and Credit 2
E (Function: PrintStuAvgScore) to print the average score of Credit 1, Credit 2, and Credit 3.


kistype: func
fname: AvgStuScore3
fmode: Calculate
source:
  name: SourceStuScore


kistype: func
fname: LoadScoreAvg12
fmode: Load
source:
  name: SourceStuScore
option:
  cname: Score12Cache

Basic Data Protocol


package main

type StuScore1_2 struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
}

type StuScoreAvg struct {
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

type StuScore3 struct {
	StuId      int     `json:"stu_id"`
	AvgScore12 float64 `json:"avg_score_1_2"` // average of score_1 and score_2
	Score3     int     `json:"score_3"`
}
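To illustrate how a committed JSON row maps onto these structs, here is a minimal standard-library sketch. The wrapper type `rowIn` and the helper `decodeRow` are hypothetical names used only for illustration; KisFlow performs its own deserialization internally. The sketch only shows that `encoding/json` fills the fields of an embedded struct as if they were declared on the outer type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StuScore1_2 mirrors the protocol struct defined above.
type StuScore1_2 struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
}

// rowIn is a hypothetical function input type that embeds the protocol struct.
type rowIn struct {
	StuScore1_2
}

// decodeRow is a hypothetical helper that parses one JSON row into rowIn.
func decodeRow(raw string) (rowIn, error) {
	var in rowIn
	err := json.Unmarshal([]byte(raw), &in)
	return in, err
}

func main() {
	in, err := decodeRow(`{"stu_id":101, "score_1":100, "score_2":90}`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	// The embedded fields are promoted, so they are read directly from in.
	fmt.Println(in.StuId, in.Score1, in.Score2)
}
```

Embedding keeps each function's input type small while the shared field definitions stay in one place.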

Connector Init

The Connector defined in this project, Score12Cache, is a connection resource backed by Redis. It needs an initialization function that establishes the connection when KisFlow starts.


package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/log"
	"github.com/go-redis/redis/v8" // assumed go-redis import path; match the version in your go.mod
)

// type ConnInit func(conn Connector) error

func InitScore12Cache(connector kis.Connector) error {
	fmt.Println("===> Call Connector InitScore12Cache")

	// Init the Redis client
	rdb := redis.NewClient(&redis.Options{
		Addr:     connector.GetConfig().AddrString, // Redis server address
		Password: "",                               // password
		DB:       0,                                // select db
	})

	// Ping test
	pong, err := rdb.Ping(context.Background()).Result()
	if err != nil {
		log.Logger().ErrorF("Failed to connect to Redis: %v", err)
		return err
	}
	fmt.Println("Connected to Redis:", pong)

	// Store the client in the connector's metadata
	connector.SetMetaData("rdb", rdb)

	return nil
}

Here, the connected Redis client is stored in the connector’s metadata under the key "rdb", so that functions bound to this connector can retrieve it later.

// set rdb to connector
connector.SetMetaData("rdb", rdb)
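The store-then-retrieve pattern can be sketched without any external dependency. In the sketch below, `metaStore`, `fakeClient`, and `clientFrom` are hypothetical stand-ins (for the connector's metadata storage, for `*redis.Client`, and for a lookup helper); they are not part of KisFlow. It also shows a checked type assertion, which returns an error instead of panicking when the key is missing or holds another type.

```go
package main

import (
	"errors"
	"fmt"
)

// metaStore is a hypothetical stand-in for a connector's metadata storage.
type metaStore struct {
	data map[string]interface{}
}

// SetMetaData saves a value under key, creating the map on first use.
func (m *metaStore) SetMetaData(key string, value interface{}) {
	if m.data == nil {
		m.data = make(map[string]interface{})
	}
	m.data[key] = value
}

// GetMetaData returns the stored value, or nil if the key is absent.
func (m *metaStore) GetMetaData(key string) interface{} {
	return m.data[key]
}

// fakeClient stands in for *redis.Client so the sketch stays self-contained.
type fakeClient struct {
	addr string
}

// clientFrom fetches the client with a checked (comma-ok) type assertion.
func clientFrom(m *metaStore) (*fakeClient, error) {
	c, ok := m.GetMetaData("rdb").(*fakeClient)
	if !ok {
		return nil, errors.New(`metadata "rdb" is missing or has an unexpected type`)
	}
	return c, nil
}

func main() {
	store := &metaStore{}
	store.SetMetaData("rdb", &fakeClient{addr: "127.0.0.1:6379"})

	c, err := clientFrom(store)
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	fmt.Println("client address:", c.addr)
}
```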

FaaS Implementation

Function(V): VerifyStu


package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
)

type VerifyStuIn struct {
	StuId int `json:"stu_id"`
}

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
	fmt.Printf("->Call Func VerifyStu\n")

	for _, stu := range rows {
		// Reject invalid data
		if stu.StuId < 0 || stu.StuId > 999 {
			// Terminate the current Flow process, subsequent functions of the current Flow will not be executed
			return flow.Next(kis.ActionAbort)
		}
	}

	return flow.Next(kis.ActionDataReuse)
}

VerifyStu() validates the data. If any row has a StuId outside the range 0 to 999, the current flow is aborted. Otherwise, flow.Next(kis.ActionDataReuse) passes the input rows unchanged to the next function.
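The difference between aborting and reusing data can be modeled with a small stand-alone pipeline. This is a simplified model for illustration, not KisFlow's implementation: the `action` type, its constants, `stage`, and `runStages` are all hypothetical names, and rows are plain integers standing in for student IDs.

```go
package main

import "fmt"

// action is a hypothetical, simplified stand-in for a flow action.
type action int

const (
	actionNone      action = iota // use the rows the stage returned
	actionDataReuse               // pass the stage's input on unchanged
	actionAbort                   // stop; later stages do not run
)

// stage receives input rows and returns output rows plus an action.
type stage func(in []int) ([]int, action)

// runStages runs stages in order and reports the final rows and how many stages ran.
func runStages(rows []int, stages ...stage) ([]int, int) {
	executed := 0
	for _, s := range stages {
		out, act := s(rows)
		executed++
		switch act {
		case actionAbort:
			return nil, executed
		case actionDataReuse:
			// keep rows as they are for the next stage
		default:
			rows = out
		}
	}
	return rows, executed
}

// verify aborts when any ID falls outside [0, 999], otherwise reuses its input.
func verify(in []int) ([]int, action) {
	for _, id := range in {
		if id < 0 || id > 999 {
			return nil, actionAbort
		}
	}
	return nil, actionDataReuse
}

// double is an arbitrary downstream stage used only for demonstration.
func double(in []int) ([]int, action) {
	out := make([]int, 0, len(in))
	for _, v := range in {
		out = append(out, v*2)
	}
	return out, actionNone
}

func main() {
	out, n := runStages([]int{101, 102}, verify, double)
	fmt.Println(out, n) // both stages run

	out, n = runStages([]int{101, 1000}, verify, double)
	fmt.Println(out, n) // verify aborts, double never runs
}
```

Note that, as in VerifyStu(), a single invalid row aborts the whole batch rather than dropping only that row.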

Function(C): AvgStuScore12


package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
)

// The embedded fields are inferred from how the function below uses them.
type AvgStuScoreIn_1_2 struct {
	StuScore1_2
}

type AvgStuScoreOut_1_2 struct {
	StuScoreAvg
}

func AvgStuScore12(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn_1_2) error {
	fmt.Printf("->Call Func AvgStuScore12\n")

	for _, row := range rows {
		out := AvgStuScoreOut_1_2{
			StuScoreAvg: StuScoreAvg{
				StuId:    row.StuId,
				AvgScore: float64(row.Score1+row.Score2) / 2,
			},
		}

		// Submit result data
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}

AvgStuScore12() calculates the average score of score_1 and score_2, resulting in avg_score.

Function(S): SaveScoreAvg12


package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/aceld/kis-flow/kis"
	"github.com/go-redis/redis/v8" // assumed go-redis import path; match the version in your go.mod
)

// The embedded field is inferred from the StuId and AvgScore accesses below.
type SaveStuScoreIn struct {
	StuScoreAvg
}

func BatchSetStuScores(ctx context.Context, conn kis.Connector, rows []*SaveStuScoreIn) error {
	// Get the Redis client stored by InitScore12Cache
	rdb := conn.GetMetaData("rdb").(*redis.Client)

	// Queue all writes in one pipeline
	pipe := rdb.Pipeline()

	for _, score := range rows {
		// Build the key: configured prefix + student ID
		key := conn.GetConfig().Key + strconv.Itoa(score.StuId)

		pipe.HMSet(ctx, key, map[string]interface{}{
			"avg_score": score.AvgScore,
		})
	}

	_, err := pipe.Exec(ctx)
	if err != nil {
		return err
	}

	return nil
}

func SaveScoreAvg12(ctx context.Context, flow kis.Flow, rows []*SaveStuScoreIn) error {
	fmt.Printf("->Call Func SaveScoreAvg12\n")

	conn, err := flow.GetConnector()
	if err != nil {
		fmt.Printf("SaveScoreAvg12(): GetConnector err = %s\n", err.Error())
		return err
	}

	// Capture the returned error; the outer err is nil at this point
	if err := BatchSetStuScores(ctx, conn, rows); err != nil {
		fmt.Printf("SaveScoreAvg12(): BatchSetStuScores err = %s\n", err.Error())
		return err
	}

	return flow.Next(kis.ActionDataReuse)
}

SaveScoreAvg12() stores the data in Redis through the bound Connector, building each key from the prefix configured in the Connector plus the student ID. Finally, the source data is passed through unchanged to the next function.

Function(E): PrintStuAvgScore


package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
)

type PrintStuAvgScoreIn struct {
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
	fmt.Printf("->Call Func PrintStuAvgScore, in Flow[%s]\n", flow.GetName())

	for _, row := range rows {
		fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
	}

	return flow.Next()
}

PrintStuAvgScore() prints the average score of the current student.

Function(L): LoadScoreAvg12


package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/aceld/kis-flow/kis"
	"github.com/go-redis/redis/v8" // assumed go-redis import path; match the version in your go.mod
)

// The embedded fields are inferred from how the functions below use them.
type LoadStuScoreIn struct {
	StuScore3
}

type LoadStuScoreOut struct {
	StuScore3
}

func GetStuScoresByStuId(ctx context.Context, conn kis.Connector, stuId int) (float64, error) {
	// Get the Redis client stored by InitScore12Cache
	rdb := conn.GetMetaData("rdb").(*redis.Client)

	// Build the key: configured prefix + student ID
	key := conn.GetConfig().Key + strconv.Itoa(stuId)

	// Get data from Redis
	result, err := rdb.HGetAll(ctx, key).Result()
	if err != nil {
		return 0, err
	}

	// Get value
	avgScoreStr, ok := result["avg_score"]
	if !ok {
		return 0, fmt.Errorf("avg_score not found for stuId: %d", stuId)
	}

	// Parse to float64
	avgScore, err := strconv.ParseFloat(avgScoreStr, 64)
	if err != nil {
		return 0, err
	}

	return avgScore, nil
}

func LoadScoreAvg12(ctx context.Context, flow kis.Flow, rows []*LoadStuScoreIn) error {
	fmt.Printf("->Call Func LoadScoreAvg12\n")

	conn, err := flow.GetConnector()
	if err != nil {
		fmt.Printf("LoadScoreAvg12(): GetConnector err = %s\n", err.Error())
		return err
	}

	for _, row := range rows {
		stuScoreAvg1_2, err := GetStuScoresByStuId(ctx, conn, row.StuId)
		if err != nil {
			fmt.Printf("LoadScoreAvg12(): GetStuScoresByStuId err = %s\n", err.Error())
			return err
		}

		out := LoadStuScoreOut{
			StuScore3: StuScore3{
				StuId:      row.StuId,
				Score3:     row.Score3,
				AvgScore12: stuScoreAvg1_2, // avg score of score1 and score2 (load from redis)
			},
		}

		// Commit result
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}

LoadScoreAvg12() reads the average of score_1 and score_2 from Redis through the bound Connector, using the key prefix configured in the Connector. It then sends the upstream source data, together with the newly loaded average, to the next function.
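The save and load halves share one key scheme, which can be sketched as a round trip against an in-memory map. Everything here is a stand-in for illustration: `hashStore` replaces the Redis hash, `keyPrefix` is an assumed value (the real prefix comes from the Connector configuration), and `saveAvg` and `loadAvg` are hypothetical helpers. As with a Redis hash, the value is stored as a string and parsed back on load.

```go
package main

import (
	"fmt"
	"strconv"
)

// hashStore is a hypothetical in-memory stand-in for Redis hashes: key -> field -> value.
type hashStore map[string]map[string]string

// keyPrefix is an assumed value; the real prefix comes from the Connector configuration.
const keyPrefix = "stu_score12_avg_"

// saveAvg writes the average under prefix + student ID, as a string field.
func saveAvg(s hashStore, stuID int, avg float64) {
	key := keyPrefix + strconv.Itoa(stuID)
	s[key] = map[string]string{
		"avg_score": strconv.FormatFloat(avg, 'f', -1, 64),
	}
}

// loadAvg reads the field back and parses it, failing if it was never saved.
func loadAvg(s hashStore, stuID int) (float64, error) {
	key := keyPrefix + strconv.Itoa(stuID)
	v, ok := s[key]["avg_score"]
	if !ok {
		return 0, fmt.Errorf("avg_score not found for stuId: %d", stuID)
	}
	return strconv.ParseFloat(v, 64)
}

func main() {
	store := hashStore{}
	saveAvg(store, 101, 95)

	avg, err := loadAvg(store, 101)
	fmt.Println(avg, err)

	// Student 102 was never saved, so the load reports an error.
	_, err = loadAvg(store, 102)
	fmt.Println(err)
}
```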

Function(C): AvgStuScore3


package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
)

// The embedded fields are inferred from how the function below uses them.
type AvgStuScore3In struct {
	StuScore3
}

type AvgStuScore3Out struct {
	StuScoreAvg
}

func AvgStuScore3(ctx context.Context, flow kis.Flow, rows []*AvgStuScore3In) error {
	fmt.Printf("->Call Func AvgStuScore3\n")

	for _, row := range rows {
		out := AvgStuScore3Out{
			StuScoreAvg: StuScoreAvg{
				StuId:    row.StuId,
				AvgScore: (float64(row.Score3) + row.AvgScore12*2) / 3,
			},
		}

		// Submit result data
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}

AvgStuScore3() computes the average of all three scores from score_3 and the average of score_1 and score_2, which is weighted by 2 because it represents two scores. The result is the final average score, avg_score.
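The weighting can be checked with the sample data used in this case. Since avg_score_1_2 * 2 equals score_1 + score_2, the expression (score_3 + avg_score_1_2 * 2) / 3 is the same as (score_1 + score_2 + score_3) / 3. The helper names `avg12` and `avg123` below are hypothetical and only restate the two formulas from the functions above.

```go
package main

import "fmt"

// avg12 restates the formula used by AvgStuScore12.
func avg12(score1, score2 int) float64 {
	return float64(score1+score2) / 2
}

// avg123 restates the formula used by AvgStuScore3.
func avg123(a12 float64, score3 int) float64 {
	return (float64(score3) + a12*2) / 3
}

func main() {
	// Student 101: score_1=100, score_2=90, score_3=80
	a := avg12(100, 90)
	fmt.Println(a)             // 95
	fmt.Println(avg123(a, 80)) // 90

	// Computing the three-score average directly gives the same value.
	fmt.Println(float64(100+90+80) / 3) // 90
}
```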

Register FaaS & CaaSInit/CaaS (Register Function/Connector)


func init() {
	// Register functions
	kis.Pool().FaaS("VerifyStu", VerifyStu)
	kis.Pool().FaaS("AvgStuScore12", AvgStuScore12)
	kis.Pool().FaaS("SaveScoreAvg12", SaveScoreAvg12)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
	kis.Pool().FaaS("LoadScoreAvg12", LoadScoreAvg12)
	kis.Pool().FaaS("AvgStuScore3", AvgStuScore3)

	// Register connectors
	kis.Pool().CaaSInit("Score12Cache", InitScore12Cache)
}

Main Process


package main

import (
	"context"
	"sync"

	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
)

func RunFlowCalStuAvgScore12(ctx context.Context, flow kis.Flow) error {
	// Commit data
	_ = flow.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90}`)
	_ = flow.CommitRow(`{"stu_id":102, "score_1":100, "score_2":80}`)

	// Run the flow
	if err := flow.Run(ctx); err != nil {
		return err
	}

	return nil
}

func RunFlowCalStuAvgScore3(ctx context.Context, flow kis.Flow) error {
	// Commit data
	_ = flow.CommitRow(`{"stu_id":101, "score_3": 80}`)
	_ = flow.CommitRow(`{"stu_id":102, "score_3": 70}`)

	// Run the flow
	if err := flow.Run(ctx); err != nil {
		return err
	}

	return nil
}

func main() {
	ctx := context.Background()

	// Load configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	wg.Add(2) // one count per goroutine below

	go func() {
		// Run flow1 concurrently
		defer wg.Done()

		flow1 := kis.Pool().GetFlow("CalStuAvgScore12")
		if flow1 == nil {
			panic("flow1 is nil")
		}

		if err := RunFlowCalStuAvgScore12(ctx, flow1); err != nil {
			panic(err)
		}
	}()

	go func() {
		// Run flow2 concurrently
		defer wg.Done()

		flow2 := kis.Pool().GetFlow("CalStuAvgScore3")
		if flow2 == nil {
			panic("flow2 is nil")
		}

		if err := RunFlowCalStuAvgScore3(ctx, flow2); err != nil {
			panic(err)
		}
	}()

	// Wait for both flows to finish
	wg.Wait()
}

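Two goroutines are launched concurrently to execute Flow1 and Flow2, calculating the final average scores for student 101 and student 102. Because the flows run concurrently, Flow2 can load avg_score_1_2 only if Flow1, in this run or an earlier one, has already saved it to Redis; otherwise LoadScoreAvg12 returns an error.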
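The launch-and-wait structure can be sketched on its own with the standard library. The helper `runAll` is a hypothetical name, and the two closures merely stand in for running Flow1 and Flow2. The counter is set with `wg.Add` before the goroutines start, and `wg.Wait` keeps `main` from returning before they finish.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll is a hypothetical helper: it runs every task in its own goroutine,
// waits for all of them, and returns each task's error by position.
func runAll(tasks ...func() error) []error {
	errs := make([]error, len(tasks))

	var wg sync.WaitGroup
	wg.Add(len(tasks)) // count every goroutine before starting any

	for i, task := range tasks {
		go func(i int, task func() error) {
			defer wg.Done()
			errs[i] = task() // each goroutine writes only its own slot
		}(i, task)
	}

	wg.Wait() // block until every task has called Done
	return errs
}

func main() {
	errs := runAll(
		func() error { fmt.Println("flow1 finished"); return nil },
		func() error { fmt.Println("flow2 finished"); return nil },
	)
	fmt.Println("errors:", errs)
}
```

Returning the errors to the caller, rather than panicking inside each goroutine, lets the caller decide how to handle a failed flow.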

Execution Results

===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore12
===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore3
->Call Func VerifyStu
->Call Func VerifyStu
->Call Func AvgStuScore12
->Call Func LoadScoreAvg12
->Call Func SaveScoreAvg12
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore12]
stuid: [101], avg score: [95]
stuid: [102], avg score: [90]
->Call Func AvgStuScore3
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore3]
stuid: [101], avg score: [90]
stuid: [102], avg score: [83.33333333333333]

In Flow[CalStuAvgScore3], the output shows the final average of scores 1, 2, and 3 for each student.

Author: Aceld

KisFlow Open Source Project Address:



