(Part 5)Golang Framework Hands-on – KisFlow Stream Computing Framework-Function Scheduling

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki

Part1-OverView
Part2-Project Construction / Basic Modules
Part3-Project Construction / Basic Modules
Part4-Data Stream
Part5-Function Scheduling

To be continued.

4.1 Router

KisFlow now lets developers register their own Functions from outside the framework. To support this, we first define the prototype of a registered function and the Router types that manage the name-to-Function mappings.

Create kis-flow/kis/router.go and define the prototypes as follows:

kis-flow/kis/router.go

package kis

import "context"

// FaaS Function as a Service
type FaaS func(context.Context, Flow) error

// funcRouter
// key: Function Name
// value: Function callback for custom business logic
type funcRouter map[string]FaaS

// flowRouter
// key: Flow Name
// value: Flow
type flowRouter map[string]Flow

FaaS: The prototype of the business callback that developers register with KisFlow. It takes two parameters: Context mainly carries the business context, and Flow mainly carries the KisFlow context. Through Flow, a callback can obtain the configuration and data of the current Function, as well as information about the Functions on other nodes of the Flow.

funcRouter: Manages the mapping between Function names and FaaS business callbacks. It’s a private type and not exposed externally. It’s worth noting that the key of funcRouter is the Function name because the Function ID is a generated random ID, and developers cannot predict or read it during route registration. Therefore, the business callback is mapped to the Function name.

flowRouter: Manages the mapping between Flow names and Flow instances. It is also a private type and not exposed externally. Like funcRouter, it is keyed by name, in this case the Flow name.

4.2 KisPool

KisFlow provides a type called KisPool to manage all global mappings. KisPool holds the Routers defined above and provides the methods that manage them.

4.2.1 Definition of KisPool

Create kis-flow/kis/pool.go to hold the kis_pool module.

kis-flow/kis/pool.go

package kis

import (
	"context"
	"errors"
	"fmt"
	"kis-flow/log"
	"sync"
)

var _poolOnce sync.Once

// kisPool is used to manage all Function and Flow configuration pools
type kisPool struct {
	fnRouter funcRouter   // All Function management routes
	fnLock   sync.RWMutex // Lock for fnRouter

	flowRouter flowRouter   // All flow objects
	flowLock   sync.RWMutex // Lock for flowRouter
}

// Singleton
var _pool *kisPool

// Pool Singleton constructor
func Pool() *kisPool {
	_poolOnce.Do(func() {
		// Create kisPool object
		_pool = new(kisPool)

		// Initialize fnRouter
		_pool.fnRouter = make(funcRouter)

		// Initialize flowRouter
		_pool.flowRouter = make(flowRouter)
	})

	return _pool
}

kis_pool adopts the singleton pattern, and the Pool() method retrieves the current singleton. The fnRouter and flowRouter are initialized only once during the lifecycle, controlled by sync.Once.
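To see why sync.Once is enough to make the singleton safe, here is a small stand-alone sketch. It assumes a hypothetical `registry` type in place of kisPool and adds an `initRuns` counter purely to make the one-time initialization observable; neither name exists in KisFlow.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for kisPool.
type registry struct {
	names map[string]bool
}

var (
	once     sync.Once
	instance *registry
	initRuns int // counts how many times the initializer actually ran
)

// Instance returns the process-wide registry, creating it on first use.
func Instance() *registry {
	once.Do(func() {
		initRuns++
		instance = &registry{names: make(map[string]bool)}
	})
	return instance
}

func main() {
	var wg sync.WaitGroup
	ptrs := make([]*registry, 8)

	// Call Instance from several goroutines at once.
	for i := range ptrs {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ptrs[i] = Instance()
		}(i)
	}
	wg.Wait()

	same := true
	for _, p := range ptrs {
		if p != ptrs[0] {
			same = false
		}
	}
	fmt.Println("same instance:", same, "initializer runs:", initRuns)
}
```

Every caller gets the same pointer, and the maps are created exactly once even under concurrent first use.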

4.2.2 Registering and Getting Flow

KisPool can provide interfaces to add and retrieve Flow information as follows:

kis-flow/kis/pool.go

// AddFlow registers a Flow under its name; a duplicate name causes a panic
func (pool *kisPool) AddFlow(name string, flow Flow) {
	pool.flowLock.Lock()
	defer pool.flowLock.Unlock()

	if _, ok := pool.flowRouter[name]; !ok {
		pool.flowRouter[name] = flow
	} else {
		errString := fmt.Sprintf("Pool AddFlow Repeat FlowName=%s\n", name)
		panic(errString)
	}

	log.Logger().InfoF("Add FlowRouter FlowName=%s\n", name)
}

// GetFlow returns the Flow registered under name, or nil if there is none
func (pool *kisPool) GetFlow(name string) Flow {
	pool.flowLock.RLock()
	defer pool.flowLock.RUnlock()

	if flow, ok := pool.flowRouter[name]; ok {
		return flow
	}
	return nil
}

AddFlow checks for duplicates by Flow name: registering a second Flow under a name that is already present causes a panic. GetFlow returns nil when no Flow is registered under the given name.
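Because a duplicate registration panics, a caller that cannot guarantee unique names may want to contain the panic. The following is a hedged sketch of one way to do that, using a hypothetical `flowTable` that stores plain strings instead of real Flow objects; `tryAdd` is an illustrative helper, not a KisFlow API.

```go
package main

import (
	"fmt"
	"sync"
)

// flowTable is a hypothetical, simplified stand-in for the flowRouter part of kisPool.
type flowTable struct {
	mu    sync.RWMutex
	flows map[string]string // Flow name -> placeholder value instead of a real Flow
}

func newFlowTable() *flowTable {
	return &flowTable{flows: make(map[string]string)}
}

// Add panics on a duplicate name, mirroring the behavior of AddFlow.
func (t *flowTable) Add(name, flow string) {
	t.mu.Lock()
	defer t.mu.Unlock() // also runs while a panic unwinds, so the lock is released

	if _, ok := t.flows[name]; ok {
		panic(fmt.Sprintf("duplicate flow name=%s", name))
	}
	t.flows[name] = flow
}

// Get returns the stored value and whether the name was registered.
func (t *flowTable) Get(name string) (string, bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()

	v, ok := t.flows[name]
	return v, ok
}

// tryAdd converts the panic into an error so the caller can decide how to react.
func tryAdd(t *flowTable, name, flow string) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("add flow: %v", r)
		}
	}()
	t.Add(name, flow)
	return nil
}

func main() {
	tbl := newFlowTable()
	fmt.Println(tryAdd(tbl, "flowName1", "first"))
	fmt.Println(tryAdd(tbl, "flowName1", "second"))
}
```

The first registration succeeds and the second is reported as an error, while the original entry stays in place.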

4.2.3 Registering and Scheduling Functions

KisPool provides methods to register Function callbacks and schedule Functions as follows.

kis-flow/kis/pool.go

// FaaS registers Function computing business logic, indexed and registered by Function Name
func (pool *kisPool) FaaS(fnName string, f FaaS) {
	pool.fnLock.Lock()
	defer pool.fnLock.Unlock()

	if _, ok := pool.fnRouter[fnName]; !ok {
		pool.fnRouter[fnName] = f
	} else {
		errString := fmt.Sprintf("KisPool FaaS Repeat FuncName=%s", fnName)
		panic(errString)
	}

	log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
}

// CallFunction schedules a Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
	// Look up the callback under the read lock, then release the lock before
	// invoking it, so a callback can itself register Functions without deadlocking.
	pool.fnLock.RLock()
	f, ok := pool.fnRouter[fnName]
	pool.fnLock.RUnlock()

	if ok {
		return f(ctx, flow)
	}

	log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)

	return errors.New("FuncName: " + fnName + " Can not find in KisPool, Not Added.")
}

In CallFunction(), the Flow parameter is required as the context environment for data stream scheduling. Developers can use Flow in custom FaaS to obtain some Function information. Therefore, we need to add some interfaces to Flow to retrieve configuration information if needed. These interfaces are added as follows:

kis-flow/kis/flow.go

type Flow interface {
	// Run schedules the Flow, sequentially schedules Functions in the Flow and executes them
	Run(ctx context.Context) error
	// Link connects the Functions in the Flow according to the configuration in the configuration file
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error
	// CommitRow submits Flow data to the upcoming Function layer
	CommitRow(row interface{}) error
	// Input gets the input source data for the currently executing Function in the Flow
	Input() common.KisRowArr

	// ++++++++++++++++++++++++++++++++++
	// GetName gets the name of the Flow
	GetName() string
	// GetThisFunction gets the currently executing Function
	GetThisFunction() Function
	// GetThisFuncConf gets the configuration of the currently executing Function
	GetThisFuncConf() *config.KisFuncConfig
}

kis-flow/flow/kis_flow.go

func (flow *KisFlow) GetName() string {
	return flow.Name
}

func (flow *KisFlow) GetThisFunction() kis.Function {
	return flow.ThisFunction
}

func (flow *KisFlow) GetThisFuncConf() *config.KisFuncConfig {
	return flow.ThisFunction.GetConfig()
}

4.3 KisFunction Refers to KisPool for Scheduling

Now, we can use the Pool for scheduling within the Call() method of KisFunctionX. Let’s modify the Call() method for each Function accordingly.

kis-flow/function/kis_function_c.go

package function

import (
	"context"
	"kis-flow/kis"
	"kis-flow/log"
)

type KisFunctionC struct {
	BaseFunction
}

func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
	log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow)

	// Route to the specific executing computation Function through KisPool
	if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
		log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
		return err
	}

	return nil
}

kis-flow/function/kis_function_e.go

package function

import (
	"context"
	"kis-flow/kis"
	"kis-flow/log"
)

type KisFunctionE struct {
	BaseFunction
}

func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
	log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow)

	// Route to the specific executing computation Function through KisPool
	if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
		log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
		return err
	}

	return nil
}

kis-flow/function/kis_function_l.go

package function

import (
	"context"
	"kis-flow/kis"
	"kis-flow/log"
)

type KisFunctionL struct {
	BaseFunction
}

func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {
	log.Logger().InfoF("KisFunctionL, flow = %+v\n", flow)

	// Route to the specific executing computation Function through KisPool
	if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
		log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
		return err
	}

	return nil
}

kis-flow/function/kis_function_s.go

package function

import (
	"context"
	"kis-flow/kis"
	"kis-flow/log"
)

type KisFunctionS struct {
	BaseFunction
}

func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {
	log.Logger().InfoF("KisFunctionS, flow = %+v\n", flow)

	// Route to the specific executing computation Function through KisPool
	if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
		log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
		return err
	}

	return nil
}

kis-flow/function/kis_function_v.go

package function

import (
	"context"
	"kis-flow/kis"
	"kis-flow/log"
)

type KisFunctionV struct {
	BaseFunction
}

func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {
	log.Logger().InfoF("KisFunctionV, flow = %+v\n", flow)

	// Route to the specific executing computation Function through KisPool
	if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
		log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
		return err
	}

	return nil
}

4.4 KisPool Unit Testing

Next, let’s perform unit testing for KisPool.

4.4.1 Custom FaaS

kis-flow/test/kis_pool_test.go

package test

import (
	"context"
	"fmt"
	"kis-flow/common"
	"kis-flow/config"
	"kis-flow/flow"
	"kis-flow/kis"
	"testing"
)

// funcName1Handler prints every input row and commits one result row per input
func funcName1Handler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call funcName1Handler ----")

	for index, row := range flow.Input() {
		// Print data
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)

		// Calculate result data
		resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

		// Commit result data
		_ = flow.CommitRow(resultStr)
	}

	return nil
}

// funcName2Handler only prints the rows it receives from the previous Function
func funcName2Handler(ctx context.Context, flow kis.Flow) error {

	for _, row := range flow.Input() {
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)
	}

	return nil
}

4.4.2 Register FaaS and Start Flow

kis-flow/test/kis_pool_test.go

func TestNewKisPool(t *testing.T) {

	ctx := context.Background()

	// 0. Register Functions
	kis.Pool().FaaS("funcName1", funcName1Handler)
	kis.Pool().FaaS("funcName2", funcName2Handler)

	// 1. Create 2 KisFunction configuration instances
	source1 := config.KisSource{
		Name: "Public account data from TikTok Mall",
		Must: []string{"order_id", "user_id"},
	}

	source2 := config.KisSource{
		Name: "User order error rate",
		Must: []string{"order_id", "user_id"},
	}

	myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
	if myFuncConfig1 == nil {
		panic("myFuncConfig1 is nil")
	}

	myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil)
	if myFuncConfig2 == nil {
		panic("myFuncConfig2 is nil")
	}

	// 2. Create a KisFlow configuration instance
	myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)

	// 3. Create a KisFlow object
	flow1 := flow.NewKisFlow(myFlowConfig1)

	// 4. Link Functions to Flow
	if err := flow1.Link(myFuncConfig1, nil); err != nil {
		panic(err)
	}
	if err := flow1.Link(myFuncConfig2, nil); err != nil {
		panic(err)
	}

	// 5. Commit original data
	_ = flow1.CommitRow("This is Data1 from Test")
	_ = flow1.CommitRow("This is Data2 from Test")
	_ = flow1.CommitRow("This is Data3 from Test")

	// 6. Run flow1
	if err := flow1.Run(ctx); err != nil {
		panic(err)
	}
}

Execute the following command in the kis-flow/test/ directory:

go test -test.v -test.paniconexit0 -test.run TestNewKisPool

The result is as follows:

=== RUN TestNewKisPool
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-1fdae2bfac684f1d8edf89d9000208c0
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionC, flow = &{Id:flow-1fdae2bfac684f1d8edf89d9000208c0 Name:flowName1 Conf:0xc0000e27c0 Funcs:map[func-51527b72a4ee447fb0bd494bda9a84ad:0xc0000c0190 func-9cd2ab870b384794b312d2be10bb06fa:0xc0000c01e0] FlowHead:0xc0000c0190 FlowTail:0xc0000c01e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000c0190 ThisFunctionId:func-51527b72a4ee447fb0bd494bda9a84ad PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-51527b72a4ee447fb0bd494bda9a84ad:map[] func-9cd2ab870b384794b312d2be10bb06fa:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-1fdae2bfac684f1d8edf89d9000208c0
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-51527b72a4ee447fb0bd494bda9a84ad:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionE, flow = &{Id:flow-1fdae2bfac684f1d8edf89d9000208c0 Name:flowName1 Conf:0xc0000e27c0 Funcs:map[func-51527b72a4ee447fb0bd494bda9a84ad:0xc0000c0190 func-9cd2ab870b384794b312d2be10bb06fa:0xc0000c01e0] FlowHead:0xc0000c0190 FlowTail:0xc0000c01e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000c01e0 ThisFunctionId:func-9cd2ab870b384794b312d2be10bb06fa PrevFunctionId:func-51527b72a4ee447fb0bd494bda9a84ad funcParams:map[func-51527b72a4ee447fb0bd494bda9a84ad:map[] func-9cd2ab870b384794b312d2be10bb06fa:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-51527b72a4ee447fb0bd494bda9a84ad:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}

In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 2
--- PASS: TestNewKisPool (0.00s)
PASS
ok kis-flow/test 0.520s

The log confirms the expected behavior: funcName1 receives the three rows committed by the test, and funcName2 receives the three result rows that funcName1 committed.
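The hand-off visible in the log, where one Function's committed rows become the next Function's input, can be sketched as a stand-alone miniature. This is only an illustration of the idea under simplifying assumptions: `stage` and `runPipeline` are hypothetical names, rows are plain strings, and nothing here reflects how KisFlow stores data internally.

```go
package main

import "fmt"

// stage is a simplified callback: it receives the previous stage's rows
// and returns the rows it commits for the next stage.
type stage func(input []string) []string

// runPipeline feeds each stage's committed rows to the next stage
// and returns the rows committed by the last stage.
func runPipeline(rows []string, stages ...stage) []string {
	for _, s := range stages {
		rows = s(rows)
	}
	return rows
}

func main() {
	// first commits one result row per input row, like funcName1Handler.
	first := func(input []string) []string {
		out := make([]string, 0, len(input))
		for i := range input {
			out = append(out, fmt.Sprintf("data from funcName[funcName1], index = %d", i))
		}
		return out
	}

	// second only prints what it receives, like funcName2Handler.
	second := func(input []string) []string {
		for _, row := range input {
			fmt.Println("funcName2 got:", row)
		}
		return nil
	}

	runPipeline([]string{"Data1", "Data2", "Data3"}, first, second)
}
```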

Now that the business capabilities of Functions have been exposed to developers, let’s continue to enhance the capabilities of KisFlow.

4.5 [V0.3] Source Code

https://github.com/aceld/kis-flow/releases/tag/v0.3

Author: Aceld
GitHub: https://github.com/aceld
