网关节点决定流程的流转方向,本文会介绍几个最常用的网关界定及其结构定义。

基于Petri网分

AND Split 和 AND Join

OR Split 和 OR Join

XOR Split 和 XOR Join

AND表示后续的任务节点同时触发

OR表示后续任务节点可以触发一个或多个,没有排他性,即每个节点任务不会影响其他的节点任务。

XOR表示后续的任务节点只触发一个,有排他性,也就是只能触发其中一个。

基于路由分

顺序路由

分支路由

并行路由

循环路由

# 并行节点

原理

并行节点指向的几条分支节点,可以并行地运行。

如下图,token的流向,在进入并行节点出来后,会根据并行节点的后置节点个数,复制出同样个数的token给下一个节点,这样下一个节点拿到token后就可以执行。

image-20231025091638182
{
    "instId": "xxx",
    "name": "并行",
    "type": "Gateway",
    "description": "并行",
    "template": "parallel",
    "positions": [],
    "connections": ["节点1", "节点2"],
    "parameters": {
    },
    "errorHandler": {},
    "loopHandler": {},
    "timeout": 0,
    "isIgnore": false,
    "runtimes": []
}

并行节点的定义很简单,在connections里的数组成员即是并行的节点个数,一般跟汇聚Join节点搭配使用,其原理类似go代码里并行之后的wait行为。

func main() {
    wg := sync.WaitGroup{}
    wg.Add(100)
    for i := 0; i < 100; i++ {
        go func(i int) { // 并行节点
            fmt.Println(i)
            wg.Done()
        }(i)
    }
    wg.Wait() // 类似汇聚join节点
}

实现

前面,我们提到,工作流实例中,最基础的执行单元是节点任务实例,它的执行是基于消息队列实现。

在执行完并行节点后,引擎获取到并行节点的下一个节点(>=1个),然后同时将这些后置节点一起推送到消息队列。后端的worker集群快速消费获取到节点任务实例,由于后端的worker集群是多机器的,所以在用户看来B、C节点的执行就是同时的。

# 条件判断节点

原理

条件判断节点完整JSON结构如下:

{
    "instId": "xxx",
    "name": "条件判断",
    "type": "Gateway",
    "description": "条件判断",
    "template": "condition",
    "positions": [],
    "connections": [],
    "parameters": {
        "condition_list": {
            "label": "condition_list",
            "type": "array",
            "value": [
                {
                    "index": 1,
                    "type": "string/number",
                    "value1": "aaa",
                    "operation": "equal/not_equal/contain/not_contain/start_with/not_start_with/end_with/not_end_with/regex",
                    "value2": "bbb"
                }
            ],
            "default": [],
            "required": true
        },
        "relation": {
            "label": "relation",
            "type": "string",
            "value": "or/and",
            "default": "or",
            "required": true
        },
        "false_branch": {
            "label": "false_branch",
            "type": "string",
            "value": "",
            "default": "",
            "required": false
        },
        "true_branch": {
            "label": "true_branch",
            "type": "string",
            "value": "",
            "default": "",
            "required": false
        }
    },
    "errorHandler": {},
    "loopHandler": {},
    "timeout": 0,
    "isIgnore": false,
    "runtimes": []
}
  • condition_list

condition_list是由一系列的条件组合合成。每个条件的字段功能描述如下:

(1)index:表示条件数组中第几个成员

(2)type:条件中对比的数据类型,一般包括字符串(string)和数字(number)

(3)operation:两个要对比的数据采用哪种对比关系。

对于字符串(string)类型的数据,value1和value2其对比关系如下:

a. equal:等于

b. not_equal:不等于

c. contain:包含

d. not_contain:不包含

e. start_with:以什么开头

f. not_start_with:不以什么开头

g. end_with:以什么结尾

h. not_end_with:不以什么结尾

i. regex:其中value1为正则表达式,value2为被判断是否符合value1的正则表达式

