# 解析节点

在前面章节,我们降到了流程定义,即工作流的JSON数据结构:

工作流json定义结构

{
    "id": "xxx",
    "name": "Unknown",
    "description": "",
    "startNodeInstId": "",
    "endNodeInstId": "",
    "engine": "fast",
    "status": "",
    "creator": "admin",
    "executor": "",
    "mode": "fast",
    "inputDataSchema": {},
    "outputDataSchema": {},
    "nodes": [
        {
            "instId": "r0wrc5a395",
            "name": "Start",
            "type": "Event",
            "description": "",
            "template": "start-trigger",
            "positions": [
                1599,
                248
            ],
            "connections": [],
            "parameters": {},
            "errorHandler": {},
            "loopHandler": {},
            "isIgnore": false,
            "timeout": 0,
            "runtimes": [
                {
                    "index": 0,
                    "input": {},
                    "output": "",
                    "startAt": "",
                    "endAt": "",
                    "error": "",
                    "status": "todo"
                }
            ],
        }
    ]
}

WorkflowInst(工作流实例)数据结构

工作流JSON在引擎运行解析后对应的内存数据结构如下:

type WorkflowInst struct {
	ExecutionId    string     `json:"executionUid"`
	WorkflowId     int64      `json:"workflowId"`
	Name           string     `json:"name"`
	Descriptioin           string     `json:"descriptioin"`
	StartNodeInstId string     `json:"startNodeInstId"`
	EndNodeInstId   string     `json:"endNodeInstId"`
	Engine         string     `json:"engine"`
	Status         string     `json:"status"`
	Creator           string     `json:"creator"`
	Executor       string     `json:"executor"`
  Mode           string     `json:"mode"`
	InputDataSchema    map[string]DataSchema     `json:"inputDataSchema"`
  OutputDataSchema    map[string]DataSchema     `json:"outputDataSchema"`
	Tasks           []*Task `json:"tasks"`
}


type DataSchema struct {
  Type    string     `json:"type"`
  Max    int64     `json:"max"`
  Min    int64     `json:"min"`
  Default    interface{}     `json:"default"`
  Required    bool     `json:"required"`
}

Node的json定义结构

{
    "instId": "r0wrc5a395",
    "name": "Start",
    "type": "Event",
    "description": "",
    "template": "start-trigger",
    "positions": [
        1599,
        248
    ],
    "connections": [],
    "parameters": {},
    "errorHandler": {},
    "loopHandler": {},
    "isIgnore": false,
    "timeout": 0,
    "runtimes": [
        {
            "index": 0,
            "input": {},
            "output": "",
            "startAt": "",
            "endAt": "",
            "error": "",
            "status": "todo"
        }
    ],
}

引擎在运行工作流实例获取到流程定义结构时,会将流程实例中每个Node节点解析映射到如下Task数据结构中。

Node结构体

// go代码
type Node struct {
	InstId    string `json:"instId"`
	Name      string `json:"name"`
  Type      string `json:"type"`
	Description      string `json:"description"`
	Template              string  `json:"template"`
	Positions             []float64              `json:"positions"`
	Connections           []string               `json:"connections"`
	Parameters            map[string]Parameter `json:"parameters"` //运行前的参数
	ErrorHandler          ErrorHandler           `json:"errorHandler"`
	LoopHandler           LoopHandler            `json:"loopHandler"`
	IsIgnore              bool                   `json:"isIgnore"`
  Timeout int64 `json:"timeout"`
	Runtimes       []NodeInst `json:"runtimes"`       // 运行时的参数
}

type Parameter struct {
  Type        string     `json:"type"`
  Label       string     `json:"label"`
  Value       interface{}     `json:"value"`
  Default     interface{}     `json:"default"`
  Required    bool     `json:"required"`
}

NodeInst结构体

// go代码
type NodeInst struct {
  Index int64 `json:"index"`
	Input map[string]interface{} `json:"input"` //对应运行时的Task参数(运行后的,表达式会被替换成实际的数据)
	Output     []interface{}          `json:"output"`
	StartAt    string                 `json:"startAt"`
	EndAt      string                 `json:"endAt"`
	Error      string                 `json:"error"`
	Status     string                 `json:"status"`
}

# 解析节点流转关系

