介绍FSM原理及其实现方法。
# 有限状态机原理
状态机(State Machine)是一种用来进行计算或者解决问题的数学模型。它由一系列的状态和转换函数组成,这些转换函数定义了在给定输入和当前状态下如何改变状态。状态机可以用来模拟许多实际世界的系统,例如电梯的工作过程,交通灯的变化等。
而有限状态机(Finite State Machine,简称FSM)是状态机的一种特殊形式,它的状态数量是有限的。在任何给定的时间,有限状态机都处于其定义的某一状态中。当某个条件(通常是输入或时间事件)被满足时,它会从一个状态转移到另一个状态。
FSM包含如下几个核心概念:
- State:状态。可以对应工作流程中的一个阶段或步骤。
- Transition:流转,表示从一个状态流转到下一个状态,通常由事件触发。
- Action:动作,到达指定状态后执行的操作。
三者关系是:事件触发状态的转移,流转到指定状态后触发执行后续动作。
DAG侧重于描述任务之间的依赖关系,而FSM侧重于描述状态的流转关系。
相比于DAG不支持环的问题,FSM可以实现环,这个是FSM相比DAG方式实现工作流引擎的优势所在。
下面是一个简单的状态流转实例,其流转关系是:State A -> State B -> State C
Start: State A
State A:
Actions:
- 动作A
Next:
State B
State B:
Actions:
- 动作B1
- 动作B2
Next:
State C
State C:
Actions:
- 动作C
Next:
End
# 有限状态机实践
FSM的原理很简单,下面我们基于前面的原理来实现一个简单的FSM。
- 首先,我们定义了
State
类,它表示工作流中的一个状态。每个状态都有一个名字和一个动作列表。
class State:
def __init__(self, name, actions=None):
self.name = name
self.actions = actions if actions else []
- 接下来,我们定义了
Transition
类,它表示工作流中的一个状态转换。每个转换都有一个源状态、一个目标状态、一个触发转换的事件、一个条件列表和一个动作列表。
class Transition:
def __init__(self, source_state, target_state, event, conditions=None, actions=None):
self.source_state = source_state
self.target_state = target_state
self.event = event
self.conditions = conditions if conditions else []
self.actions = actions if actions else []
然后,我们定义了
FiniteStateMachine
类,它表示一个工作流实例。FiniteStateMachine
有一个状态列表、一个转换列表和一个当前状态。on_event
方法用于处理事件。当事件发生时,它会检查所有与当前状态相关的转换,如果事件匹配并满足所有条件,则执行当前状态和转换的动作,并将当前状态更改为目标状态。transition.conditions
是一个包含条件函数的列表,这些函数在调用时返回一个布尔值。当所有条件函数的返回值都为True
时,表示所有条件都满足。all()
函数接收一个可迭代对象(在本例中为生成器表达式),并返回True
当且仅当可迭代对象中的所有元素都为真值(True
)。生成器表达式condition() for condition in transition.conditions
会依次调用列表transition.conditions
中的每个条件函数,并返回一个包含所有条件函数返回值的生成器。如果转换的所有条件函数都返回
True
(即所有条件都满足),则执行该if
语句块中的代码。在这种情况下,将执行当前状态和转换的动作,并将当前状态更改为目标状态。
class FiniteStateMachine:
def __init__(self, states, transitions, initial_state):
self.states = states
self.transitions = transitions
self.current_state = initial_state
def on_event(self, event):
print(f"Current state: {self.current_state.name}, Event: {event}")
for transition in self.transitions:
if transition.source_state == self.current_state and transition.event == event:
if all(condition() for condition in transition.conditions):
for action in self.current_state.actions:
action()
for action in transition.actions:
action()
self.current_state = transition.target_state
print(f"New state: {self.current_state.name}")
break
- 接下来,我们定义了
WorkflowEngine
类,它用于管理工作流定义和实例。WorkflowEngine
有一个工作流定义字典和一个工作流实例字典。create_workflow_instance
方法根据工作流名称创建一个新的FiniteStateMachine
实例,并将其添加到实例字典中。on_event
方法用于将事件传递给指定的工作流实例。
class WorkflowEngine:
def __init__(self, workflow_definitions):
self.workflow_definitions = workflow_definitions
self.workflow_instances = {}
def create_workflow_instance(self, workflow_name):
state_machine = self._create_state_machine(workflow_name)
workflow_instance_id = len(self.workflow_instances)
self.workflow_instances[workflow_instance_id] = state_machine
return workflow_instance_id
def on_event(self, workflow_instance_id, event):
state_machine = self.workflow_instances[workflow_instance_id]
state_machine.on_event(event)
def _create_state_machine(self, workflow_name):
definition = self.workflow_definitions[workflow_name]
states = definition['states']
transitions = definition['transitions']
initial_state = states[0]
return FiniteStateMachine(states, transitions, initial_state)
在这个示例中,我们定义了一个名为
order_process
的工作流,其中包含三个状态(new_order
、paid
和shipped
)和两个转换(从new_order
到paid
和从paid
到shipped
)。如下图所示:![image-20240313234059811](./img/4 state-machine-demo.png)
最后,我们创建了一个
WorkflowEngine
实例,使用order_process
工作流定义创建了一个工作流实例,并触发了两个事件(payment_successful
和ship_order
),以使工作流实例从new_order
状态转换到paid
状态,然后转换到shipped
状态。
def is_valid():
# 检查某些有效性条件,例如检查输入值、数据库记录等
return True # 或 False,根据实际条件
# 定义工作流
new_order = State(name='new_order', actions=[])
paid = State(name='paid', actions=[])
shipped = State(name='shipped', actions=[])
workflow_definitions = {
'order_process': {
'states': [new_order, paid, shipped],
'transitions': [
Transition(
source_state=new_order,
target_state=paid,
event='payment_successful',
conditions=[is_valid], actions=[]),
Transition(
source_state=paid,
target_state=shipped,
event='ship_order',
conditions=[is_valid], actions=[]),
],
},
}
# 创建工作流引擎
workflow_engine = WorkflowEngine(workflow_definitions)
# 创建工作流实例
workflow_instance_id = workflow_engine.create_workflow_instance('order_process')
# 触发事件
workflow_engine.on_event(workflow_instance_id, 'payment_successful')
workflow_engine.on_event(workflow_instance_id, 'ship_order')
现在,当您运行代码并触发事件时,您应该看到类似以下的输出:
Current state: new_order, Event: payment_successful
New state: paid
Current state: paid, Event: ship_order
New state: shipped
这将显示工作流实例在事件触发时的状态变化。
总结
FSM很好解决了DAG中审批业务场景无法支持回环的问题,应用场景进一步得到扩展。流程引擎的实现方法介绍到FSM这里,基本上可以满足绝大多数的业务场景需求,其通用性也更加广泛。
但是我们进一步研究,会发现,还有一小部分的场景,FSM和DAG实现的流程引擎都无法支持。例如,我们通过表单来触发流程,这里如果设置了表单提交分数例如100份,要提交至少80份以后才开始触发后续的流程,例如问卷统计之类的步骤。但是我们知道DAG和FSM都基于事件触发状态或任务的流转,那么每次用户提交一份表单就会触发一次流程实例的创建和运行,这样导致没法等待提交一定数量后的问卷才触发流程实例创建执行,因为这两种流程引擎的实现原理都是立即触发运行的。
那么要解决这个问题,就需要用到后面我们介绍到的第三种算法:基于Petri网的令牌机制实现的流程引擎方法。
- 参考:https://web.mit.edu/6.111/www/f2017/handouts/L06.pdf