Golang结构实战-KisFlow流式核算结构专栏

Golang结构实战-KisFlow流式核算结构(1)-概述

Golang结构实战-KisFlow流式核算结构(2)-项目构建/根底模块-(上)

Golang结构实战-KisFlow流式核算结构(3)-项目构建/根底模块-(下)

Golang结构实战-KisFlow流式核算结构(4)-数据流

Golang结构实战-KisFlow流式核算结构(5)-Function调度

Golang结构实战-KisFlow流式核算结构(6)-Connector

Golang结构实战-KisFlow流式核算结构(7)-装备导入与导出


7.1 Action Abort(停止流程)

KisFlow Action 是指在履行Function的时候,一起能够操控Flow的调度逻辑,KisFlow提供一些Action动作让开发者做选择,本节先介绍最简单的Action动作,Abort(停止当时Flow)。

咱们终究的Abort的运用形式如下:

func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call AbortFuncHandler ----")
	for _, row := range flow.Input() {
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)
	}
	return flow.Next(kis.ActionAbort)  // 停止Flow
}

AbortFuncHandler()是一个Function 的事务回调办法,是由开发者自界说的,在履行完当时Funciton之后,正常的情况是持续履行下一个Funciton,可是假如传递flow.Next(kis.ActionAbort) 作为当时Funciton的回来值,那么则不会履行到下一个Funciton,而是直接停止当时Flow的调度核算流。

下面咱们先来完成KisFlow的 Abort Action动作形式。

7.1.1 Abort接口界说

首要,先对Flow的Abort()接口做界说。

kis-flow/kis/flow.go

type Flow interface {
	// Run 调度Flow,顺次调度Flow中的Function而且履行
	Run(ctx context.Context) error
	// Link 将Flow中的Function按照装备文件中的装备进行连接
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error
	// CommitRow 提交Flow数据到即将履行的Function层
	CommitRow(row interface{}) error
	// Input 得到flow当时履行Function的输入源数据
	Input() common.KisRowArr
	// GetName 得到Flow的称号
	GetName() string
	// GetThisFunction 得到当时正在履行的Function
	GetThisFunction() Function
	// GetThisFuncConf 得到当时正在履行的Function的装备
	GetThisFuncConf() *config.KisFuncConfig
	// GetConnector 得到当时正在履行的Function的Connector
	GetConnector() (Connector, error)
	// GetConnConf 得到当时正在履行的Function的Connector的装备
	GetConnConf() (*config.KisConnConfig, error)
	// GetConfig 得到当时Flow的装备
	GetConfig() *config.KisFlowConfig
	// GetFuncConfigByName 得到当时Flow的装备
	GetFuncConfigByName(funcName string) *config.KisFuncConfig
	//  --- KisFlow Action ---
	// Next 当时Flow履行到的Function进入下一层Function所带着的Action动作
	Next(acts ...ActionFunc) error
}

这儿边提供一个接口Next(acts ...ActionFunc) error,其间参数是一个可变参数,类型为ActionFunc,这个是咱们给KisFlow界说的Action相关的办法。有关Action的界说模块如下:

7.1.2 Action模块界说

Action是用来在Flow履行过程中,经过Function来操控Flow履行特别动作的行为装备模块,包括上面的Abort行为,Abort也归于其间一个Action。Action的模块界说如下,在kis-flow/kis/下创立action.go文件,完成:

kis-flow/kis/action.go

package kis
// Action KisFlow履行流程Actions
type Action struct {
	// Abort 停止Flow的履行
	Abort bool
}
// ActionFunc KisFlow Functional Option 类型
type ActionFunc func(ops *Action)
// LoadActions 加载Actions,顺次履行ActionFunc操作函数
func LoadActions(acts []ActionFunc) Action {
	action := Action{}
	if acts == nil {
		return action
	}
	for _, act := range acts {
		act(&action)
	}
	return action
}
// ActionAbort 停止Flow的履行
func ActionAbort(action *Action) {
	action.Abort = true
}

首要,现在Action只要Abort一个行为,咱们用bool类型来表明Abort是否为停止,true则为需求停止flow的调用。
其次,type ActionFunc func(ops *Action)这个函数原型为一个函数类型,函数的形参是传递进来一个Action{} 指针,而 func ActionAbort(action *Action)则是它的一个详细的函数,ActionAbort()的办法的意图便是将Action的Abort成员设置为true。