对于数字(number)类型的数据,value1和value2其对比关系如下:

j. gt:大于

k. gte:大于等于

l. lt:小于

n. lte:小于等于

o. equal:等于

p. not_equal:不等于

q. value1和value2:要比较的两个数据。这里除了常量,还可以使用后面介绍的变量表达式在运行时动态解析对比。

  • relation

由于条件判断可以由多个条件组合合成,这些组合的条件关系可以使用如下两种:

a. or(或):表示只要有一个条件为真,则返回真,否则返回假

b. and(且):表示只有全部为真,则返回真,否则有一个假就返回假

在运行到条件判断节点,引擎会根据节点的执行结果(true或false),选择下一个要运行的节点B或C。

如下图,token进入条件判断节点出来后,只选择其中一个分支传递,也就是说另一个没有被传递token的节点就无法执行。

  • false_branch

节点输出结果为false时流转的分支节点

  • true_branch

节点输出结果为true时流转的分支节点

实现

实现上也是基于消息队列,只不过条件判断节点运行结束后,只会选取后置节点中的一个来执行,即推送到消息队列。

# 事件网关节点

原理

如下图,token在流入事件网关节点流出后,会复制跟后置节点同样个数的token,也就是说事件网关的后置节点会同时运行。

但是,不同的是,只要有任意一个后置节点运行结束,则另外其他几个后置节点就被强制终止运行,具有排他性,这是跟并行节点的区别所在。

image-20231025094803630

实现

实现上前面步骤跟的并行节点一样。后面还要加多一个实现,就是只要有一个后置节点执行结束,就立刻强制终止其他几个后置节点。

# 汇聚Join节点

原理

汇聚节点完整JSON结构如下:

{
    "instId": "xxx",
    "name": "汇聚",
    "type": "Gateway",
    "description": "汇聚",
    "template": "aggregation",
    "positions": [],
    "connections": [],
    "parameters": {
        "prev_instid_list": {
            "label": "prev_instid_list",
            "type": "array",
            "value": [],
            "default": [],
            "required": true
        },
        "relation": {
            "label": "relation",
            "type": "string",
            "value": "all/anyone/specific",
            "default": "all",
            "required": true
        }
    },
    "errorHandler": {},
    "loopHandler": {},
    "timeout": 0,
    "isIgnore": false,
    "runtimes": []
}

汇聚节点一般是要跟并行节点组合使用。只有满足汇聚节点条件后,才会继续往下执行。

汇聚可以设置的条件有以下几种情况(relation字段),假设前置节点有n个。

  • all:所有n个前置节点全部完成(相当于Petri网的AND Join)

  • any:任意一个前置节点运行完成(相当于Petri网的OR Join)

  • specific:指定m(1<m<n)个前置节点完成(prev_instid_list数组成员)(相当于Petri网的OR Join)

如下图,token从汇聚节点流出后,又汇聚变成一个。

image-20231025095155835

实现

  • Step1:每次汇聚节点的前置节点任务执行完,就将运行数据记录到下表
字段 说明
execution_id 工作流实例ID
aggregation_appinst_id 汇聚节点实例ID
previous_appinst_id 汇聚节点的潜前置节点实例ID
  • Step2:每次运行汇聚节点,引擎都从上表中获取前面已经执行过的前置节点实例。然后对比节点设置的完成条件是否满足。如果不满足,则不运行下一个节点,继续回到Step1。如果满足条件,则继续运行下一个节点。

注意:由于系统可能是在分布式环境,分布式汇聚节点任务的执行要通过分布式锁,保证每次执行当前工作流实例ID的并行节点实例只有一个,其余未得到分布式锁的任务可以轮询100ms来等待分布式锁释放。这样可以避免出现并发操作数据的情况。

备注:分布式锁的实现可以通过ETCD、Redis分布式锁等方式实现。

最后更新: 9/12/2024, 11:20:51 PM