介绍FSM原理及其实现方法。

# 有限状态机原理

状态机(State Machine)是一种用来进行计算或者解决问题的数学模型。它由一系列的状态和转换函数组成,这些转换函数定义了在给定输入和当前状态下如何改变状态。状态机可以用来模拟许多实际世界的系统,例如电梯的工作过程,交通灯的变化等。

而有限状态机(Finite State Machine,简称FSM)是状态机的一种特殊形式,它的状态数量是有限的。在任何给定的时间,有限状态机都处于其定义的某一状态中。当某个条件(通常是输入或时间事件)被满足时,它会从一个状态转移到另一个状态。

FSM包含如下几个核心概念:

  • State:状态。可以对应工作流程中的一个阶段或步骤。
  • Transition:流转,表示从一个状态流转到下一个状态,通常由事件触发。
  • Action:动作,到达指定状态后执行的操作。

三者关系是:事件触发状态的转移,流转到指定状态后触发执行后续动作。

image-20231117093752924

DAG侧重于描述任务之间的依赖关系,而FSM侧重于描述状态的流转关系。

相比于DAG不支持环的问题,FSM可以实现环,这个是FSM相比DAG方式实现工作流引擎的优势所在。

下面是一个简单的状态流转实例,其流转关系是:State A -> State B -> State C

Start: State A
State A:
    Actions:
    - 动作A
    Next:
        State B
State B:
    Actions:
    - 动作B1
    - 动作B2
    Next:
        State C
State C:
    Actions:
    - 动作C
    Next:
        End

# 有限状态机实践

FSM的原理很简单,下面我们基于前面的原理来实现一个简单的FSM。

  1. 首先,我们定义了State类,它表示工作流中的一个状态。每个状态都有一个名字和一个动作列表。
class State:
    def __init__(self, name, actions=None):
        self.name = name
        self.actions = actions if actions else []
  1. 接下来,我们定义了Transition类,它表示工作流中的一个状态转换。每个转换都有一个源状态、一个目标状态、一个触发转换的事件、一个条件列表和一个动作列表。
class Transition:
    def __init__(self, source_state, target_state, event, conditions=None, actions=None):
        self.source_state = source_state
        self.target_state = target_state
        self.event = event
        self.conditions = conditions if conditions else []
        self.actions = actions if actions else []
  1. 然后,我们定义了FiniteStateMachine类,它表示一个工作流实例。FiniteStateMachine有一个状态列表、一个转换列表和一个当前状态。on_event方法用于处理事件。当事件发生时,它会检查所有与当前状态相关的转换,如果事件匹配并满足所有条件,则执行当前状态和转换的动作,并将当前状态更改为目标状态。

    transition.conditions是一个包含条件函数的列表,这些函数在调用时返回一个布尔值。当所有条件函数的返回值都为True时,表示所有条件都满足。

    all()函数接收一个可迭代对象(在本例中为生成器表达式),并返回True当且仅当可迭代对象中的所有元素都为真值(True)。生成器表达式condition() for condition in transition.conditions会依次调用列表transition.conditions中的每个条件函数,并返回一个包含所有条件函数返回值的生成器。

    如果转换的所有条件函数都返回True(即所有条件都满足),则执行该if语句块中的代码。在这种情况下,将执行当前状态和转换的动作,并将当前状态更改为目标状态。

class FiniteStateMachine:
    def __init__(self, states, transitions, initial_state):
        self.states = states
        self.transitions = transitions
        self.current_state = initial_state

    def on_event(self, event):
        print(f"Current state: {self.current_state.name}, Event: {event}")
        for transition in self.transitions:
            if transition.source_state == self.current_state and transition.event == event:
                if all(condition() for condition in transition.conditions):
                    for action in self.current_state.actions:
                        action()
                    for action in transition.actions:
                        action()
                    self.current_state = transition.target_state
                    print(f"New state: {self.current_state.name}")
                    break

  1. 接下来,我们定义了WorkflowEngine类,它用于管理工作流定义和实例。WorkflowEngine有一个工作流定义字典和一个工作流实例字典。create_workflow_instance方法根据工作流名称创建一个新的FiniteStateMachine实例,并将其添加到实例字典中。on_event方法用于将事件传递给指定的工作流实例。