最终看func LoadActions(acts []ActionFunc) Action办法。这个形参是一个ActionFunc函数数组,LoadActions()则是创立一个新的Action{} ,然后顺次履行[]ActionFunc的函数来改动Aciton{}的成员,终究将新的Action{}回来上层。

7.1.3 Next办法完成

接下来,咱们需求给KisFlow模块完成这个接口,首要需求给KisFlow增加一个Action{}成员,表明每次履行完Function之后所带着的动作。

kis-flow/flow/kis_flow.go

// KisFlow 用于贯穿整条流式核算的上下文环境
type KisFlow struct {
	// 根底信息
	Id   string                // Flow的分布式实例ID(用于KisFlow内部区别不同实例)
	Name string                // Flow的可读称号
	Conf *config.KisFlowConfig // Flow装备策略
	// Function列表
	Funcs          map[string]kis.Function // 当时flow具有的全部办理的全部Function目标, key: FunctionName
	FlowHead       kis.Function            // 当时Flow所具有的Function列表表头
	FlowTail       kis.Function            // 当时Flow所具有的Function列表表尾
	flock          sync.RWMutex            // 办理链表刺进读写的锁
	ThisFunction   kis.Function            // Flow当时正在履行的KisFunction目标
	ThisFunctionId string                  // 当时履行到的Function ID
	PrevFunctionId string                  // 当时履行到的Function 上一层FunctionID
	// Function列表参数
	funcParams map[string]config.FParam // flow在当时Function的自界说固定装备参数,Key:function的实例KisID, value:FParam
	fplock     sync.RWMutex             // 办理funcParams的读写锁
	// 数据
	buffer common.KisRowArr  // 用来临时寄存输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也便是KisBatch
	data   common.KisDataMap // 流式核算各个层级的数据源
	inPut  common.KisRowArr  // 当时Function的核算输入数据
    // +++++++++++++++++++++
	// KisFlow Action
	action kis.Action        // 当时Flow所带着的Action动作
}

然后完成KisFlow的Next()接口,如下:

kis-flow/flow/kis_flow.go

// Next 当时Flow履行到的Function进入下一层Function所带着的Action动作
func (flow *KisFlow) Next(acts ...kis.ActionFunc) error {
	// 加载Function FaaS 传递的 Action动作
	flow.action = kis.LoadActions(acts)
	return nil
}

每次开发者在履行Function的自界说事务回调中,最终会调用flow.Next()来传递Action,所以 Next(acts ...kis.ActionFunc) error便是讲传递的Action特点加载进来而且在flow.action保存。

7.1.4 Abort操控Flow流程

现在有个Abort来操控Flow流,那么咱们需求给KisFlow增加一个成员来表明这个状况

kis-flow/flow/kis_flow.go

// KisFlow 用于贯穿整条流式核算的上下文环境
type KisFlow struct {
	// 根底信息
	Id   string                // Flow的分布式实例ID(用于KisFlow内部区别不同实例)
	Name string                // Flow的可读称号
	Conf *config.KisFlowConfig // Flow装备策略
	// Function列表
	Funcs          map[string]kis.Function // 当时flow具有的全部办理的全部Function目标, key: FunctionName
	FlowHead       kis.Function            // 当时Flow所具有的Function列表表头
	FlowTail       kis.Function            // 当时Flow所具有的Function列表表尾
	flock          sync.RWMutex            // 办理链表刺进读写的锁
	ThisFunction   kis.Function            // Flow当时正在履行的KisFunction目标
	ThisFunctionId string                  // 当时履行到的Function ID
	PrevFunctionId string                  // 当时履行到的Function 上一层FunctionID
	// Function列表参数
	funcParams map[string]config.FParam // flow在当时Function的自界说固定装备参数,Key:function的实例KisID, value:FParam
	fplock     sync.RWMutex             // 办理funcParams的读写锁
	// 数据
	buffer common.KisRowArr  // 用来临时寄存输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也便是KisBatch
	data   common.KisDataMap // 流式核算各个层级的数据源
	inPut  common.KisRowArr  // 当时Function的核算输入数据
	action kis.Action        // 当时Flow所带着的Action动作
    // +++++++++
    abort  bool              // 是否中断Flow
}

在每次履行到flow.Run()办法时,需求重置abort变量,而且在循环调度的时候加上对flow.abort的判别。

kis-flow/flow/kis_flow.go

// Run 发动KisFlow的流式核算, 从开端Function开端履行流
func (flow *KisFlow) Run(ctx context.Context) error {
    // +++++++++
	// 重置 abort
	flow.abort = false  //  每次进入调度,要重置abort状况
	// ... ...
    // ... ...
	//流式链式调用
	for fn != nil && flow.abort != true { // ++++ 假如设置abort则不进入下次循环调度
		// ... ...
        // ... ...
        if err := fn.Call(ctx, flow); err != nil {
			//Error
			return err
		} else {
			//Success
			// ... ...
			fn = fn.Next()
        }
	}
	return nil

这样在每次Call()调度到Funciton的自定办法时,假如return flow.Next(ActionAbort)就会对flow的Action状况进行改动,从而就操控了flow的流程停止。最终便是将Action的Abort状况传递给KisFlow的Abort状况。

已然有了Abort状况,那么咱们能够经过给Flow履行过程中增加一个设定,假如当时的Function没有提交本层的成果数据,也便是flow.buffer为空,那么将不会进入下一层,在本层直接完毕退出Flow的Run()调用。

kis-flow/flow/kis_flow_data.go

//commitCurData 提交Flow当时履行Function的成果数据
func (flow *KisFlow) commitCurData(ctx context.Context) error {
	// 判别本层核算是否有成果数据,假如没有则退出本次Flow Run循环
	if len(flow.buffer) == 0 {
        // ++++++++++++
		flow.abort = true
		return nil
	}
	// ... ...
    // ... ...
	return nil

7.1.5 Action捕获及处理

接下来来完成一个专门处理Action动作的办法,界说在kis-flow/flow/kis_flow_action.go文件中,如下:

kis-flow/flow/kis_flow_action.go

package flow
import (
	"context"
	"errors"
	"fmt"
	"kis-flow/kis"
)
// dealAction  处理Action,决议接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
	if err := flow.commitCurData(ctx); err != nil {
		return nil, err
	}
	// 更新上一层 FuncitonId 游标
	flow.PrevFunctionId = flow.ThisFunctionId
	fn = fn.Next()
	// Abort Action 强制停止
	if flow.action.Abort {
		flow.abort = true
	}
	// 清空Action
	flow.action = kis.Action{}
	return fn, nil
}

然后略微改善下KisFlow的Run() 流程,将dealAction() 办法嵌入进去。

kis-flow/flow/kis_flow.go

// Run 发动KisFlow的流式核算, 从开端Function开端履行流
func (flow *KisFlow) Run(ctx context.Context) error {
	var fn kis.Function
	fn = flow.FlowHead
	flow.abort = false
	if flow.Conf.Status == int(common.FlowDisable) {
		//flow被装备关闭
		return nil
	}
	// 由于此时还没有履行任何Function, 所以PrevFunctionId为FirstVirtual 由于没有上一层Function
	flow.PrevFunctionId = common.FunctionIdFirstVirtual
	// 提交数据流原始数据
	if err := flow.commitSrcData(ctx); err != nil {
		return err
	}
	//流式链式调用
	for fn != nil && flow.abort == false {
		// flow记录当时履行到的Function 符号
		fid := fn.GetId()
		flow.ThisFunction = fn
		flow.ThisFunctionId = fid
		// 得到当时Function要处理与的源数据
		if inputData, err := flow.getCurData(); err != nil {
			log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %sn", err.Error())
			return err
		} else {
			flow.inPut = inputData
		}
		if err := fn.Call(ctx, flow); err != nil {
			//Error
			return err
		} else {
			//Success
            // +++++++++++++++++++++++++++++++
			fn, err = flow.dealAction(ctx, fn)
			if err != nil {
				return err
			}
            // +++++++++++++++++++++++++++++++
		}
	}
	return nil
}

7.1.6 Action Abort单元测试

首要咱们新建一个Function事务,装备文件如下:

kis-flow/test/load_conf/func/func-AbortFunc.yml

kistype: func
fname: abortFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

当时的Funciton的称号为abortFunc,然后完成其FaaS函数,如下:

kis-flow/test/faas/faas_abort.go

package faas
import (
	"context"
	"fmt"
	"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call AbortFuncHandler ----")
	for _, row := range flow.Input() {
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)
	}
	return flow.Next(kis.ActionAbort)
}

这个Function就会终究调用flow.Next(kis.ActionAbort)来停止Flow,接下来咱们新建一个Flow,将上面的Function作为中心的Function,看检测是否会停止之后的Function被履行。
新建的flow的装备如下:

kis-flow/test/load_conf/flow/flow-FlowName2.yml

