Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2-Project Construction / Basic Modules
To be continued.
[V0.1]-Project Construction and Basic Module Definition
First, let’s create our project. The main directory of the project is called KisFlow, and the corresponding repository is created on Github: https://github.com/aceld/kis-flow. Then clone the project code to your local machine.
2.0 Project Construction
(If you are developing according to this tutorial, you need to create a new project in your repository and clone it locally for development.)
2.0.1 Create Project Directory
Next, we will create the necessary file directories for the project. The directory structure of the project is as follows:
.
├── LICENSE
├── README.md
├── common/
├── example/
├── function/
├── conn/
├── config/
├── flow/
└── kis/
Here, we create seven folders:
common/: To store some common basic constants, enumeration parameters, and some utility methods.
flow/: To store the core code of KisFlow.
function/: To store the core code of KisFunction.
conn/: To store the core code of KisConnector.
config/: To store configuration information for flow, function, connector, etc.
example/: To store test cases and unit tests for KisFlow, so that each stage of the project can be verified as soon as it is built.
kis/: To store the abstraction layer of all modules.
2.0.2 Create go.mod
cd to the root directory of the kis-flow project and execute the following command:
go mod init kis-flow
You will get the go.mod file, which is the package management file for the current project:
module kis-flow
go 1.18
Because there will be many debugging logs to print later, we integrate the log module first. KisFlow provides a default Logger object that writes to standard output, and exposes a SetLogger() method so that developers can replace it with their own Logger module.
2.1 KisLogger
2.1.1 Logger Abstract Interface
Define Logger in the kis-flow/log/ directory and create the kis_log.go file:
kis-flow/log/kis_log.go
package log

import "context"

// KisLogger is the logging interface used by KisFlow
type KisLogger interface {
	// InfoFX Info level log interface with context, formatted string
	InfoFX(ctx context.Context, str string, v ...interface{})
	// ErrorFX Error level log interface with context, formatted string
	ErrorFX(ctx context.Context, str string, v ...interface{})
	// DebugFX Debug level log interface with context, formatted string
	DebugFX(ctx context.Context, str string, v ...interface{})

	// InfoF Info level log interface without context, formatted string
	InfoF(str string, v ...interface{})
	// ErrorF Error level log interface without context, formatted string
	ErrorF(str string, v ...interface{})
	// DebugF Debug level log interface without context, formatted string
	DebugF(str string, v ...interface{})
}

// kisLog Default KisLog object
var kisLog KisLogger

// SetLogger Set KisLog object, can be a user-defined Logger object
func SetLogger(newlog KisLogger) {
	kisLog = newlog
}

// Logger Get the kisLog object
func Logger() KisLogger {
	return kisLog
}
KisLogger provides three levels of logs: Info, Error, and Debug. It also provides two sets of log interfaces with and without context parameters.
A global object, kisLog, holds the current logger. SetLogger() lets developers install their own Logger object, and Logger() returns the one currently in use.
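As a rough illustration of how SetLogger() might be used, the sketch below plugs in a custom logger built on Go's standard log package. The interface and the two accessor functions are repeated only so that the example compiles on its own. The names prefixLogger, newPrefixLogger, and demoCustomLogger are hypothetical and are not part of KisFlow.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log"
)

// KisLogger repeats the interface from kis-flow/log/kis_log.go.
type KisLogger interface {
	InfoFX(ctx context.Context, str string, v ...interface{})
	ErrorFX(ctx context.Context, str string, v ...interface{})
	DebugFX(ctx context.Context, str string, v ...interface{})
	InfoF(str string, v ...interface{})
	ErrorF(str string, v ...interface{})
	DebugF(str string, v ...interface{})
}

var kisLog KisLogger

func SetLogger(newlog KisLogger) { kisLog = newlog }

func Logger() KisLogger { return kisLog }

// prefixLogger is a hypothetical custom logger that tags every line with its level.
type prefixLogger struct {
	out *log.Logger
}

// newPrefixLogger writes to buf; flags are 0 so that no timestamp is added.
func newPrefixLogger(buf *bytes.Buffer) *prefixLogger {
	return &prefixLogger{out: log.New(buf, "", 0)}
}

// write formats the message and lets log.Logger append the trailing newline.
func (p *prefixLogger) write(level, str string, v ...interface{}) {
	p.out.Print("[" + level + "] " + fmt.Sprintf(str, v...))
}

func (p *prefixLogger) InfoF(str string, v ...interface{})  { p.write("INFO", str, v...) }
func (p *prefixLogger) ErrorF(str string, v ...interface{}) { p.write("ERROR", str, v...) }
func (p *prefixLogger) DebugF(str string, v ...interface{}) { p.write("DEBUG", str, v...) }

// The context-aware variants ignore ctx in this sketch.
func (p *prefixLogger) InfoFX(_ context.Context, str string, v ...interface{}) {
	p.write("INFO", str, v...)
}
func (p *prefixLogger) ErrorFX(_ context.Context, str string, v ...interface{}) {
	p.write("ERROR", str, v...)
}
func (p *prefixLogger) DebugFX(_ context.Context, str string, v ...interface{}) {
	p.write("DEBUG", str, v...)
}

// demoCustomLogger installs the custom logger and returns what it wrote.
func demoCustomLogger() string {
	var buf bytes.Buffer
	SetLogger(newPrefixLogger(&buf))
	Logger().InfoF("flow %s started", "flowName1")
	Logger().ErrorFX(context.Background(), "retry %d failed", 2)
	return buf.String()
}

func main() {
	fmt.Print(demoCustomLogger())
}
```

Because every caller goes through Logger(), swapping the implementation requires no change at the call sites.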
2.1.2 Default Log Object kisDefaultLog
If the developer does not define a custom log object, KisFlow falls back to a default log object, kisDefaultLog. This type implements every method of KisLogger and prints all logs to standard output. To define it, create the kis_default_log.go file in the kis-flow/log/ directory.
kis-flow/log/kis_default_log.go
package log

import (
	"context"
	"fmt"
)

// kisDefaultLog Default provided log object
type kisDefaultLog struct{}

func (log *kisDefaultLog) InfoF(str string, v ...interface{}) {
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) ErrorF(str string, v ...interface{}) {
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) DebugF(str string, v ...interface{}) {
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) InfoFX(ctx context.Context, str string, v ...interface{}) {
	fmt.Println(ctx)
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) ErrorFX(ctx context.Context, str string, v ...interface{}) {
	fmt.Println(ctx)
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) DebugFX(ctx context.Context, str string, v ...interface{}) {
	fmt.Println(ctx)
	fmt.Printf(str, v...)
}

func init() {
	// If Logger is not set, use kisDefaultLog object by default at startup
	if Logger() == nil {
		SetLogger(&kisDefaultLog{})
	}
}
Here, in the init() initialization method, it checks whether a global Logger object has been set. If not, KisFlow will default to using kisDefaultLog as the global Logger logging object.
2.1.3 Unit Testing KisLogger
For now, we won’t focus too much on the development of the KisLogger methods. Instead, we’ll prioritize getting the existing program up and running and conduct a unit test to test the creation of a KisLogger.
kis-flow/test/kis_log_test.go
package test

import (
	"context"
	"kis-flow/log"
	"testing"
)

func TestKisLogger(t *testing.T) {
	ctx := context.Background()

	// The default logger does not append a newline, so each message carries its own.
	log.Logger().InfoFX(ctx, "TestKisLogger InfoFX\n")
	log.Logger().ErrorFX(ctx, "TestKisLogger ErrorFX\n")
	log.Logger().DebugFX(ctx, "TestKisLogger DebugFX\n")

	log.Logger().InfoF("TestKisLogger InfoF\n")
	log.Logger().ErrorF("TestKisLogger ErrorF\n")
	log.Logger().DebugF("TestKisLogger DebugF\n")
}
Navigate to the kis-flow/test/ directory and run the unit test command:
go test -v -run TestKisLogger
The result is as follows:
context.Background
TestKisLogger InfoFX
context.Background
TestKisLogger ErrorFX
context.Background
TestKisLogger DebugFX
TestKisLogger InfoF
TestKisLogger ErrorF
TestKisLogger DebugF
--- PASS: TestKisLogger (0.00s)
PASS
ok kis-flow/test 0.509s
2.2 KisConfig
In KisFlow, we define three core modules: KisFunction, KisFlow, and KisConnector. Therefore, KisConfig also needs to define these three modules separately. We will place all the code related to KisConfig in the kis-flow/config/ directory.
.
├── LICENSE
├── README.md
├── common/
│ └──
├── example/
│ └──
├── config/
│ ├──
├── test/
└── go.mod
2.2.1 KisFuncConfig Definition
The design document for KisFuncConfig in YAML format is as follows:
kistype: func
fname: TestKisFunction_S1
fmode: Save
source:
  name: Test data source 1 - user order dimension
  must:
    - userid
    - orderid
option:
  cname: TestKisConnector_1
  retry_times: 3
  retry_duration: 500
  default_params:
    default1: default1_param
    default2: default2_param
Parameter:
| Field | Meaning |
| --- | --- |
| kistype | Configuration file type: "func" (KisFunction), "flow" (KisFlow), or "conn" (KisConnector) |
| fname | KisFunction name |
| fmode | Current mode of the KisFunction; one of the five values below |
| fmode: Verify | Feature verification KisFunction, mainly for data filtering, validation, field sorting, and idempotent preprocessing |
| fmode: Save | Feature storage KisFunction. Save stores the data through a KisConnector, and the temporary life cycle of the data is KisWindow |
| fmode: Load | Feature loading KisFunction. Load loads the data through a KisConnector, and logically it can be merged with the corresponding Save KisFunction |
| fmode: Calculate | Feature calculation KisFunction. Calculate computes on the KisData in the KisFlow, generates new fields, and passes the data flow to a downstream Save for storage, or stores it directly through a KisConnector |
| fmode: Expand | Feature extension KisFunction, a custom function for streaming computation, for example notifying a scheduler to send task messages, deleting some data, or resetting status |
| source | Business source of the current Function |
| source:name | Data source name |
| source:must | Fields that the current data source must carry (mainly for data validation) |
| option | Optional configurations |
| option:cname | Name of the KisConnector associated with the current KisFunction, if there is one |
| option:retry_times | Number of retries for Function scheduling |
| option:retry_duration | Interval between retries, in milliseconds |
| option:default_params | Custom parameters carried in the Function scheduling through configuration. Keys and values can be named freely, for example default1: default1_param and default2: default2_param |
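To make the role of source:must more concrete, here is a minimal sketch of the kind of check it could drive. How KisFlow actually validates incoming data is not shown in this part, so the helper missingFields and the map-based record are assumptions made purely for illustration.

```go
package main

import "fmt"

// missingFields returns the names listed in must that are absent from record,
// in the order in which they appear in must.
func missingFields(must []string, record map[string]string) []string {
	var missing []string
	for _, field := range must {
		if _, ok := record[field]; !ok {
			missing = append(missing, field)
		}
	}
	return missing
}

func main() {
	must := []string{"userid", "orderid"}
	record := map[string]string{"userid": "u-1001"}

	fmt.Println(missingFields(must, record)) // prints [orderid]
}
```

A record with no missing fields yields an empty result, which a caller could treat as "valid".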
Next, based on the above configuration protocol, we define the KisFunction strategy configuration struct and provide some corresponding initialization methods. We create a kis_func_config.go file in the kis-flow/config/ directory to implement the required Config definitions.
A. Struct Definition
kis-flow/config/kis_func_config.go
package config

import (
	"kis-flow/common"
	"kis-flow/log"
)

// FParam represents the fixed configuration parameters type for Function in the current Flow
type FParam map[string]string

// KisSource represents the business source of the current Function
type KisSource struct {
	Name string   `yaml:"name"` // Description of the data source for this Function
	Must []string `yaml:"must"` // Fields required by the source
}

// KisFuncOption optional configurations
type KisFuncOption struct {
	CName        string `yaml:"cname"`          // Connector name
	RetryTimes   int    `yaml:"retry_times"`    // Optional, maximum number of retries for Function scheduling (excluding normal scheduling)
	RetryDuriton int    `yaml:"retry_duration"` // Optional, maximum time interval for each retry of Function scheduling (unit: ms)
	Params       FParam `yaml:"default_params"` // Optional, fixed configuration parameters for Function in the current Flow
}

// KisFuncConfig a KisFunction strategy configuration
type KisFuncConfig struct {
	KisType string        `yaml:"kistype"`
	FName   string        `yaml:"fname"`
	FMode   string        `yaml:"fmode"`
	Source  KisSource     `yaml:"source"`
	Option  KisFuncOption `yaml:"option"`
}
Here, KisFuncConfig is the top-level struct for a Function's configuration, and FParam, KisSource, and KisFuncOption are the types of its nested fields.
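The comments on RetryTimes and RetryDuriton suggest a scheduling loop along the following lines: one normal attempt, then up to RetryTimes further attempts spaced RetryDuriton milliseconds apart. This is only a sketch of that reading. The helper runWithRetry and the sentinel errTemporary are hypothetical, a non-negative retry count is assumed, and the real scheduler is not part of this section.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTemporary is a placeholder error used only by this example.
var errTemporary = errors.New("temporary failure")

// runWithRetry calls fn once and, while it keeps failing, up to retryTimes more
// times, sleeping retryDurationMs milliseconds before each retry.
// It returns the number of calls made and the last error (nil on success).
func runWithRetry(retryTimes, retryDurationMs int, fn func() error) (int, error) {
	var err error
	for attempt := 0; attempt <= retryTimes; attempt++ {
		if attempt > 0 {
			time.Sleep(time.Duration(retryDurationMs) * time.Millisecond)
		}
		if err = fn(); err == nil {
			return attempt + 1, nil
		}
	}
	return retryTimes + 1, err
}

func main() {
	calls := 0
	n, err := runWithRetry(3, 10, func() error {
		calls++
		if calls < 3 {
			return errTemporary // fail the first two calls
		}
		return nil
	})
	fmt.Println(n, err) // prints 3 <nil>
}
```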
B. Method Definitions
Below, we first provide a simple constructor for creating KisFuncConfig.
kis-flow/config/kis_func_config.go
// NewFuncConfig creates a KisFunction strategy configuration; it returns nil if the arguments are invalid
func NewFuncConfig(funcName string, mode common.KisMode, source *KisSource, option *KisFuncOption) *KisFuncConfig {
	config := new(KisFuncConfig)
	config.FName = funcName

	if source == nil {
		log.Logger().ErrorF("funcName NewConfig Error, source is nil, funcName = %s\n", funcName)
		return nil
	}
	config.Source = *source
	config.FMode = string(mode)

	// Functions S and L require the KisConnector parameter to be passed, because S and L need to establish a streaming relationship through Connector
	if mode == common.S || mode == common.L {
		if option == nil {
			log.Logger().ErrorF("Function S/L need option->Cid\n")
			return nil
		} else if option.CName == "" {
			log.Logger().ErrorF("Function S/L need option->Cid\n")
			return nil
		}
	}

	if option != nil {
		config.Option = *option
	}

	return config
}
The code above uses two enum values, common.S and common.L. They are two of the five KisMode values we provide for KisFunction, which can be defined in the kis-flow/common/const.go file.
kis-flow/common/const.go
package common

// KisMode is the working mode of a KisFunction
type KisMode string

const (
	// V is for feature verification KisFunction,
	// mainly for data filtering, validation, field sorting, idempotent preprocessing
	V KisMode = "Verify"

	// S is for feature storage KisFunction,
	// Save will store the data through KisConnector, and the temporary life cycle of the data is KisWindow
	S KisMode = "Save"

	// L is for feature loading KisFunction,
	// Load will load the data through KisConnector, and logically it can be merged with the corresponding S Function
	L KisMode = "Load"

	// C is for feature calculation KisFunction,
	// Calculate will calculate the data through data in KisFlow, generate new fields, pass the data flow to downstream S for storage, or it can be stored directly through KisConnector
	C KisMode = "Calculate"

	// E is for extending features KisFunction,
	// as a custom feature function for streaming computation, such as, Notify scheduler triggers task message sending, deleting some data, resetting status, etc.
	E KisMode = "Expand"
)
If fmode is Save or Load, the Function reads from or writes to a storage backend, so it must be associated with a KisConnector and CName must be passed in.
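The rule that Save and Load need a connector can be captured in a small helper. The sketch below repeats the KisMode constants so that it compiles on its own. The helpers validMode and needsConnector are hypothetical and are not functions defined by KisFlow.

```go
package main

import "fmt"

// KisMode and its constants repeat the definitions from kis-flow/common/const.go.
type KisMode string

const (
	V KisMode = "Verify"
	S KisMode = "Save"
	L KisMode = "Load"
	C KisMode = "Calculate"
	E KisMode = "Expand"
)

// validMode reports whether mode is one of the five known KisMode values.
func validMode(mode KisMode) bool {
	switch mode {
	case V, S, L, C, E:
		return true
	}
	return false
}

// needsConnector reports whether a Function in this mode must name a KisConnector.
func needsConnector(mode KisMode) bool {
	return mode == S || mode == L
}

func main() {
	for _, m := range []KisMode{V, S, L, C, E} {
		fmt.Printf("%-9s needsConnector=%v\n", m, needsConnector(m))
	}
	fmt.Println(validMode("Unknown")) // prints false
}
```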
C. Create KisFuncConfig Unit Test
For now, we won't develop further methods for KisFuncConfig. We'll first run what we have with a unit test that creates a KisFuncConfig.
kis-flow/test/kis_config_test.go
package test

import (
	"kis-flow/common"
	"kis-flow/config"
	"kis-flow/log"
	"testing"
)

func TestNewFuncConfig(t *testing.T) {
	source := config.KisSource{
		Name: "TikTokOrder",
		Must: []string{"order_id", "user_id"},
	}

	option := config.KisFuncOption{
		CName:        "connectorName1",
		RetryTimes:   3,
		RetryDuriton: 300,
		Params: config.FParam{
			"param1": "value1",
			"param2": "value2",
		},
	}

	myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)
	log.Logger().InfoF("funcName1: %+v\n", myFunc1)
}
We cd to the kis-flow/test/ directory and execute the unit test command:
go test -v -run TestNewFuncConfig
The result is as follows:
funcName1: &{KisType: FName:funcName1 FMode:Save Source:{Name:TikTokOrder Must:[order_id user_id]} Option:{CName:connectorName1 RetryTimes:3 RetryDuriton:300 Params:map[param1:value1 param2:value2]}}
--- PASS: TestNewFuncConfig (0.00s)
With that, the basic creation of a KisFuncConfig strategy is complete.
2.2.2 Definition of KisFlowConfig
The YAML representation of KisFlowConfig in the design document is as follows:
kistype: flow
status: 1
flow_name: MyFlow1
flows:
  - fname: TestPrintInput
    params:
      args1: value1
      args2: value2
  - fname: TestKisFunction_S1
  - fname: TestPrintInput
    params:
      args1: value11
      args2: value22
      default2: newDefault
  - fname: TestPrintInput
  - fname: TestKisFunction_S1
    params:
      my_user_param1: ffffffxxxxxx
  - fname: TestPrintInput
Parameter:
| Field | Required | Meaning | Example |
| --- | --- | --- | --- |
| kistype | Yes | Configuration file type | "func" (KisFunction), "flow" (KisFlow), "conn" (KisConnector) |
| status | Yes | Whether the current KisFlow is started | 1 (start), 0 (not start) |
| flow_name | Yes | Name of the current Flow | |
| flows | Yes | Information about the current Flow | |
| flows:fname | Yes | Function associated with the current Flow | |
| flows:fname:params | No | Custom parameters carried when the current Flow reaches the current Function | args1: value1, args2: value2 |
A. Struct Definition
Next, we define the KisFlow strategy configuration struct based on the aforementioned configuration protocol and provide corresponding initialization methods. We create a kis_flow_config.go file in the kis-flow/config/ directory, where we will implement the required Config definitions.
kis-flow/config/kis_flow_config.go
package config

import "kis-flow/common"

// KisFlowFunctionParam represents the ID of a Function and carries fixed configuration parameters within a Flow configuration
type KisFlowFunctionParam struct {
	FuncName string `yaml:"fname"`  // Required
	Params   FParam `yaml:"params"` // Optional, used to customize fixed configuration parameters for the Function within the current Flow
}

// KisFlowConfig represents an object that spans the entire streaming computing context
type KisFlowConfig struct {
	KisType  string                 `yaml:"kistype"`
	Status   int                    `yaml:"status"`
	FlowName string                 `yaml:"flow_name"`
	Flows    []KisFlowFunctionParam `yaml:"flows"`
}
Here, a new parameter type KisFlowFunctionParam is provided. This represents the default parameters passed to the currently scheduled Function when configuring KisFlow. If not needed, this parameter can be omitted.
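One plausible way to combine a Function's default_params with the params a Flow supplies for it is a simple map merge in which the Flow's values win. This section does not specify the precedence, so both the rule and the helper name mergeParams should be treated as assumptions.

```go
package main

import "fmt"

// FParam repeats the type from kis-flow/config/kis_func_config.go.
type FParam map[string]string

// mergeParams returns a new FParam holding funcDefaults overlaid with flowParams.
// Neither input map is modified; nil inputs are treated as empty.
func mergeParams(funcDefaults, flowParams FParam) FParam {
	merged := make(FParam, len(funcDefaults)+len(flowParams))
	for k, v := range funcDefaults {
		merged[k] = v
	}
	for k, v := range flowParams {
		merged[k] = v // a Flow-level value replaces the Function default
	}
	return merged
}

func main() {
	defaults := FParam{"default1": "default1_param", "default2": "default2_param"}
	fromFlow := FParam{"args1": "value11", "default2": "newDefault"}

	fmt.Println(mergeParams(defaults, fromFlow))
	// prints map[args1:value11 default1:default1_param default2:newDefault]
}
```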
B. Method Definitions
We provide a constructor for creating a KisFlowConfig.
kis-flow/config/kis_flow_config.go
// NewFlowConfig creates a Flow strategy configuration object
func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {
	config := new(KisFlowConfig)
	config.FlowName = flowName
	config.Flows = make([]KisFlowFunctionParam, 0)

	config.Status = int(enable)

	return config
}

// AppendFunctionConfig adds a Function Config to the current Flow
func (fConfig *KisFlowConfig) AppendFunctionConfig(params KisFlowFunctionParam) {
	fConfig.Flows = append(fConfig.Flows, params)
}
Regarding the Function configuration carried by the flow, we dynamically add it through AppendFunctionConfig. This is done in anticipation that KisFlow configuration may be extracted from a database/dynamic remote configuration in the future. Thus, configurations need to be dynamically combined.
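NewFlowConfig relies on common.KisOnOff, and the unit test for it uses common.FlowEnable, neither of which is listed in this part. A minimal definition consistent with the status values in the parameter table (1 for started, 0 for not started) might look like the sketch below. The name FlowDisable is an assumption, and in the project these declarations would sit in package common rather than package main.

```go
package main

import "fmt"

// KisOnOff is an on/off switch that NewFlowConfig stores in KisFlowConfig.Status as an int.
type KisOnOff int

const (
	// FlowEnable marks the Flow as started.
	FlowEnable KisOnOff = 1
	// FlowDisable marks the Flow as not started (name assumed for this sketch).
	FlowDisable KisOnOff = 0
)

func main() {
	fmt.Println(int(FlowEnable), int(FlowDisable)) // prints 1 0
}
```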
C. KisFlowConfig Unit Test
Similarly, we create a simple unit test to test the creation of KisFlowConfig.
kis-flow/test/kis_config_test.go
func TestNewFlowConfig(t *testing.T) {
	flowFuncParams1 := config.KisFlowFunctionParam{
		FuncName: "funcName1",
		Params: config.FParam{
			"flowSetFunParam1": "value1",
			"flowSetFunParam2": "value2",
		},
	}

	flowFuncParams2 := config.KisFlowFunctionParam{
		FuncName: "funcName2",
		Params: config.FParam{
			"default": "value1",
		},
	}

	myFlow1 := config.NewFlowConfig("flowName1", common.FlowEnable)
	myFlow1.AppendFunctionConfig(flowFuncParams1)
	myFlow1.AppendFunctionConfig(flowFuncParams2)

	log.Logger().InfoF("myFlow1: %+v\n", myFlow1)
}
Navigate to the kis-flow/test/ directory and execute the following command to run the unit test:
go test -v -run TestNewFlowConfig
The result is as follows:
myFlow1: &{KisType: Status:1 FlowName:flowName1 Flows:[{FuncName:funcName1 Params:map[flowSetFunParam1:value1 flowSetFunParam2:value2]} {FuncName:funcName2 Params:map[default:value1]}]}
--- PASS: TestNewFlowConfig (0.00s)
PASS
ok kis-flow/test 0.251s
2.2.3 KisConnConfig
The KisConnConfig in the design document is formatted as follows in YAML:
kistype: conn
cname: TestKisConnector_1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: userid_orderid_option
params:
  args1: value1
  args2: value2
load: null
save:
  - TestKisFunction_S1
A. Struct Definition
Next, we define the KisConnector strategy configuration struct based on the aforementioned configuration protocol and provide corresponding initialization methods. We create a kis_conn_config.go file in the kis-flow/config/ directory, where we will implement the required Config definitions.
kis-flow/config/kis_conn_config.go
package config

import (
	"errors"
	"fmt"
	"kis-flow/common"
)

// KisConnConfig represents the KisConnector strategy configuration
type KisConnConfig struct {
	// Configuration type
	KisType string `yaml:"kistype"`
	// Unique descriptor
	CName string `yaml:"cname"`
	// Basic storage medium address
	AddrString string `yaml:"addrs"`
	// Storage medium engine type, such as "Mysql", "Redis", "Kafka", etc.
	Type common.KisConnType `yaml:"type"`
	// Identifier for a single storage: for example, Key name for Redis, Table name for Mysql, Topic name for Kafka, etc.
	Key string `yaml:"key"`
	// Custom parameters in the configuration information
	Params map[string]string `yaml:"params"`
	// Function IDs bound to storage reading
	Load []string `yaml:"load"`
	// Function IDs bound to storage writing
	Save []string `yaml:"save"`
}
B. Method Definitions
kis-flow/config/kis_conn_config.go
// NewConnConfig creates a KisConnector strategy configuration object
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
	strategy := new(KisConnConfig)
	strategy.CName = cName
	strategy.AddrString = addr

	strategy.Type = t
	strategy.Key = key
	strategy.Params = param

	return strategy
}

// WithFunc binds Connector to Function
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {
	switch common.KisMode(fConfig.FMode) {
	case common.S:
		cConfig.Save = append(cConfig.Save, fConfig.FName)
	case common.L:
		cConfig.Load = append(cConfig.Load, fConfig.FName)
	default:
		return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
	}

	return nil
}
Here, the WithFunc method is also provided to dynamically add the relationship between Conn and Function.
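common.KisConnType and common.REDIS are used above without being listed. The sketch below shows one definition that matches the type: redis value in the YAML, together with a hypothetical helper, splitAddrs, that breaks the comma-separated addrs string into individual addresses. Only the REDIS value is grounded in this section; the helper and its trimming behavior are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// KisConnType names the storage engine behind a KisConnector.
type KisConnType string

// REDIS matches the "type: redis" entry in the connector YAML.
const REDIS KisConnType = "redis"

// splitAddrs splits a comma-separated address list, trims surrounding
// whitespace, and drops empty entries.
func splitAddrs(addrString string) []string {
	var addrs []string
	for _, a := range strings.Split(addrString, ",") {
		if a = strings.TrimSpace(a); a != "" {
			addrs = append(addrs, a)
		}
	}
	return addrs
}

func main() {
	fmt.Println(REDIS)
	for _, addr := range splitAddrs("0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990") {
		fmt.Println(addr)
	}
}
```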
C. KisConnConfig Unit Test
Similarly, we create a simple unit test to test the creation of KisConnConfig.
kis-flow/test/kis_config_test.go
func TestNewConnConfig(t *testing.T) {
	source := config.KisSource{
		Name: "Public Account TikTok Store User Order Data",
		Must: []string{"order_id", "user_id"},
	}

	option := config.KisFuncOption{
		CName:        "connectorName1",
		RetryTimes:   3,
		RetryDuriton: 300,
		Params: config.FParam{
			"param1": "value1",
			"param2": "value2",
		},
	}

	myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)

	connParams := config.FParam{
		"param1": "value1",
		"param2": "value2",
	}

	myConnector1 := config.NewConnConfig("connectorName1", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams)

	if err := myConnector1.WithFunc(myFunc1); err != nil {
		log.Logger().ErrorF("WithFunc err: %s\n", err.Error())
	}

	log.Logger().InfoF("myConnector1: %+v\n", myConnector1)
}
Navigate to the kis-flow/test/ directory and execute the following command to run the unit test:
go test -v -run TestNewConnConfig
The result is as follows:
myConnector1: &{KisType: CName:connectorName1 AddrString:0.0.0.0:9987,0.0.0.0:9997 Type:redis Key:key Params:map[param1:value1 param2:value2] Load:[] Save:[funcName1]}
--- PASS: TestNewConnConfig (0.00s)
PASS
ok kis-flow/test 0.481s