根据流程的定义,我们知道节点中connections字段是用于存储当前节点指向的下一批节点列表。

引擎在执行完节点实例后,就会connections字段获取下一批要执行的节点任务实例,并推送到消息队列中。

但是这里有两个个数据流控制节点比较特殊,它不是通过connections来获取下一个要执行的节点:

  • 条件判断节点

    我们看【流程定义中的主要节点类型】,知道条件判断节点的json数据结构如下:

    {
        "condition_list": [],
        "relation": "or/and",
        "false_branch": "xxx1",
        "true_branch": "xxx2"
    }
    

    引擎在执行条件判断节点时,会根据节点输出结果进行判断,如果是true,则读取true_branch分支指向的节点;如果是false,则读取false_branch分支指向的节点。然后推送到消息队列继续执行。

  • 人工任务中的审批节点

    审批节点的json结构如下,其中有三个属性是跟数据流转有关的,分别是:true_branch、false_branch和timeout_branch。

    审批节点在结束时,有两种情况。

    • 超时:如果审批超时,这时引擎就会读取timeout_branch分支指向的节点继续执行。
    • 正常结束:如果审批结束,引擎会根据审批的结果,如果通过,则读取true_branch分支指向的节点;如果不通过,则读取false_branch分支指向的节点。然后推送到消息队列继续执行。
    {
        "members": [],
        "title": "",
        "content": "hello world",
        "attr": "会签/或签",
        "timeout": 0,
        "aggree_btn": "同意",
        "disaggree_btn": "驳回",
        
        "true_branch": "",
        "false_branch": "",
        "timeout_branch": ""
    }
    

所以,节点流转关系这里除了上面的条件判断和人工审批节点比较特殊外。其逻辑都是一样的,引擎通过读取connections字段获取下一个节点。

# 解析异常处理器

异常处理器分两部分:

  • 一个是节点自己的异常处理,也就是局部的异常处理器
  • 一个是整个工作流实例的异常处理,也就是全局的异常处理器

# 局部异常处理

每个节点都会拥有一个异常处理器,其JSON结构如下。

{
    "operation": "stop",
    "retryCount": 0,
    "retryInterval": 0,
    "exception_branch":"xxx"
}

其中operation字段支持的动作包括:

  • stop:发生错误,则直接停止往下执行
  • ignore:发生错误,忽略,继续往下执行
  • catch:发生错误,捕获并响应,执行异常分支的流程(exception_branch分支指向的节点)
  • retry:发生错误,重试retryCount次,每次之间间隔是retryInterval秒

# 全局异常处理

func (Engine *v)Run() (err error) {
  defer func(){
    if e := recover(); e != nil {
      // 捕获全局异常
    }
  }
  
  // 引擎执行节点任务实例
  isFinished, output, execErr = execApp.Run()

  // 执行出现异常
  if execErr != nil {
    if todoNodeInst.ErrorHandler.Operation == "ignore" {
      task.NodeInstIdToData[todoNodeInst.InstId].Error = execErr.Error()
      task.NodeInstIdToData[todoNodeInst.InstId].Status = utils.ExecutionStatus_Error


      execErr = nil
    } else if todoNodeInst.ErrorHandler.Operation == "retry" {
      for i := 0; i < todoNodeInst.ErrorHandler.RetryCount; i++ {
        isFinished, output, execErr = execApp.Run()

        if execErr == nil {
          break
        } else {
          // sleep间隔
          time.Sleep(time.Duration(todoNodeInst.ErrorHandler.RetryInterval) * time.Second)
        }
      }
    } else if todoNodeInst.ErrorHandler.Operation == "throw" {
      // 向上抛出异常
    }
  }
}

# 解析循环处理器

前面介绍流程定义中的主要节点,我们知道,这个循环的属性是对***子流程节点***而言的。在前面的内容,我们介绍了子流程节点的原理和实现。

{
   // 子流程设置
    "child_workflow_id": 0,
    "operation": "sync/async",
    // 循环属性设置
    "loopType": "for",
    "maxIteration": 1000,
    "stopCond": {
        "index": 1,
        "type": "String",
        "value1": "",
        "operation": "equal",
        "value2": ""
    }
}

这里我们,再展开讲下引擎具体的解析过程:

// 以下是golang代码部分实现
// todoNodeInst就是当前在执行的子流程节点实例

.....
// Step1: 执行当前节点实例
isFinished, output, execErr = engine.Run(todoNodeInst)
.....

// Step2: 执行完以后,取下一个要执行的节点,默认情况下,会直接取下一个节点实例ID
nextNodeInstIds = todoNodeInst.Connections
.....

// Step3.1 for循环方式处理
if todoNodeInst.LoopHandler.LoopType == "for" {
    // 获取当前循环了几次
		curLoop = models.GetLoopNodeInstCount(executionId, todoNodeInst.InstId)

		if curLoop <= todoNodeInst.LoopHandler.MaxIteration {
			// 继续循环当前appinst
			nextNodeInstIds = []string{todoNodeInst.InstId}
		} else {
			// 循环结束
      // 这时候nextNodeInstIds就是子流程节点指向的下一个节点实例
      // 即代码部分:nextNodeInstIds = todoNodeInst.Connections
		}
} else if todoNodeInst.LoopHandler.LoopType == "do_while" {
  // Step3.2 do..while循环方式处理
  
  // 获取当前循环了几次
	curLoop = models.GetLoopAppInstCount(executionId, todoNodeInst.InstId)
  
  if isDoWhileConditionMatched(todoNodeInst) || curLoop > todoNodeInst.LoopHandler.MaxIteration {
    // 条件满足或超过最大循环次数,退出do...while循环
  } else {
    // 条件不满足,继续执行当前子流程节点实例
    nextNodeInstIds = []string{todoNodeInst.InstId}
  }
}
.....

// 把下一批要执行的节点实例推送到消息队列
pushAppInstToTaskQueue(nextNodeInstIds)
.....

# 解析变量表达式

在流程定义部分,我们知道,流程中每一个节点都有一个唯一的ID来标识自己。通过这个节点ID就可以快速定位到流程中的位置。

变量表达式如前面介绍,除了局部变量和全局变量,还有一种就是跟节点ID关联的变量表达式。通过这个变量表达式,可以快速定位获取节点ID在运行时的入参和出参数据。

  • 入参、出参变量表达式

入参变量表达式

{{appinst_id.0.input.field}}

出参变量表达式

{{appinst_id.0.output.field}}

由前面内容,我们知道引擎每次在执行完工作流实例中的节点实例任务后,都会把这个节点实例的运行时数据(运行时的入参和出参)更新到对应的NodeInst结构体中。通过访问这个数据结构,就可以动态获取到节点实例的运行时数据。

所以,引擎为每一个运行中的工作流实例,都维护了一个节点实例ID跟运行时数据的映射,其结构体如下:

// go代码
var InstIdToNodeInst   map[string][]NodeInst

将上面的InstIdToNodeInst变量数据Json话,得到的数据结构如下:

{
  "task_inst_id_1":[{"index":0,"input":{},"output":"","startAt":"","endAt":"","error":"","status":"done"},...],
  "task_inst_id_2":[{"index":0,"input":{},"output":"","startAt":"","endAt":"","error":"","status":"done"},...],
  "task_inst_id_3":[{"index":0,"input":{},"output":"","startAt":"","endAt":"","error":"","status":"done"},...],
}

我们可以发现入参变量表达式和出参变量表达式,跟InstIdToNodeInst数据结构是一一对应的层级关系。通过这样设计数据结构,引擎就可以快速动态解析入参和出参变量表达式

  • 局部变量

局部变量表达式语法结构如下:

{{variable.local.key}}

在前面的【流程定义中的主要节点类型】,我们介绍了变量读写任务节点的原理和实现方式。

那么就可以很容易知道,引擎在解析到局部变量表达式后,同时获取到了当前工作流实例ID(execution_id)和局部变量名称(key)。

那么,通过Redis的GET命令,引擎就可以快速读取到当前局部变量此时此刻运行时的数据:

GET execution_id#key
  • 全局变量

全局变量表达式语法结构如下:

{{variable.global.key}}

同理,引擎在解析到全局变量表达式时,可以同时获取到当前工作流的创建人(user)和全局变量名称(key)。

结合Redis的GET命令,引擎就可以获取到全局变量表达式运行时数据:

GET user#key
最后更新: 9/12/2024, 11:20:51 PM