kistype: flow
status: 1
flow_name: flowName2
flows:
  - fname: funcName1
  - fname: abortFunc
  - fname: funcName3

当时Flow的称号为flowName2,当时的Flow有三个Function,其间funcNam1 和 funcName2咱们之前都现已界说好了,abortFunc是咱们新建的,而且在中心。假如abort功用满意,则funcName3将不会被调度。

接下来完成单元测试用例。

kis-flow/test/kis_action_test.go

package test
import (
	"context"
	"kis-flow/common"
	"kis-flow/file"
	"kis-flow/kis"
	"kis-flow/test/caas"
	"kis-flow/test/faas"
	"testing"
)
func TestActionAbort(t *testing.T) {
	ctx := context.Background()
	// 0. 注册Function 回调事务
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // 增加abortFunc 事务
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
	// 0. 注册ConnectorInit 和 Connector 回调事务
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
	// 1. 加载装备文件并构建Flow
	if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
		panic(err)
	}
	// 2. 获取Flow
	flow1 := kis.Pool().GetFlow("flowName2")
	// 3. 提交原始数据
	_ = flow1.CommitRow("This is Data1 from Test")
	_ = flow1.CommitRow("This is Data2 from Test")
	_ = flow1.CommitRow("This is Data3 from Test")
	// 4. 履行flow1
	if err := flow1.Run(ctx); err != nil {
		panic(err)
	}
}

其间下面的代码是初始化注册的代码,咱们也能够写在其他文件中,这样就不需求每次都带着这部分代码了。

	// 0. 注册Function 回调事务
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // 增加abortFunc 事务
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
	// 0. 注册ConnectorInit 和 Connector 回调事务
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

cd 到kis-flow/test/目录下履行如下指令:

go test -test.v -test.paniconexit0 -test.run  TestActionAbort

成果如下:

=== RUN   TestActionAbort
Add KisPool FuncName=funcName1
Add KisPool FuncName=abortFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
context.Background
====> After CommitSrcData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094cc0 ThisFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] jumpFunc:NoJump abort:false nextOpt:<nil>}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094d20 ThisFunctionId:func-7f5af1521fd64d08839d5bdd26de5254 PrevFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] jumpFunc:NoJump abort:false nextOpt:<nil>}
---> Call AbortFuncHandler ----
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 0
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 1
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 2
--- PASS: TestActionAbort (0.00s)
PASS
ok      kis-flow/test   0.487s

经过成果能够看到,在履行完 AbortFuncHandler 后,没有持续履行,而是退出了Flow的Run()办法。

7.2 Action DataReuse(复用上层数据)

Action DataReuse 为服用上层数据,含义为,当时的履行Function提交到下一层的成果将不被运用,而是直接将当时Function的上一层成果数据,复用到下一层,作为下一层Funciton的数据源。

下面来完成Action DataReuse功用。

7.2.1 DataReuse Action增加

在Action中增加DataReuse成员,是一个bool类型。

kis-flow/kis/action.go

// Action KisFlow履行流程Actions
type Action struct {
    // +++++++++++++
	// DataReuse 是否复用上层Function数据
	DataReuse bool
	// Abort 停止Flow的履行
	Abort bool
}
// ActionDataReuse Next复用上层Function数据Option
func ActionDataReuse(act *Action) {
	act.DataReuse = true
}

然后提供一个ActionFunc,命名为:ActionDataReuse,完成中为改动DataReuse状况为true。

7.2.2 复用上层数据到基层

这儿需求再完成一个提交数据的办法,为怎么提交复用数据,详细逻辑如下:

kis-flow/flow/kis_flow_data.go

// commitReuseData
func (flow *KisFlow) commitReuseData(ctx context.Context) error {
	// 判别上层是否有成果数据, 假如没有则退出本次Flow Run循环
	if len(flow.data[flow.PrevFunctionId]) == 0 {
		flow.abort = true
		return nil
	}
	// 本层成果数据等于上层成果数据(复用上层成果数据到本层)
	flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId]
	// 清空缓冲Buf (假如是ReuseData选项,那么提交的全部数据,都将不会带着到下一层)
	flow.buffer = flow.buffer[0:0]
	log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %snAll Level Data =n %+vn", flow.Name, flow.Id, flow.data)
	return nil
}

逻辑很简单,与commitCurData()不同的是,commitCurData()为将flow.buffer的数据提交到flow.data[flow.ThisFunctionId]中,而commitReuseData()为将上一层的成果数据提交到flow.data[flow.ThisFunctionId]中。

