前面我们介绍了Petri网的原理,这篇文章我们介绍基于Petri网原理实现的开源项目来加深对Petri网的理解。目前网上基于YAWL的开源和商业化产品非常少,YAWL是其中一个开源的项目,虽然时间比较久远,但是他是比较严格遵循Petri网原理实现的项目,有很多思想和原理是可以学习和参考的。
# YAWL核心概念
# 基本元素
YAWL(Yet Another Workflow Language)是一种基于Petri网的工作流引擎,它提供了一种用于描述和执行业务流程的形式化方法。YAWL工作流引擎是一个开源项目,由澳大利亚昆士兰科技大学和荷兰埃因霍温科技大学的研究人员共同开发。它的目标是为组织提供一个灵活、可扩展和易于集成的工作流解决方案。
YAWL工作流引擎的核心特点包括:
- 强大的建模能力:YAWL工作流引擎提供了丰富的建模元素,如任务、条件、资源、数据和事件,以支持各种业务场景的建模需求。同时,它还支持复杂的控制流和数据流模式,使得用户能够轻松地描述多样化的业务过程。
- 灵活的资源管理:YAWL工作流引擎提供了一种基于角色和规则的资源管理模式,使得用户可以灵活地分配任务给不同的参与者。此外,它还支持资源过滤、优先级分配和任务委派等高级功能,以满足不同组织的需求。
- 可扩展的架构:YAWL工作流引擎采用了模块化的设计,用户可以根据需要选择和扩展不同的功能模块。同时,它还提供了一套完整的API,方便用户与其他系统进行集成。
- 易于使用的工具支持:YAWL工作流引擎提供了一套图形化的建模和执行工具,使得用户可以直观地设计、验证和执行业务流程。此外,它还提供了丰富的文档和示例,帮助用户快速上手。
- 开源和社区支持:YAWL工作流引擎是一个开源项目,用户可以免费使用和修改其源代码。同时,它还拥有一个活跃的社区,用户可以在社区中获取帮助和分享经验。
下面是YAWL流程定义中支持的几种Condition类和Task类基本元素。YAWL中的Condition类元素对应Petri网的库所Place,Task类元素对应Petri网的变迁Transition。
- Condition:条件是YAWL流程中的一个基本元素,代表了流程中的一个状态。条件可以是开始状态、结束状态或者中间的任何一个状态。
- InputCondition:输入条件是流程的开始状态,只有满足了输入条件,流程才能开始。
- OutputCondition:输出条件是流程的结束状态,只有满足了输出条件,流程才能结束。
- AtomicTask:原子任务是流程中的一个基本执行单元,它不能被进一步分解。
- CompositeTask:复合任务是由两个或者更多的原子任务组成的,它可以被分解为更小的任务进行执行。
- Multiple Instance of an Atomic Task:原子任务的多实例表示一个原子任务可以有多个实例同时执行。
- Multiple Instance of a Composite Task:复合任务的多实例表示一个复合任务可以有多个实例同时执行。
- AND-split task:AND-split任务表示一个任务可以同时分裂为多个并行的任务。
- AND-join task:AND-join任务表示多个并行的任务必须都完成,才能继续执行下一个任务。
- OR-split task:OR-split任务表示一个任务可以分裂为多个并行的任务,但只需要其中的任何一个任务完成,就可以继续执行下一个任务。
- OR-join task:OR-join任务表示多个并行的任务只需要其中的任何一个任务完成,就可以继续执行下一个任务。
- XOR-split task:XOR-split任务表示一个任务可以分裂为多个并行的任务,但只能选择其中的一个任务进行执行。
- XOR-join task:XOR-join任务表示多个并行的任务只能选择其中的一个任务完成,才能继续执行下一个任务。
- Cancellation Region:取消区域是指在某个任务被执行时,可以取消该任务所在区域内的所有其他任务。这是YAWL中处理异常和撤销操作的一种机制。
# 软件组成
安装完YAWL启动成功后,首先会看到下面的控制面板:
点击第四个图标:进入流程编辑器。
在这个编辑器可以通过可视化的方式设计YAWL的流程定义,包括任务顺序、变量引用、资源分配等。
点击第三个图标:进入管理系统(默认账号:admin,密码:YAWL)。
导入上一步设计的流程定义XML文件后,就可以在这里进行任务的分发以及任务的处理。
# XML流程定义
我们直接通过一个官方具体的流程例子来了解YAWL的XML流程定义。下面这个描述了一个名为"Document Handling"的业务流程。
这个业务流程包含两个任务:Upload和Download。Upload任务的完成将触发Download任务的开始。每个任务都有自己的输入和输出参数,以及任务的执行条件和分支选择。
以下是一些关键元素的解释:
<metaData>
:包含了业务流程的元数据,如标题、创建者、描述、版本等。例如,<title>
定义了流程的标题为"Document Handling"<creator>
定义了流程的创建者为"Andreas V. Hense"<description>
定义了流程的描述信息<version>
定义了流程的版本为0.6。<metaData> <title>Document Handling</title> <creator>Andreas V. Hense</creator> <creator>September 2014</creator> <description>A simple workflow that demonstrates the document handling feature of YAWL.</description> <version>0.6</version> <persistent>false</persistent> <identifier>UID_af746526-a31f-490d-a43b-8bfa8c522a5d</identifier> </metaData>
<decomposition>
:定义了业务流程的主要组成部分,即任务。每个Decomposition元素都包含了一系列的任务定义。在这个例子中,有一个名为"Net"的分解,它是根网络,包含了所有的任务和流程控制元素。"Net"分解中定义了一个名为"document"的本地变量,类型为"YDocumentType"。
<decomposition id="Net" isRootNet="true" xsi:type="NetFactsType"> ... <localVariable> <index>0</index> <name>document</name> <type>YDocumentType</type> <namespace>http://www.w3.org/2001/XMLSchema</namespace> <initialValue /> </localVariable> ... </decomposition>
<task>
:定义了一个具体的任务,包括任务的名称、描述、输入和输出参数、执行条件等。在这个例子中,有两个任务:Upload和Download。每个任务都有自己的名称,例如"Upload"任务的名称就是"Upload"。每个任务都有自己的执行条件和分支选择,例如"Upload"任务的连接条件和分支选择分别是"xor"和"and"。
<task id="Upload"> <name>Upload</name> ... <join code="xor" /> <split code="and" /> ... <decomposesTo id="Upload" /> </task>
<flowsInto>
:定义了任务的执行顺序和分支选择。例如,"InputCondition"任务执行完后将流转到"Upload"任务,"Upload"任务执行完后将流转到"Download"任务。
<inputCondition id="InputCondition"> <flowsInto> <nextElementRef id="Upload" /> </flowsInto> </inputCondition>
<mapping>
:定义了任务的输入和输出数据的映射。例如,"Upload"任务完成后,它的输出数据将映射到"document"变量。
<completedMappings> <mapping> <expression query="<document>{/Upload/document/*}</document>" /> <mapsTo>document</mapsTo> </mapping> </completedMappings>
<resourcing>
:定义了任务的资源分配策略。例如,"Upload"任务的资源分配策略是由用户触发,而"Download"任务的资源分配策略是由系统触发。
<resourcing> <offer initiator="user" /> <allocate initiator="user" /> <start initiator="user" /> </resourcing>
<decomposesTo>
:指定了任务的具体实现。例如,"Upload"任务的具体实现是"Upload"分解。
<decomposesTo id="Upload" /> .... <decomposition id="Upload" xsi:type="WebServiceGatewayFactsType"> <outputParam> <index>0</index> <name>document</name> <type>YDocumentType</type> <namespace>http://www.w3.org/2001/XMLSchema</namespace> <defaultValue /> </outputParam> <externalInteraction>manual</externalInteraction> </decomposition>
<layout>
:定义了业务流程的布局信息,包括每个任务和流程控制元素的位置和大小。例如,"InputCondition"的位置是(x=36, y=44),大小是(w=32, h=32)。
<layout> ... <specification id="document"> ... <net id="Net"> ... <vertex id="InputCondition"> <attributes> <bounds x="36" y="44" w="32" h="32"/> </attributes> </vertex> ... </net> </specification> </layout>
document.xml
下面是完整的定义文件:
<?xml version="1.0" encoding="UTF-8"?>
<specificationSet xmlns="http://www.yawlfoundation.org/yawlschema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="3.0" xsi:schemaLocation="http://www.yawlfoundation.org/yawlschema http://www.yawlfoundation.org/yawlschema/YAWL_Schema3.0.xsd">
<specification uri="document">
<documentation>No description provided</documentation>
<metaData>
<title>Document Handling</title>
<creator>Andreas V. Hense</creator>
<creator>September 2014</creator>
<description>A simple workflow that demonstrates the document handling feature of YAWL.</description>
<version>0.6</version>
<persistent>false</persistent>
<identifier>UID_af746526-a31f-490d-a43b-8bfa8c522a5d</identifier>
</metaData>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" />
<decomposition id="Net" isRootNet="true" xsi:type="NetFactsType">
<localVariable>
<index>0</index>
<name>document</name>
<type>YDocumentType</type>
<namespace>http://www.w3.org/2001/XMLSchema</namespace>
<initialValue />
</localVariable>
<processControlElements>
<inputCondition id="InputCondition">
<flowsInto>
<nextElementRef id="Upload" />
</flowsInto>
</inputCondition>
<task id="Upload">
<name>Upload</name>
<flowsInto>
<nextElementRef id="Download" />
</flowsInto>
<join code="xor" />
<split code="and" />
<completedMappings>
<mapping>
<expression query="<document>{/Upload/document/*}</document>" />
<mapsTo>document</mapsTo>
</mapping>
</completedMappings>
<resourcing>
<offer initiator="user" />
<allocate initiator="user" />
<start initiator="user" />
</resourcing>
<decomposesTo id="Upload" />
</task>
<task id="Download">
<name>Download</name>
<flowsInto>
<nextElementRef id="OutputCondition" />
</flowsInto>
<join code="xor" />
<split code="and" />
<startingMappings>
<mapping>
<expression query="<document>{/Net/document/*}</document>" />
<mapsTo>document</mapsTo>
</mapping>
</startingMappings>
<resourcing>
<offer initiator="system">
<distributionSet>
<initialSet />
</distributionSet>
<familiarParticipant taskID="Upload" />
</offer>
<allocate initiator="system">
<allocator>
<name>RandomChoice</name>
</allocator>
</allocate>
<start initiator="system" />
</resourcing>
<decomposesTo id="Download" />
</task>
<outputCondition id="OutputCondition" />
</processControlElements>
</decomposition>
<decomposition id="Download" xsi:type="WebServiceGatewayFactsType">
<inputParam>
<index>0</index>
<name>document</name>
<type>YDocumentType</type>
<namespace>http://www.w3.org/2001/XMLSchema</namespace>
</inputParam>
<externalInteraction>manual</externalInteraction>
</decomposition>
<decomposition id="Upload" xsi:type="WebServiceGatewayFactsType">
<outputParam>
<index>0</index>
<name>document</name>
<type>YDocumentType</type>
<namespace>http://www.w3.org/2001/XMLSchema</namespace>
<defaultValue />
</outputParam>
<externalInteraction>manual</externalInteraction>
</decomposition>
</specification>
<layout>
<locale language="en" country="US"/>
<specification id="document">
<size w="62" h="26"/>
<net id="Net">
<bounds x="0" y="0" w="691" h="933"/>
<frame x="0" y="0" w="694" h="936"/>
<viewport x="0" y="0" w="694" h="936"/>
<vertex id="OutputCondition">
<attributes>
<bounds x="424" y="44" w="32" h="32"/>
</attributes>
</vertex>
<vertex id="InputCondition">
<attributes>
<bounds x="36" y="44" w="32" h="32"/>
</attributes>
</vertex>
<container id="Download">
<vertex>
<attributes>
<bounds x="284" y="44" w="32" h="32"/>
</attributes>
</vertex>
<label>
<attributes>
<bounds x="252" y="76" w="96" h="18"/>
</attributes>
</label>
</container>
<container id="Upload">
<vertex>
<attributes>
<bounds x="140" y="44" w="32" h="32"/>
</attributes>
</vertex>
<label>
<attributes>
<bounds x="108" y="76" w="96" h="18"/>
</attributes>
</label>
</container>
<flow source="InputCondition" target="Upload">
<ports in="13" out="12"/>
<attributes>
<lineStyle>11</lineStyle>
<points>
<value x="67" y="55.5"/>
<value x="212" y="55.5"/>
</points>
</attributes>
</flow>
<flow source="Upload" target="Download">
<ports in="13" out="12"/>
<attributes>
<lineStyle>11</lineStyle>
<points>
<value x="243" y="55.5"/>
<value x="404" y="55.5"/>
</points>
</attributes>
</flow>
<flow source="Download" target="OutputCondition">
<ports in="13" out="12"/>
<attributes>
<lineStyle>11</lineStyle>
<points>
<value x="435" y="55.5"/>
<value x="620" y="55.5"/>
</points>
</attributes>
</flow>
</net>
</specification>
</layout>
</specificationSet>
# 流程测试
下面我们将前面的流程定义文件导入到YAWL管理系统:
创建一个参与者,后续任务会分配给这个参与者:
启动流程:
分发任务,任务分发后有原来的Unoffered状态变为Offered状态:
切换到前面创建的test账号处理分发的任务:
接收并启动任务,任务状态从Offered转换到Started,
完成任务后,在我的工作列表页面就会清空:
Upload任务完成后就会触发下一个任务实例创建启动,同样的按照前面的操作完成即可。
至此,我们完整定义测试了YAWL的流程。下面我们具体展开其PetriNet算法核心的方法实现。
# YAWL Petri网核心方法剖析
YAWL核心的调度算法在YnetRunner类的continueIfPossible方法里,其中调用的t_enabled方法用于判断任务是否可以被激活,是整个核心的判断算法,该方法正是基于PetriNet的令牌机制实现的。
注意:这里YAWL 所支持的两种类型节点:Condtion 和 Task。其中Condition对应PetriNet 中所 place 库所,Task对应PetriNet 中的 Transition变迁,在结合PetriNet算法分析源码时需要注意。
# YEngine.java类的startCase方法
下面是我们去启用一个流程时调用的一个方法。
该方法的主要功能是启动一个新的工作流实例。以下是方法的主要步骤:
- 确保要启动的工作流规范已加载并为最新版本,否则抛出 YStateException 异常。
- 检查并格式化工作流实例的输入参数。
- 创建一个 YNetRunner 对象,用于执行工作流实例。
- 将 YNetRunner 对象添加到 _netRunnerRepository 中。
- 记录工作流实例已启动的日志。
- 如果不是在恢复过程中,并且有持久化管理器,将 YNetRunner 对象持久化。
- 使用持久化管理器继续执行工作流实例,调用
runner.continueIfPossible(_pmgr)
方法。 - 使用持久化管理器启动工作流实例,调用
runner.start(_pmgr)
方法。 - 获取工作流实例的标识符。
- 如果工作流实例中有活动任务,将其添加到 _runningCaseIDToSpecMap 中。
- 如果存在 _interfaceBClient,通知独立 GUI 添加新的工作流实例。
- 返回工作流实例的标识符。
路径:yawl/src/org/yawlfoundation/yawl/engine /YEngine.java
protected YIdentifier startCase(YSpecificationID specID, String caseParams,
URI completionObserver, String caseID,
YLogDataItemList logData, String serviceRef, boolean delayed)
throws YStateException, YDataStateException, YQueryException, YPersistenceException {
// check spec is loaded and is latest version (YStateException if not)
YSpecification specification = _specifications.getSpecificationForCaseStart(specID);
// check & format case data params (if any)
Element data = formatCaseParams(caseParams, specification);
YNetRunner runner = new YNetRunner(_pmgr, specification.getRootNet(), data, caseID);
_netRunnerRepository.add(runner);
logCaseStarted(specID, runner, completionObserver, caseParams, logData,
serviceRef, delayed);
// persist it
if ((! _restoring) && (_pmgr != null)) {
_pmgr.storeObject(runner);
}
runner.continueIfPossible(_pmgr);
runner.start(_pmgr);
YIdentifier runnerCaseID = runner.getCaseID();
// special case: if spec contains exactly one task, and its empty,
// the case (and runner) has already completed, so don't update map
if (runner.hasActiveTasks()) {
_runningCaseIDToSpecMap.put(runnerCaseID, specification);
// announce the new case to the standalone gui (if any)
if (_interfaceBClient != null) {
_logger.debug("Asking client to add case {}", runnerCaseID.toString());
_interfaceBClient.addCase(specID, runnerCaseID.toString());
}
}
return runnerCaseID;
}
# YNetRunner类的continueIfPossible
continueIfPossible
方法主要用来控制工作流的执行流程。下面是函数具体的执行步骤:
首先,检查是否处于暂停状态。如果是,则记录一条调试信息,表示由于案例当前处于暂停/挂起状态而中止运行器继续,并返回true。
如果网络已经完成,则不再继续,返回false。
创建一个YEnabledTransitionSet类型的变量enabledTransitions,用于存储启用任务的运行集。
遍历_netTasks中的所有任务(YTask类型)。
a. 如果任务是一个启用的"transition",并且_enabledTasks和_busyTasks都不包含该任务,则将其添加到enabledTransitions集合中。
b. 如果任务不是(或不再是)一个启用的transition,且之前已被引擎启用,则必须撤销该任务。调用withdrawEnabledTask方法,传入任务和pmgr作为参数。
c. 如果任务处于忙碌状态,且_busyTasks不包含该任务,则记录一个错误信息,表示列表不同步,并抛出一个运行时异常,指出忙碌任务列表与忙碌任务不同步。
如果enabledTransitions集合不为空,则调用fireTasks方法,传入enabledTransitions和pmgr作为参数,以触发一组启用的“transition”。
更新_busyTasks为_net中的忙碌任务。
返回是否有活动任务(调用hasActiveTasks方法)。
下图是引用continueIfPossible
方法调用和被调用的链路关系。
路径:https://github.com/yawlfoundation/yawl/blob/b8196a160f5f950f9241e09e5d1abc6c058756e8/src/org/yawlfoundation/yawl/engine/YNetRunner.java#L537
public synchronized boolean continueIfPossible(YPersistenceManager pmgr)
throws YDataStateException, YStateException, YQueryException,
YPersistenceException {
_logger.debug("--> continueIfPossible");
// Check if we are suspending (or suspended?) and if so exit out as we
// shouldn't post new workitems
if (isInSuspense()) {
_logger.debug("Aborting runner continuation as case is currently suspending/suspended");
return true;
}
// don't continue if the net has already finished
if (isCompleted()) return false;
// storage for the running set of enabled tasks
YEnabledTransitionSet enabledTransitions = new YEnabledTransitionSet();
// iterate through the full set of tasks for the net
for (YTask task : _netTasks) {
// if this task is an enabled 'transition'
if (task.t_enabled(_caseIDForNet)) {
if (! (_enabledTasks.contains(task) || _busyTasks.contains(task)))
enabledTransitions.add(task) ;
}
else {
// if the task is not (or no longer) an enabled transition, and it
// has been previously enabled by the engine, then it must be withdrawn
if (_enabledTasks.contains(task)) {
withdrawEnabledTask(task, pmgr);
}
}
if (task.t_isBusy() && !_busyTasks.contains(task)) {
_logger.error("Throwing RTE for lists out of sync");
throw new RuntimeException("Busy task list out of synch with a busy task: "
+ task.getID() + " busy tasks: " + _busyTasks);
}
}
// fire the set of enabled 'transitions' (if any)
if (! enabledTransitions.isEmpty()) fireTasks(enabledTransitions, pmgr);
_busyTasks = _net.getBusyTasks();
_logger.debug("<-- continueIfPossible");
return hasActiveTasks();
}
在循环判断里面,该方法调用t_enabled
函数来判断每一个任务是否可被激活,这是一个很关键的算法,该算法的实现正是基于Petri网的令牌机制实现的。
# YTask类的t_enabled方法
t_enabled
根据任务的_joinType(连接类型),执行以下操作:
如果_joinType为AND-JOIN方法(与连接):
遍历任务的预设元素,如果其中的任何一个条件不包含标识符(即令牌token),则返回false。
如果所有条件都包含标识符(即令牌token),则返回true。
如果_joinType为OR-JOIN方式(或连接):
- 调用_net的orJoinEnabled方法,传入当前任务和id作为参数,并返回结果。
如果_joinType为XOR-方式(异或连接):
遍历任务的预设元素(YExternalNetElement类型),如果其中的任何一个条件包含标识符(即令牌token),则返回true。
如果没有条件包含标识符,则返回false。
如果_joinType为其他类型,则返回false。
下图是该方法调用和被调用的链路关系。
路径:https://github.com/yawlfoundation/yawl/blob/master/src/org/yawlfoundation/yawl/stateless/elements/YTask.java#L917
public synchronized boolean t_enabled(YIdentifier id) {
if (_i != null) return false; // busy tasks are never enabled
switch (_joinType) {
case YTask._AND:
for (YExternalNetElement condition : getPresetElements()) {
if (!((YCondition) condition).containsIdentifier()) {
return false;
}
}
return true;
case YTask._OR:
return _net.orJoinEnabled(this, id);
case YTask._XOR:
for (YExternalNetElement condition : getPresetElements()) {
if (((YCondition) condition).containsIdentifier()) {
return true;
}
}
return false;
default:
return false;
}
}
上面的代码对应yawl中AND-JOIN、OR-JOIN和XOR-JOIN三种不同的连接类型,它们用于控制工作流的流程。具体来说:
- AND-JOIN:这是一种同步连接,表示一个任务必须等待所有前置任务都完成后才能开始。在Petri网中,这对应于一个具有多个输入弧的变迁,其中每个输入弧都必须有一个令牌才能触发变迁。
- OR-JOIN:这是一种非同步连接,表示一个任务可以在一个或多个前置任务完成后开始。在Petri网中,这对应于一个具有多个输入弧的变迁,其中任何一个输入弧有令牌就可以触发变迁。
- XOR-JOIN:这是一种选择连接,表示一个任务在多个前置任务中只能由一个完成后才能开始,有排他性。在Petri网中,这对应于一个具有多个输入弧的变迁,其中只能有一个输入弧有令牌才能触发变迁。
这三种连接类型提供了对工作流流程的精细控制,使得YAWL能够表示各种复杂的业务逻辑。
YNet类的orJoinEnabled方法
我们继续看OR-JOIN链接中调用的_net.orJoinEnabled方法。该方法返回一个布尔值,表示是否启用了或连接。
首先,检查orJoinTask和caseID是否为null。如果它们中的任何一个为null,则抛出一个运行时异常,表示在调用具有空参数的情况下检查或连接的启用性是无关紧要的。
接下来,检查orJoinTask的连接类型是否为YTask._OR(或连接)。如果不是,则抛出一个运行时异常,表示orJoinTask不是一个或连接。
创建一个名为actualMarking的YMarking对象,传入caseID作为参数。然后创建一个名为locations的YNetElement类型的列表,用于存储actualMarking中的位置。
获取orJoinTask的预设元素集合preSet。如果locations包含preSet的所有元素,则返回true。
遍历locations中的每个YNetElement类型的元素。如果preSet包含该元素,则执行以下操作:
a. 创建一个名为e2Net的E2WFOJNet对象,传入当前对象(this)和orJoinTask作为参数。
b. 调用e2Net的restrictNet方法,分别传入actualMarking和orJoinTask作为参数。
c. 调用e2Net的orJoinEnabled方法,传入actualMarking和orJoinTask作为参数,并返回结果。
如果在执行这些操作时发生异常,抛出一个运行时异常,表示在或连接调用中出现异常。
如果执行到这一步,说明或连接任务在预设中没有令牌,返回false。
路径:https://github.com/yawlfoundation/yawl/blob/master/src/org/yawlfoundation/yawl/elements/YNet.java#L346
public boolean orJoinEnabled(YTask orJoinTask, YIdentifier caseID) {
if (orJoinTask == null || caseID == null) {
throw new RuntimeException("Irrelevant to check the enabledness of an " +
"or join if this is called with null params.");
}
if (orJoinTask.getJoinType() != YTask._OR) {
throw new RuntimeException(orJoinTask + " is not an OR-Join.");
}
YMarking actualMarking = new YMarking(caseID);
List<YNetElement> locations = new Vector<YNetElement>(actualMarking.getLocations());
Set preSet = orJoinTask.getPresetElements();
if (locations.containsAll(preSet)) {
return true;
}
for (YNetElement element : locations) {
if (preSet.contains(element)) {
try {
E2WFOJNet e2Net = new E2WFOJNet(this, orJoinTask);
e2Net.restrictNet(actualMarking);
e2Net.restrictNet(orJoinTask);
return e2Net.orJoinEnabled(actualMarking, orJoinTask);
} catch (Exception e) {
throw new RuntimeException("Exception in OR-join call:" + e);
}
}
}
// or join task has no tokens in preset
return false;
}
这里你可能会好奇OR-JOIN不是只要有一个前置任务完成就满足启用条件了吗?为什么下面要判断是否所有前置任务的条件都满足。
if (locations.containsAll(preSet)) {
return true;
}
这是因为,在这个特定的上下文中,locations.containsAll(preSet)
返回true
表示所有的前置任务(即预设元素)都已经完成。这意味着OR-JOIN任务可以被启用,因为OR-JOIN的定义是只要有一个或多个前置任务完成,就可以启用,这里很好理解。
然而,如果并非所有的前置任务都完成,就需要进行更复杂的检查来确定是否可以启用OR-JOIN任务。这是因为在某些情况下,即使并非所有的前置任务都完成,OR-JOIN任务也可能可以启用。例如,如果前置任务之间存在某种依赖关系,那么完成了一部分前置任务可能就足够启用OR-JOIN任务了。
这个检查的目的是为了优化性能:如果所有的前置任务都已经完成,那么就可以立即确定OR-JOIN任务可以启用,无需进行更复杂的检查(调用E2WFOJNet类的orJoinEnabled方法)。只有在需要时,才进行更复杂的检查。
下面我们看E2WFOJNet的orJoinEnabled方法实现原理,就知道其复杂度了。
E2WFOJNet类的orJoinEnabled方法
路径:https://github.com/yawlfoundation/yawl/blob/master/src/org/yawlfoundation/yawl/elements/e2wfoj/E2WFOJNet.java#L728
该方法返回一个布尔值,表示在给定的标记(即当前状态)下是否应启用具有或连接的任务。
下图是该方法调用和被调用的链路关系。
- 首先,创建两个新的集合:MarkedTasks和RMap。MarkedTasks用于存储标记的任务,RMap用于存储转换后的标记。
- 接下来,遍历M中的所有位置。对于每个位置,如果它是一个条件(YCondition类型),则获取对应的地方(RPlace类型),并在RMap中添加或更新该地方的令牌计数。如果位置是一个任务(YTask类型),则将其添加到MarkedTasks集合中。
- 然后,遍历MarkedTasks中的每个任务,将其转换为相应的地方,并在RMap中添加该地方的令牌计数。
- 创建一个新的RMarking对象RM,表示转换后的标记,并清空RMap。
- 生成或连接的biggerEnabling标记。对于或连接的每个预设元素,如果它在M中被标记,则在newMap中添加一个令牌。否则,将其添加到emptyPreSetPlaces集合中。
- 对于emptyPreSetPlaces中的每个地方,添加一个令牌,并将结果标记添加到Y集合中。
- 最后,遍历Y中的每个标记。如果RM可以覆盖当前标记,则返回false。如果没有标记可以被覆盖,则返回true。
这个方法的主要思想是将YAWL模型转换为ResetNet模型,然后使用Coverable函数来检查在给定标记下是否应启用OR-JOIN或连接。这是因为在ResetNet模型中,OR-JOIN或连接的启用规则更容易处理。
/**
* This method is used to determine whether
* an OrJoin task of a YAWL net should be enabled at
* a given marking. The method returns TRUE if an OrJoin
* should be enabled at the given marking and FALSE, otherwise.
*/
public boolean orJoinEnabled(YMarking M,YTask orJoin){
Set MarkedTasks = new HashSet();
Map RMap = new HashMap();
//Need to convert from YAWL to ResetNet
List YLocations = new Vector(M.getLocations());
for (Iterator i = YLocations.iterator(); i.hasNext();)
{ YNetElement nextElement = (YNetElement) i.next();
if (nextElement instanceof YCondition)
{ YCondition condition = (YCondition) nextElement;
RPlace place = (RPlace) _Places.get(condition.getID());
if (place != null){
String placename = place.getID();
Integer tokenCount = new Integer(1);
if (RMap.containsKey(placename))
{ Object value = RMap.get(placename);
Integer countString = new Integer(value.toString());
int count = countString.intValue();
count ++;
tokenCount = new Integer(count);
}
RMap.put(placename,tokenCount);
}
}
if (nextElement instanceof YTask)
{ MarkedTasks.add(nextElement);
}
} //endfor
//To convert the active tasks in the marking into appropriate places
for (Iterator placeConvIter = MarkedTasks.iterator(); placeConvIter.hasNext();)
{ YTask task = (YTask) placeConvIter.next();
String internalPlace = "p_"+ task.getID();
RPlace place = (RPlace) _Places.get(internalPlace);
if (place != null) {
String placename = place.getID();
Integer tokenCount = new Integer(1);
RMap.put(placename,tokenCount);
}
}
// Equivalent Reset net marking
RMarking RM = new RMarking(RMap);
RMap = null;
// Generate biggerEnabling markings for OJ
Set X = orJoin.getPresetElements();
Map newMap = new HashMap();
Set emptyPreSetPlaces = new HashSet();
Integer tokenCount = new Integer(1);
for (Iterator x = X.iterator(); x.hasNext();)
{ YCondition preSetCondition = (YCondition) x.next();
RPlace preSetPlace = (RPlace) _Places.get(preSetCondition.getID());
if (preSetPlace != null)
{ if (YLocations.contains(preSetCondition))
{
//Add one token for each marked place
String preSetPlaceName = preSetPlace.getID();
newMap.put(preSetPlaceName,tokenCount);
}
else
{ emptyPreSetPlaces.add(preSetPlace);
}
}
} //end for
RSetOfMarkings Y = new RSetOfMarkings();
for (Iterator i = emptyPreSetPlaces.iterator(); i.hasNext();)
{ RPlace q = (RPlace) i.next();
// Add one token for exactly one empty place
tokenCount = new Integer(1);
String qname = q.getID();
newMap.put(qname,tokenCount);
RMarking M_w = new RMarking(new HashMap(newMap));
Y.addMarking(M_w);
newMap.remove(qname);
}
for (Iterator i = Y.getMarkings().iterator(); i.hasNext();)
{ RMarking M_w = (RMarking) i.next();
if (Coverable(RM,M_w))
{
return false;
}
}
return true;
}
# YNetRunner类的fireTasks方法
我们接着看continueIfPossible
方法里调用的fireTasks
方法。下图是该方法被调用和调用的链路关系。
这个方法的核心思想是根据任务组的类型(复合任务、空任务或原子任务)来触发相应的任务。对于复合任务和空任务,只触发任务组中的一个任务;对于原子任务,触发任务组中的所有任务。这样可以确保在遇到不同类型的任务时,工作流引擎能够采取适当的行动。
首先,创建一个名为enabledTasks的HashSet,用于存储已启用的任务。
然后,遍历enabledSet中的所有任务组(YEnabledTransitionSet.TaskGroup类型)。任务组是一组由单个条件启用的所有任务。对于每个任务组:
a. 如果任务组包含复合任务(YCompositeTask类型),则从任务组中随机选择一个复合任务并触发它。如果该复合任务尚未启用且尚未到达网络的末尾,则调用fireCompositeTask方法来触发复合任务,并将其添加到enabledTasks集合中。
b. 如果任务组包含空任务(YAtomicTask类型),则从任务组中随机选择一个空任务并处理它。如果该空任务尚未启用且尚未到达网络的末尾,则调用processEmptyTask方法来处理空任务。
c. 否则(即任务组中的任务都是原子任务),遍历任务组中的所有原子任务(YAtomicTask类型)。对于每个原子任务,如果该任务尚未启用且尚未到达网络的末尾,则调用fireAtomicTask方法来触发原子任务,并将其添加到enabledTasks集合中。如果fireAtomicTask方法返回一个YAnnouncement对象,则将其添加到_announcements集合中。
如下图,从源码的我们知道YAtomicTask和YCompositeTask继承自YTask类。
private void fireTasks(YEnabledTransitionSet enabledSet, YPersistenceManager pmgr)
throws YDataStateException, YStateException, YQueryException,
YPersistenceException {
Set<YTask> enabledTasks = new HashSet<YTask>();
// A TaskGroup is a group of tasks that are all enabled by a single condition.
// If the group has more than one task, it's a deferred choice, in which case:
// 1. If any are composite, fire one (chosen randomly) - rest are withdrawn
// 2. Else, if any are empty, fire one (chosen randomly) - rest are withdrawn
// 3. Else, fire and announce all enabled atomic tasks to the environment
for (YEnabledTransitionSet.TaskGroup group : enabledSet.getAllTaskGroups()) {
if (group.hasCompositeTasks()) {
YCompositeTask composite = group.getRandomCompositeTaskFromGroup();
if (! (enabledTasks.contains(composite) || endOfNetReached())) {
fireCompositeTask(composite, pmgr);
enabledTasks.add(composite);
}
}
else if (group.hasEmptyTasks()) {
YAtomicTask atomic = group.getRandomEmptyTaskFromGroup();
if (! (enabledTasks.contains(atomic) || endOfNetReached())) {
processEmptyTask(atomic, pmgr);
}
}
else {
String groupID = group.getDeferredChoiceID(); // null if <2 tasks
for (YAtomicTask atomic : group.getAtomicTasks()) {
if (! (enabledTasks.contains(atomic) || endOfNetReached())) {
YAnnouncement announcement = fireAtomicTask(atomic, groupID, pmgr);
if (announcement != null) {
_announcements.add(announcement);
}
enabledTasks.add(atomic) ;
}
}
}
}
}
这里我们看到调用fireCompositeTask和fireAtomicTask方法来处理YCompositeTask对象和YAtomicTask对象。
# YNetRunner类的fireCompositeTask方法
路径:yawl/src/org/yawlfoundation/yawl/engine/YNetRunner.java
private void fireCompositeTask(YCompositeTask task, YPersistenceManager pmgr)
throws YDataStateException, YStateException, YQueryException,
YPersistenceException {
if (! _busyTasks.contains(task)) { // don't proceed if task already started
_busyTasks.add(task);
_busyTaskNames.add(task.getID());
if (pmgr != null) pmgr.updateObject(this);
List<YIdentifier> caseIDs = task.t_fire(pmgr);
for (YIdentifier id : caseIDs) {
try {
task.t_start(pmgr, id);
}
catch (YDataStateException ydse) {
task.rollbackFired(id, pmgr);
ydse.rethrow();
}
}
}
}
路径:yawl/src/org/yawlfoundation/yawl/elements/YTask.java
public synchronized void t_start(YPersistenceManager pmgr, YIdentifier child)
throws YDataStateException, YPersistenceException,
YQueryException, YStateException {
if (t_isBusy()) {
startOne(pmgr, child);
}
}
startOne这里是调用的YCompositeTask的方法,如下。
该方法启动一个复合任务,主要是设置令牌位置:将任务标识符添加到 _mi_executing 集合中(表示任务正在执行),并从 _mi_entered 集合中移除(表示任务已进入但尚未开始执行)。
路径:yawl/src/org/yawlfoundation/yawl/elements/YCompositeTask.java
protected synchronized void startOne(YPersistenceManager pmgr, YIdentifier id)
throws YDataStateException, YPersistenceException, YQueryException, YStateException {
// set token locations
_mi_executing.add(pmgr, id);
_mi_entered.removeOne(pmgr, id);
// create a net runner for this task's contained subnet
YNetRunner netRunner = new YNetRunner(pmgr, (YNet) _decompositionPrototype,
this, id, getData(id));
getNetRunnerRepository().add(netRunner);
logTaskStart(netRunner);
netRunner.continueIfPossible(pmgr);
netRunner.start(pmgr);
}
这两行代码分别表示将任务标识符(id
)从一个集合(_mi_entered
)移动到另一个集合(_mi_executing
)。
_mi_entered
集合表示已进入但尚未开始执行的任务。当一个任务的前置条件满足时,任务会进入这个集合。_mi_executing
集合表示正在执行的任务。当任务开始执行时,它会从_mi_entered
集合移动到_mi_executing
集合。
这个令牌迁移过程反映了任务在工作流实例中的状态变化。当任务从一个状态迁移到另一个状态时,令牌会在相应的集合之间移动。
_mi_executing.add(pmgr, id);
_mi_entered.removeOne(pmgr, id);
# YTask内部条件状态流转
一个YTask任务在被激活以后,从开始运行到结束运行这个过程,YAWL在这中间还定义了一系列的状态流转。理解这个转态的流转生命周期,能加深对YAWL的PetriNet算法整个执行过程。
YTask内部的转态流转支持的状态定义在下面源码里:
路径:yawl/src/org/yawlfoundation/yawl/elements/YTask.java
public abstract class YTask extends YExternalNetElement {
//class members
private static final Random _random = new Random(new Date().getTime());
public static final int _AND = 95;
public static final int _OR = 103;
public static final int _XOR = 126;
//internal state nodes
protected YIdentifier _i;
protected YInternalCondition _mi_active = new YInternalCondition(YInternalCondition._mi_active, this);
protected YInternalCondition _mi_entered = new YInternalCondition(YInternalCondition._mi_entered, this);
protected YInternalCondition _mi_complete = new YInternalCondition(YInternalCondition._mi_complete, this);
protected YInternalCondition _mi_executing = new YInternalCondition(YInternalCondition._mi_executing, this);
...
}
内部条件有以下四种状态,用字符串常量表示:
mi_active
:表示任务实例已经被激活,但可能还没有开始执行。mi_entered
:表示任务实例已经进入,但还没有开始执行。当任务实例的前置条件满足时,它会进入这个状态。mi_executing
:表示任务实例正在执行。当任务实例开始执行时,它会进入这个状态。mi_complete
:表示任务实例已经完成。当任务实例完成所有操作时,它会进入这个状态。
如下是完整的状态流转过程: