# 解析节点
在前面章节,我们降到了流程定义,即工作流的JSON数据结构:
工作流json定义结构
{
"id": "xxx",
"name": "Unknown",
"description": "",
"startNodeInstId": "",
"endNodeInstId": "",
"engine": "fast",
"status": "",
"creator": "admin",
"executor": "",
"mode": "fast",
"inputDataSchema": {},
"outputDataSchema": {},
"nodes": [
{
"instId": "r0wrc5a395",
"name": "Start",
"type": "Event",
"description": "",
"template": "start-trigger",
"positions": [
1599,
248
],
"connections": [],
"parameters": {},
"errorHandler": {},
"loopHandler": {},
"isIgnore": false,
"timeout": 0,
"runtimes": [
{
"index": 0,
"input": {},
"output": "",
"startAt": "",
"endAt": "",
"error": "",
"status": "todo"
}
],
}
]
}
WorkflowInst(工作流实例)数据结构
工作流JSON在引擎运行解析后对应的内存数据结构如下:
type WorkflowInst struct {
ExecutionId string `json:"executionUid"`
WorkflowId int64 `json:"workflowId"`
Name string `json:"name"`
Descriptioin string `json:"descriptioin"`
StartNodeInstId string `json:"startNodeInstId"`
EndNodeInstId string `json:"endNodeInstId"`
Engine string `json:"engine"`
Status string `json:"status"`
Creator string `json:"creator"`
Executor string `json:"executor"`
Mode string `json:"mode"`
InputDataSchema map[string]DataSchema `json:"inputDataSchema"`
OutputDataSchema map[string]DataSchema `json:"outputDataSchema"`
Tasks []*Task `json:"tasks"`
}
type DataSchema struct {
Type string `json:"type"`
Max int64 `json:"max"`
Min int64 `json:"min"`
Default interface{} `json:"default"`
Required bool `json:"required"`
}
Node的json定义结构
{
"instId": "r0wrc5a395",
"name": "Start",
"type": "Event",
"description": "",
"template": "start-trigger",
"positions": [
1599,
248
],
"connections": [],
"parameters": {},
"errorHandler": {},
"loopHandler": {},
"isIgnore": false,
"timeout": 0,
"runtimes": [
{
"index": 0,
"input": {},
"output": "",
"startAt": "",
"endAt": "",
"error": "",
"status": "todo"
}
],
}
引擎在运行工作流实例获取到流程定义结构时,会将流程实例中每个Node节点解析映射到如下Task数据结构中。
Node结构体
// go代码
type Node struct {
InstId string `json:"instId"`
Name string `json:"name"`
Type string `json:"type"`
Description string `json:"description"`
Template string `json:"template"`
Positions []float64 `json:"positions"`
Connections []string `json:"connections"`
Parameters map[string]Parameter `json:"parameters"` //运行前的参数
ErrorHandler ErrorHandler `json:"errorHandler"`
LoopHandler LoopHandler `json:"loopHandler"`
IsIgnore bool `json:"isIgnore"`
Timeout int64 `json:"timeout"`
Runtimes []NodeInst `json:"runtimes"` // 运行时的参数
}
type Parameter struct {
Type string `json:"type"`
Label string `json:"label"`
Value interface{} `json:"value"`
Default interface{} `json:"default"`
Required bool `json:"required"`
}
NodeInst结构体
// go代码
type NodeInst struct {
Index int64 `json:"index"`
Input map[string]interface{} `json:"input"` //对应运行时的Task参数(运行后的,表达式会被替换成实际的数据)
Output []interface{} `json:"output"`
StartAt string `json:"startAt"`
EndAt string `json:"endAt"`
Error string `json:"error"`
Status string `json:"status"`
}
# 解析节点流转关系
根据流程的定义,我们知道节点中connections字段是用于存储当前节点指向的下一批节点列表。
引擎在执行完节点实例后,就会connections字段获取下一批要执行的节点任务实例,并推送到消息队列中。
但是这里有两个个数据流控制节点比较特殊,它不是通过connections来获取下一个要执行的节点:
条件判断节点
我们看【流程定义中的主要节点类型】,知道条件判断节点的json数据结构如下:
{ "condition_list": [], "relation": "or/and", "false_branch": "xxx1", "true_branch": "xxx2" }
引擎在执行条件判断节点时,会根据节点输出结果进行判断,如果是true,则读取true_branch分支指向的节点;如果是false,则读取false_branch分支指向的节点。然后推送到消息队列继续执行。
人工任务中的审批节点
审批节点的json结构如下,其中有三个属性是跟数据流转有关的,分别是:true_branch、false_branch和timeout_branch。
审批节点在结束时,有两种情况。
- 超时:如果审批超时,这时引擎就会读取timeout_branch分支指向的节点继续执行。
- 正常结束:如果审批结束,引擎会根据审批的结果,如果通过,则读取true_branch分支指向的节点;如果不通过,则读取false_branch分支指向的节点。然后推送到消息队列继续执行。
{ "members": [], "title": "", "content": "hello world", "attr": "会签/或签", "timeout": 0, "aggree_btn": "同意", "disaggree_btn": "驳回", "true_branch": "", "false_branch": "", "timeout_branch": "" }
所以,节点流转关系这里除了上面的条件判断和人工审批节点比较特殊外。其逻辑都是一样的,引擎通过读取connections字段获取下一个节点。
# 解析异常处理器
异常处理器分两部分:
- 一个是节点自己的异常处理,也就是局部的异常处理器
- 一个是整个工作流实例的异常处理,也就是全局的异常处理器
# 局部异常处理
每个节点都会拥有一个异常处理器,其JSON结构如下。
{
"operation": "stop",
"retryCount": 0,
"retryInterval": 0,
"exception_branch":"xxx"
}
其中operation字段支持的动作包括:
- stop:发生错误,则直接停止往下执行
- ignore:发生错误,忽略,继续往下执行
- catch:发生错误,捕获并响应,执行异常分支的流程(exception_branch分支指向的节点)
- retry:发生错误,重试retryCount次,每次之间间隔是retryInterval秒
# 全局异常处理
func (Engine *v)Run() (err error) {
defer func(){
if e := recover(); e != nil {
// 捕获全局异常
}
}
// 引擎执行节点任务实例
isFinished, output, execErr = execApp.Run()
// 执行出现异常
if execErr != nil {
if todoNodeInst.ErrorHandler.Operation == "ignore" {
task.NodeInstIdToData[todoNodeInst.InstId].Error = execErr.Error()
task.NodeInstIdToData[todoNodeInst.InstId].Status = utils.ExecutionStatus_Error
execErr = nil
} else if todoNodeInst.ErrorHandler.Operation == "retry" {
for i := 0; i < todoNodeInst.ErrorHandler.RetryCount; i++ {
isFinished, output, execErr = execApp.Run()
if execErr == nil {
break
} else {
// sleep间隔
time.Sleep(time.Duration(todoNodeInst.ErrorHandler.RetryInterval) * time.Second)
}
}
} else if todoNodeInst.ErrorHandler.Operation == "throw" {
// 向上抛出异常
}
}
}
# 解析循环处理器
前面介绍流程定义中的主要节点,我们知道,这个循环的属性是对***子流程节点***而言的。在前面的内容,我们介绍了子流程节点的原理和实现。
{
// 子流程设置
"child_workflow_id": 0,
"operation": "sync/async",
// 循环属性设置
"loopType": "for",
"maxIteration": 1000,
"stopCond": {
"index": 1,
"type": "String",
"value1": "",
"operation": "equal",
"value2": ""
}
}
这里我们,再展开讲下引擎具体的解析过程:
// 以下是golang代码部分实现
// todoNodeInst就是当前在执行的子流程节点实例
.....
// Step1: 执行当前节点实例
isFinished, output, execErr = engine.Run(todoNodeInst)
.....
// Step2: 执行完以后,取下一个要执行的节点,默认情况下,会直接取下一个节点实例ID
nextNodeInstIds = todoNodeInst.Connections
.....
// Step3.1 for循环方式处理
if todoNodeInst.LoopHandler.LoopType == "for" {
// 获取当前循环了几次
curLoop = models.GetLoopNodeInstCount(executionId, todoNodeInst.InstId)
if curLoop <= todoNodeInst.LoopHandler.MaxIteration {
// 继续循环当前appinst
nextNodeInstIds = []string{todoNodeInst.InstId}
} else {
// 循环结束
// 这时候nextNodeInstIds就是子流程节点指向的下一个节点实例
// 即代码部分:nextNodeInstIds = todoNodeInst.Connections
}
} else if todoNodeInst.LoopHandler.LoopType == "do_while" {
// Step3.2 do..while循环方式处理
// 获取当前循环了几次
curLoop = models.GetLoopAppInstCount(executionId, todoNodeInst.InstId)
if isDoWhileConditionMatched(todoNodeInst) || curLoop > todoNodeInst.LoopHandler.MaxIteration {
// 条件满足或超过最大循环次数,退出do...while循环
} else {
// 条件不满足,继续执行当前子流程节点实例
nextNodeInstIds = []string{todoNodeInst.InstId}
}
}
.....
// 把下一批要执行的节点实例推送到消息队列
pushAppInstToTaskQueue(nextNodeInstIds)
.....
# 解析变量表达式
在流程定义部分,我们知道,流程中每一个节点都有一个唯一的ID来标识自己。通过这个节点ID就可以快速定位到流程中的位置。
变量表达式如前面介绍,除了局部变量和全局变量,还有一种就是跟节点ID关联的变量表达式。通过这个变量表达式,可以快速定位获取节点ID在运行时的入参和出参数据。
- 入参、出参变量表达式
入参变量表达式
{{appinst_id.0.input.field}}
出参变量表达式
{{appinst_id.0.output.field}}
由前面内容,我们知道引擎每次在执行完工作流实例中的节点实例任务后,都会把这个节点实例的运行时数据(运行时的入参和出参)更新到对应的NodeInst结构体中。通过访问这个数据结构,就可以动态获取到节点实例的运行时数据。
所以,引擎为每一个运行中的工作流实例,都维护了一个节点实例ID跟运行时数据的映射,其结构体如下:
// go代码
var InstIdToNodeInst map[string][]NodeInst
将上面的InstIdToNodeInst变量数据Json话,得到的数据结构如下:
{
"task_inst_id_1":[{"index":0,"input":{},"output":"","startAt":"","endAt":"","error":"","status":"done"},...],
"task_inst_id_2":[{"index":0,"input":{},"output":"","startAt":"","endAt":"","error":"","status":"done"},...],
"task_inst_id_3":[{"index":0,"input":{},"output":"","startAt":"","endAt":"","error":"","status":"done"},...],
}
我们可以发现入参变量表达式和出参变量表达式,跟InstIdToNodeInst数据结构是一一对应的层级关系。通过这样设计数据结构,引擎就可以快速动态解析入参和出参变量表达式
- 局部变量
局部变量表达式语法结构如下:
{{variable.local.key}}
在前面的【流程定义中的主要节点类型】,我们介绍了变量读写任务节点的原理和实现方式。
那么就可以很容易知道,引擎在解析到局部变量表达式后,同时获取到了当前工作流实例ID(execution_id)和局部变量名称(key)。
那么,通过Redis的GET命令,引擎就可以快速读取到当前局部变量此时此刻运行时的数据:
GET execution_id#key
- 全局变量
全局变量表达式语法结构如下:
{{variable.global.key}}
同理,引擎在解析到全局变量表达式时,可以同时获取到当前工作流的创建人(user)和全局变量名称(key)。
结合Redis的GET命令,引擎就可以获取到全局变量表达式运行时数据:
GET user#key
← 3.3 生命周期 3.5 与BPMN的比较 →