7.2.3 处理DataReuse Action动作

然后在dealAction()办法中增加对Action DataReuse的动作捕获,如下:

kis-flow/flow/kis_flow_action.go

// dealAction  处理Action,决议接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
    // ++++++++++++++++
	// DataReuse Action
	if flow.action.DataReuse {
		if err := flow.commitReuseData(ctx); err != nil {
			return nil, err
		}
	} else {
		if err := flow.commitCurData(ctx); err != nil {
			return nil, err
		}
	}
	// 更新上一层 FuncitonId 游标
	flow.PrevFunctionId = flow.ThisFunctionId
	fn = fn.Next()
	// Abort Action 强制停止
	if flow.action.Abort {
		flow.abort = true
	}
	// 清空Action
	flow.action = kis.Action{}
	return fn, nil
}

7.2.4 单元测试

下面来针对DataReuse做单元测试,首要创立一个名字为dataReuseFunc 的Funciton,先创立装备文件。

kis-flow/test/load_conf/func/func-dataReuseFunc.yml

kistype: func
fname: dataReuseFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

一起新建一个Flow流,称号为flowName3,装备如下:

kis-flow/test/load_conf/flow/func-FlowName3.yml

kistype: flow
status: 1
flow_name: flowName3
flows:
  - fname: funcName1
  - fname: dataReuseFunc
  - fname: funcName3

针对dataReuseFunc的Function的逻辑事务,如下:

kis-flow/test/faas/faas_data_reuse.go

package faas
import (
	"context"
	"fmt"
	"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func DataReuseFuncHandler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call DataReuseFuncHandler ----")
	for index, row := range flow.Input() {
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)
		// 核算成果数据
		resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
		// 提交成果数据
		_ = flow.CommitRow(resultStr)
	}
	return flow.Next(kis.ActionDataReuse)
}

最终完成测试用例,如下:

kis-flow/test/kis_action_test.go

func TestActionDataReuse(t *testing.T) {
	ctx := context.Background()
	// 0. 注册Function 回调事务
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("dataReuseFunc", faas.DataReuseFuncHandler) // 增加dataReuesFunc 事务
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
	// 0. 注册ConnectorInit 和 Connector 回调事务
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
	// 1. 加载装备文件并构建Flow
	if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
		panic(err)
	}
	// 2. 获取Flow
	flow1 := kis.Pool().GetFlow("flowName3")
	// 3. 提交原始数据
	_ = flow1.CommitRow("This is Data1 from Test")
	_ = flow1.CommitRow("This is Data2 from Test")
	_ = flow1.CommitRow("This is Data3 from Test")
	// 4. 履行flow1
	if err := flow1.Run(ctx); err != nil {
		panic(err)
	}
}

cd 到 kis-flow/test/下履行:

go test -test.v -test.paniconexit0 -test.run  TestActionDataReuse

成果是:

=== RUN   TestActionDataReuse
Add KisPool FuncName=funcName1
Add KisPool FuncName=dataReuseFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
context.Background
====> After CommitSrcData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000955c0 ThisFunctionId:func-7886178381634f05b302841141382e59 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095620 ThisFunctionId:func-ef567879d0dd45b287ed709e549e9d32 PrevFunctionId:func-7886178381634f05b302841141382e59 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call DataReuseFuncHandler ----
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 0
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 1
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 2
context.Background
 ====> After commitReuseData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095680 ThisFunctionId:func-cfe66e39aba54ff989d6764cc4edda20 PrevFunctionId:func-ef567879d0dd45b287ed709e549e9d32 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 0
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 1
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 2
--- PASS: TestActionDataReuse (0.02s)
PASS
ok      kis-flow/test   0.523s

经过成果能够看出,在最终的funcName3Handler中得到的数据是funcName1传递下来的数据,中心的ReuseFunction将上层的数据复用到了下一层,变成了FuncName3的数据源。

7.3 Action ForceEntryNext(强制进入下一层)

7.3.1 ForceEntryNext Action特点

目前的KisFlow为,假如当时的Function没有commit数据(本层的成果数据),那么当时的Function完毕后,将不会持续调度下一层Function。 可是有的Flow的流式核算或许需求持续向下履行,哪怕没有数据,所以这儿能够经过ForceEntryNext这个动作来触发。
首要咱们在Action中新增一个ForceEntryNext 特点。

kis-flow/kis/action.go