class WorkflowEngine:
    def __init__(self, workflow_definitions):
        self.workflow_definitions = workflow_definitions
        self.workflow_instances = {}

    def create_workflow_instance(self, workflow_name):
        state_machine = self._create_state_machine(workflow_name)
        workflow_instance_id = len(self.workflow_instances)
        self.workflow_instances[workflow_instance_id] = state_machine
        return workflow_instance_id

    def on_event(self, workflow_instance_id, event):
        state_machine = self.workflow_instances[workflow_instance_id]
        state_machine.on_event(event)

    def _create_state_machine(self, workflow_name):
        definition = self.workflow_definitions[workflow_name]
        states = definition['states']
        transitions = definition['transitions']
        initial_state = states[0]
        return FiniteStateMachine(states, transitions, initial_state)
  1. 在这个示例中,我们定义了一个名为order_process的工作流,其中包含三个状态(new_orderpaidshipped)和两个转换(从new_orderpaid和从paidshipped)。如下图所示:

    ![image-20240313234059811](./img/4 state-machine-demo.png)

  2. 最后,我们创建了一个WorkflowEngine实例,使用order_process工作流定义创建了一个工作流实例,并触发了两个事件(payment_successfulship_order),以使工作流实例从new_order状态转换到paid状态,然后转换到shipped状态。

def is_valid():
    # 检查某些有效性条件,例如检查输入值、数据库记录等
    return True  # 或 False,根据实际条件
  
# 定义工作流
new_order = State(name='new_order', actions=[])
paid = State(name='paid', actions=[])
shipped = State(name='shipped', actions=[])

workflow_definitions = {
    'order_process': {
        'states': [new_order, paid, shipped],
        'transitions': [
            Transition(
              source_state=new_order, 
              target_state=paid, 
              event='payment_successful', 
              conditions=[is_valid], actions=[]),
            Transition(
              source_state=paid, 
              target_state=shipped, 
              event='ship_order', 
              conditions=[is_valid], actions=[]),
        ],
    },
}

# 创建工作流引擎
workflow_engine = WorkflowEngine(workflow_definitions)

# 创建工作流实例
workflow_instance_id = workflow_engine.create_workflow_instance('order_process')

# 触发事件
workflow_engine.on_event(workflow_instance_id, 'payment_successful')
workflow_engine.on_event(workflow_instance_id, 'ship_order')

现在,当您运行代码并触发事件时,您应该看到类似以下的输出:

Current state: new_order, Event: payment_successful
New state: paid
Current state: paid, Event: ship_order
New state: shipped

这将显示工作流实例在事件触发时的状态变化。

总结

FSM很好解决了DAG中审批业务场景无法支持回环的问题,应用场景进一步得到扩展。流程引擎的实现方法介绍到FSM这里,基本上可以满足绝大多数的业务场景需求,其通用性也更加广泛。

但是我们进一步研究,会发现,还有一小部分的场景,FSM和DAG实现的流程引擎都无法支持。例如,我们通过表单来触发流程,这里如果设置了表单提交分数例如100份,要提交至少80份以后才开始触发后续的流程,例如问卷统计之类的步骤。但是我们知道DAG和FSM都基于事件触发状态或任务的流转,那么每次用户提交一份表单就会触发一次流程实例的创建和运行,这样导致没法等待提交一定数量后的问卷才触发流程实例创建执行,因为这两种流程引擎的实现原理都是立即触发运行的。

那么要解决这个问题,就需要用到后面我们介绍到的第三种算法:基于Petri网的令牌机制实现的流程引擎方法。

  • 参考:https://web.mit.edu/6.111/www/f2017/handouts/L06.pdf
最后更新: 9/12/2024, 11:20:51 PM