// Action KisFlow履行流程Actions
type Action struct {
	// DataReuse 是否复用上层Function数据
	DataReuse bool
	// 默许Next()为假如本层Function核算成果为0条数据,之后Function将不会持续履行
	// ForceEntryNext 为疏忽上述默许规则,没有数据强制进入下一层Function
	ForceEntryNext bool
	// Abort 停止Flow的履行
	Abort bool
}
// ActionForceEntryNext 强制进入下一层
func ActionForceEntryNext(act *Action) {
	act.ForceEntryNext = true
}

且提供装备函数ActionForceEntryNext()来修正这个特点状况。

7.3.2 捕获Action

在捕获Action的dealAction()办法中,加上对这个状况的判别,假如被设置,则需求将flow.Abort状况改成false,flow将持续履行下一层。

kis-flow/flow/kis_flow_action.go

// dealAction  处理Action,决议接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
	// DataReuse Action
	if flow.action.DataReuse {
		if err := flow.commitReuseData(ctx); err != nil {
			return nil, err
		}
	} else {
		if err := flow.commitCurData(ctx); err != nil {
			return nil, err
		}
	}
    // ++++++++++++++++++++++++++++
	// ForceEntryNext Action
	if flow.action.ForceEntryNext {
		if err := flow.commitVoidData(ctx); err != nil {
			return nil, err
		}
		flow.abort = false
	}
	// 更新上一层 FuncitonId 游标
	flow.PrevFunctionId = flow.ThisFunctionId
	fn = fn.Next()
	// Abort Action 强制停止
	if flow.action.Abort {
		flow.abort = true
	}
	// 清空Action
	flow.action = kis.Action{}
	return fn, nil
}

这儿有一个细节,咱们需求调用一个办法commitVoidData(),即提交空数据,原因是,假如不提交空数据,那么flow.buffer依然为空,那么不会履行数据的提交动作,那么会导致flow.data[flow.ThisFunctionId]这条不存在,也便是key不存在,那么再履行到flow.getCurData()会呈现找不到key的异常而panic。所以这儿需求提交一个空的数据到flow.data[flow.ThisFunctionId]中。
详细的commitVoidData()完成如下:

kis-flow/flow/kis_flow_data.go

func (flow *KisFlow) commitVoidData(ctx context.Context) error {
	if len(flow.buffer) != 0 {
		return nil
	}
	// 制造空数据
	batch := make(common.KisRowArr, 0)
	// 将本层核算的缓冲数据提交到本层成果数据中
	flow.data[flow.ThisFunctionId] = batch
	log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %snAll Level Data =n %+vn", flow.Name, flow.Id, flow.data)
	return nil
}

7.3.3 单元测试,不设置ForceEntryNext

首要,创立一个noResultFunc的Function装备,且完成相关的回调事务函数。

kis-flow/test/load_conf/func/func-NoResultFunc.yml

kistype: func
fname: noResultFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

kis-flow/test/faas/faas_no_result.go

package faas
import (
	"context"
	"fmt"
	"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call NoResultFuncHandler ----")
	for _, row := range flow.Input() {
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)
	}
	return flow.Next()
}

这儿边在Function的最终,只调用flow.Next() 不传递任何Action动作。
然后新建一个FlowName4,装备如下:

kis-flow/test/load_conf/flow-FlowName4.yml

kistype: flow
status: 1
flow_name: flowName4
flows:
  - fname: funcName1
  - fname: noResultFunc
  - fname: funcName3

最终咱们编写单元测试用例代码,将noResultFunc放在中心的部分。

kis-flow/test/kis_action_test.go

func TestActionForceEntry(t *testing.T) {
	ctx := context.Background()
	// 0. 注册Function 回调事务
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("noResultFunc", faas.NoResultFuncHandler) // 增加noResultFunc 事务
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
	// 0. 注册ConnectorInit 和 Connector 回调事务
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
	// 1. 加载装备文件并构建Flow
	if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
		panic(err)
	}
	// 2. 获取Flow
	flow1 := kis.Pool().GetFlow("flowName4")
	// 3. 提交原始数据
	_ = flow1.CommitRow("This is Data1 from Test")
	_ = flow1.CommitRow("This is Data2 from Test")
	_ = flow1.CommitRow("This is Data3 from Test")
	// 4. 履行flow1
	if err := flow1.Run(ctx); err != nil {
		panic(err)
	}
}

cd到kis-flow/test/ 下履行:

go test -test.v -test.paniconexit0 -test.run  TestActionForceEntry

成果如下:

=== RUN   TestActionForceEntry
Add KisPool FuncName=funcName1
Add KisPool FuncName=noResultFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
context.Background
====> After CommitSrcData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d560 ThisFunctionId:func-4d113d6a8e744d30a906db310f2d7818 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d5c0 ThisFunctionId:func-47cb6f9ae464484aa779c18284035705 PrevFunctionId:func-4d113d6a8e744d30a906db310f2d7818 funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call NoResultFuncHandler ----
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 0
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 1
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 2
--- PASS: TestActionForceEntry (0.02s)
PASS
ok      kis-flow/test   0.958s

由于noResultFunc不会生成任何的成果数据,所以下一层Function将不会被履行,最终只履行到

---> Call NoResultFuncHandler ----

7.3.4 单元测试,设置ForceEntryNext

下面咱们将Action为ForceEntryNext加上,在NoResultFuncHandler() 中,加上flow.Next(kis.ActionForceEntryNext),如下:

kis-flow/test/faas/faas_no_result.go

package faas
import (
	"context"
	"fmt"
	"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call NoResultFuncHandler ----")
	for _, row := range flow.Input() {
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)
	}
	return flow.Next(kis.ActionForceEntryNext)
}

cd到kis-flow/test/ 下履行:

go test -test.v -test.paniconexit0 -test.run  TestActionForceEntry

成果如下:

=== RUN   TestActionForceEntry
Add KisPool FuncName=funcName1
Add KisPool FuncName=noResultFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
context.Background
====> After CommitSrcData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-ecddaee7d7d447a9852d07088732f509 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013740 ThisFunctionId:func-c9817c7993894919b8463dea1757544e PrevFunctionId:func-ecddaee7d7d447a9852d07088732f509 funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call NoResultFuncHandler ----
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 0
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 1
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 2
context.Background
 ====> After commitVoidData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000137a0 ThisFunctionId:func-5729600ae6ea4d6f879eb5832c638e1a PrevFunctionId:func-c9817c7993894919b8463dea1757544e funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName3Handler ----
--- PASS: TestActionForceEntry (0.01s)
PASS
ok      kis-flow/test   0.348s

会发现,Function第三层funcName3Handler 被履行到,可是没有任何的数据。

7.4 Action JumpFunc(流程跳转)

接下来,来完成JumpFunc Action,JumpFunc是能够在当时Flow中恣意跳转到指定的FuncName持续履行(前提是跳转的FuncName当当时Flow中存在)

留意:JumpFunc简单呈现无限循环流,所以在事务的规划要慎用。

7.4.1 Action增加JumpFunc

首要在Action增加一个JumpFunc特点,留意,JunpFunc不是一个bool状况,而是一个string字符串,表明详细要跳转的FunctionName称号。

kis-flow/kis/action.go

// Action KisFlow履行流程Actions
type Action struct {
	// DataReuse 是否复用上层Function数据
	DataReuse bool
	// 默许Next()为假如本层Function核算成果为0条数据,之后Function将不会持续履行
	// ForceEntryNext 为疏忽上述默许规则,没有数据强制进入下一层Function
	ForceEntryNext bool
    // ++++++++++
	// JumpFunc 跳转到指定Function持续履行
	JumpFunc string
	// Abort 停止Flow的履行
	Abort bool
}
// ActionJumpFunc 会回来一个ActionFunc函数,而且会将funcName赋值给Action.JumpFunc
// (留意:简单呈现Flow循环调用,导致死循环)
func ActionJumpFunc(funcName string) ActionFunc {
	return func(act *Action) {
		act.JumpFunc = funcName
	}
}

然后提供一个修正JumpFunc的装备办法ActionJumpFunc(),留意这个办法和之前的办法写法有一些不同,主要是回来一个匿名函数而且履行,意图则是修正Action中的JumpFunc特点。

7.4.2 捕获Action

接下来,咱们来捕获JumpFunc的Action动作,判别JumpFunc是否为空字符串即可。

kis-flow/flow/kis_flow_action.go

// dealAction  处理Action,决议接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
	// DataReuse Action
	if flow.action.DataReuse {
		if err := flow.commitReuseData(ctx); err != nil {
			return nil, err
		}
	} else {
		if err := flow.commitCurData(ctx); err != nil {
			return nil, err
		}
	}
	// ForceEntryNext Action
	if flow.action.ForceEntryNext {
		if err := flow.commitVoidData(ctx); err != nil {
			return nil, err
		}
		flow.abort = false
	}
    // ++++++++++++++++++++++++++++++++
	// JumpFunc Action
	if flow.action.JumpFunc != "" {
		if _, ok := flow.Funcs[flow.action.JumpFunc]; !ok {
			//当时JumpFunc不在flow中
			return nil, errors.New(fmt.Sprintf("Flow Jump -> %s is not in Flow", flow.action.JumpFunc))
		}
		jumpFunction := flow.Funcs[flow.action.JumpFunc]
		// 更新上层Function
		flow.PrevFunctionId = jumpFunction.GetPrevId()
		fn = jumpFunction
		// 假如设置跳动,强制跳动
		flow.abort = false
    // ++++++++++++++++++++++++++++++++
	} else {
		// 更新上一层 FuncitonId 游标
		flow.PrevFunctionId = flow.ThisFunctionId
		fn = fn.Next()
	}
	// Abort Action 强制停止
	if flow.action.Abort {
		flow.abort = true
	}
	// 清空Action
	flow.action = kis.Action{}
	return fn, nil
}

假如设置JumpFunc,则需求修正下次履行的fn指针,否则则正常寻址fn.Next()

7.4.3 单元测试

接下来来界说一个跳转Action的Function,装备,如下:

kis-flow/test/load_conf/func/func-jumpFunc.yml

kistype: func
fname: jumpFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

而且完成相关的Funciton事务逻辑,如下:

kis-flow/test/faas/faas_jump.go

package faas
import (
	"context"
	"fmt"
	"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func JumpFuncHandler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call JumpFuncHandler ----")
	for _, row := range flow.Input() {
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)
	}
	return flow.Next(kis.ActionJumpFunc("funcName1"))
}

这儿,最终经过flow.Next(kis.ActionJumpFunc("funcName1"))来指定跳转到funcName1的Function。

新建一个Flow,为FlowName5,装备如下:

kis-flow/test/load_conf/flow/flow-FlowName5.yml

kistype: flow
status: 1
flow_name: flowName5
flows:
  - fname: funcName1
  - fname: funcName2
  - fname: jumpFunc

之后,来完成单元测试用例代码,如下:

kis-flow/test/kis_action_test.go

func TestActionJumpFunc(t *testing.T) {
	ctx := context.Background()
	// 0. 注册Function 回调事务
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
	kis.Pool().FaaS("jumpFunc", faas.JumpFuncHandler) // 增加jumpFunc 事务
	// 0. 注册ConnectorInit 和 Connector 回调事务
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
	// 1. 加载装备文件并构建Flow
	if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
		panic(err)
	}
	// 2. 获取Flow
	flow1 := kis.Pool().GetFlow("flowName5")
	// 3. 提交原始数据
	_ = flow1.CommitRow("This is Data1 from Test")
	_ = flow1.CommitRow("This is Data2 from Test")
	_ = flow1.CommitRow("This is Data3 from Test")
	// 4. 履行flow1
	if err := flow1.Run(ctx); err != nil {
		panic(err)
	}
}

cd到kis-flow/test/履行:

go test -test.v -test.paniconexit0 -test.run  TestActionJumpFunc

成果如下:

...
...
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionS, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013680 ThisFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 PrevFunctionId:func-f6ca8010d66744429bf6069c9897a928 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
 ====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-4faf8f019f4a4a48b84ef27abfad53d1 PrevFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call JumpFuncHandler ----
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 0
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 1
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 2
KisFunctionV, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013620 ThisFunctionId:func-f6ca8010d66744429bf6069c9897a928 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName1Handler ----
 ... 
 ...

发现咱们会无限循环的调度Flow,这样阐明咱们的JumpFunc Action现已生效。

7.5【V0.6】源代码

github.com/aceld/kis-f…


作者:刘丹冰Aceld github: github.com/aceld

KisFlow开源项目地址:github.com/aceld/kis-f…

Golang结构实战-KisFlow流式核算结构专栏

Golang结构实战-KisFlow流式核算结构(1)-概述

Golang结构实战-KisFlow流式核算结构(2)-项目构建/根底模块-(上)

Golang结构实战-KisFlow流式核算结构(3)-项目构建/根底模块-(下)

Golang结构实战-KisFlow流式核算结构(4)-数据流

Golang结构实战-KisFlow流式核算结构(5)-Function调度

Golang结构实战-KisFlow流式核算结构(6)-Connector

Golang结构实战-KisFlow流式核算结构(7)-装备导入与导出