Activiti 工作流引擎深度解析
一、项目流程引擎概述
1.1 技术选型
- 引擎版本: Activiti 7.1.0
- 集成方式: Spring Boot + Activiti
- 数据库支持: MySQL (通过 MyBatis-Plus)
- 缓存优化: Redis + Kryo 序列化 (可选配置)
- 分布式锁: Redisson
1.2 核心服务组件
// 1. RepositoryService - 流程定义管理
- 流程部署 (deployment)
- 流程定义查询 (process definition query)
- BPMN 模型读取 (getBpmnModel)
- 资源文件导出 (exportFile)
// 2. RuntimeService - 流程实例运行控制
- 启动流程实例 (startProcessInstanceById)
- 流程实例查询 (createProcessInstanceQuery)
- 流程变量管理
// 3. TaskService - 任务管理
- 任务查询 (createTaskQuery)
- 任务完成 (complete)
- 任务分配 (claim/assign/delegate)
- 候选人管理 (addCandidateUser)
// 4. HistoryService - 历史数据查询
- 历史任务查询 (createHistoricTaskInstanceQuery)
- 历史流程实例查询 (createHistoricProcessInstanceQuery)
二、流程图设计到部署全流程
2.1 流程图设计方式
方式一:XML 直接部署
@PostMapping("/deploymentByXML")
public CommonResponse<String> deploymentByXML(@RequestBody ProcessDefinitionParamsRequest paramsRequest) {
// 1. BPMN XML 转换为 BpmnModel 对象
BpmnModel bpmnModel = WorkFlowUtils.bpmnXmlConverter(paramsRequest.getXml());
// 2. 校验 BPMN 模型合法性
WorkFlowUtils.processValidator(bpmnModel);
// 3. 创建部署
Deployment deploy = repositoryService.createDeployment()
.addString(ActConstant.WK_DEPLOYMENT_RESOURCE_DEFAULT_NAME, paramsRequest.getXml())
.name(paramsRequest.getProjectId() + ":" + paramsRequest.getConfigId())
.deploy();
// 4. 保存业务关联关系
DemandDeploymentDO deploymentDO = new DemandDeploymentDO();
deploymentDO.setDeploymentId(Long.valueOf(deploy.getId()));
deploymentDO.setConfigId(demandConfigDO.getId());
demandDeploymentRepository.save(deploymentDO);
}
方式二:文件上传部署
@PostMapping("/uploadStreamAndDeployment")
public CommonResponse<String> uploadStreamAndDeployment(@RequestParam MultipartFile file) {
InputStream fileInputStream = multipartFile.getInputStream();
String extension = FilenameUtils.getExtension(fileName);
DeploymentBuilder deploymentBuilder = repositoryService.createDeployment();
if ("zip".equals(extension)) {
// ZIP 包部署(包含 bpmn 和 png)
ZipInputStream zip = new ZipInputStream(fileInputStream);
deployment = deploymentBuilder.addZipInputStream(zip).deploy();
} else {
// 单个 BPMN 文件
deployment = deploymentBuilder.addInputStream(fileName, fileInputStream).deploy();
}
}
2.2 BPMN XML 示例结构
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL">
<process id="requirementFlow" name="需求审批流程">
<!-- 开始节点 -->
<startEvent id="start" name="开始"/>
<!-- 用户任务节点 -->
<userTask id="task1" name="产品审核" activiti:assignee="${productManager}">
<!-- 监听器配置 -->
<extensionElements>
<activiti:listener event="create" class="cn.ufood.pmis.system.workflow.listeners.ProductDemandListeners"/>
<activiti:listener event="complete" class="cn.ufood.pmis.system.workflow.listeners.TaskAutoCommitListener"/>
</extensionElements>
</userTask>
<!-- 排他网关 -->
<exclusiveGateway id="gateway1" name="审批判断"/>
<!-- 顺序流 -->
<sequenceFlow id="flow1" sourceRef="start" targetRef="task1"/>
<sequenceFlow id="flow2" sourceRef="task1" targetRef="gateway1"/>
<!-- 结束节点 -->
<endEvent id="end" name="结束">
<extensionElements>
<activiti:listener class="cn.ufood.pmis.system.workflow.listeners.EndEventBusinessListener"/>
</endEvent>
</endEvent>
</process>
</definitions>
2.3 流程部署关键配置
ActivitiConfiguration 配置类
@Configuration
public class ActivitiConfiguration implements ProcessEngineConfigurationConfigurer {
@Override
public void configure(SpringProcessEngineConfiguration config) {
// 1. 是否自动创建表
config.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE);
// 2. 异步执行器开关
config.setAsyncExecutorActivate(false);
// 3. 启用历史记录
config.setDbHistoryUsed(true);
// 4. 历史级别设置(AUDIT=记录审计信息)
config.setHistoryLevel(HistoryLevel.AUDIT);
// 5. 自定义 ID 生成器
config.setIdGenerator(customIdGenerator);
}
}
三、流程实例流转机制
3.1 启动流程实例
@Service
public class ActProcessInstanceBiz implements IActProcessInstanceBiz {
@Resource
private RepositoryService repositoryService;
@Resource
private RuntimeService runtimeService;
public String startProcess(ProcessStartParamsRequest paramsRequest) {
// 1. 查询业务数据
PmisRequirementSortDO sortDO = requirementSortRepository.getById(paramsRequest.getRequirementId());
// 2. 查找对应的流程部署
DemandDeploymentDO deploymentDO = demandDeploymentRepository.getOne(
new LambdaQueryWrapper<DemandDeploymentDO>()
.eq(DemandDeploymentDO::getConfigId, sortDO.getConfigId())
.orderByDesc(GeneralDO::getCreateTime)
.last(" limit 1")
);
// 3. 获取最新版本的流程定义
ProcessDefinition processDefinition = repositoryService
.createProcessDefinitionQuery()
.deploymentId(deploymentDO.getDeploymentId().toString())
.singleResult();
// 4. 启动流程实例(businessKey = 业务主键)
ProcessInstance instance = runtimeService.startProcessInstanceById(
processDefinition.getId(),
sortDO.getId().toString() // businessKey
);
return instance.getId();
}
}
3.2 任务处理流程
完成任务
@Override
@Transactional(rollbackFor = Exception.class)
public void completeTask(TaskDisposeParamsRequest paramsRequest, Boolean isRollback) {
// 1. 查询任务
Task task = taskService.createTaskQuery()
.taskId(paramsRequest.getTaskId())
.singleResult();
if (Objects.isNull(task)) {
throw new BusinessException("任务不存在或您不是当前审批人");
}
if (task.isSuspended()) {
throw new BusinessException("当前任务已被挂起");
}
// 2. 执行业务逻辑验证
// 3. 提交任务(流程自动流转到下一节点)
taskService.complete(task.getId());
}
回滚任务(退回上一个节点)
@Override
@Transactional(rollbackFor = Exception.class)
public void backTask(TaskDisposeParamsRequest paramsRequest) {
String processInstanceId = paramsRequest.getInstanceId();
// 1. 获取所有历史任务(按创建时间升序)
List<HistoricTaskInstance> hisTaskList = historyService
.createHistoricTaskInstanceQuery()
.processInstanceId(processInstanceId)
.orderByTaskCreateTime()
.asc()
.list();
// 2. 获取第一个任务和当前任务
HistoricTaskInstance startTask = hisTaskList.get(0);
HistoricTaskInstance currentTask = hisTaskList.get(hisTaskList.size() - 1);
// 3. 获取 BPMN 模型
BpmnModel bpmnModel = repositoryService.getBpmnModel(currentTask.getProcessDefinitionId());
// 4. 获取活动节点
FlowNode startFlowNode = (FlowNode) bpmnModel.getMainProcess()
.getFlowElement(startTask.getTaskDefinitionKey());
FlowNode currentFlowNode = (FlowNode) bpmnModel.getMainProcess()
.getFlowElement(currentTask.getTaskDefinitionKey());
// 5. 保存原始流向
List<SequenceFlow> originalSequenceFlowList = new ArrayList<>();
originalSequenceFlowList.addAll(currentFlowNode.getOutgoingFlows());
// 6. 清空并设置新流向(指向开始节点)
currentFlowNode.getOutgoingFlows().clear();
SequenceFlow newSequenceFlow = new SequenceFlow();
newSequenceFlow.setId("newSequenceFlowId");
newSequenceFlow.setSourceFlowElement(currentFlowNode);
newSequenceFlow.setTargetFlowElement(startFlowNode);
currentFlowNode.setOutgoingFlows(new ArrayList<>(Arrays.asList(newSequenceFlow)));
// 7. 完成当前任务(流程会沿着新流向回到开始节点)
taskService.complete(currentTask.getId());
// 8. 重新设置开始任务的办理人
Task nextTask = taskService.createTaskQuery()
.processInstanceId(processInstanceId)
.singleResult();
if (nextTask != null) {
taskService.setAssignee(nextTask.getId(), startTask.getAssignee());
}
// 9. 恢复原始流向
currentFlowNode.setOutgoingFlows(originalSequenceFlowList);
}
四、任务分配策略
4.1 任务分配方式对比
| 方式 | 方法 | 说明 | 使用场景 |
|---|---|---|---|
| 指派 | taskService.claim(taskId, userId) |
领取任务,成为办理人 | 多人候选池中的任务领取 |
| 转办 | taskService.setOwner(taskId, userId) |
转移任务所有权 | A 将任务完全交给 B 处理 |
| 委派 | taskService.delegateTask(taskId, userId) |
临时委托,完成后返回 | A 临时委托 B 处理,最终回到 A |
| 完成委派 | taskService.resolveTask(taskId) |
被委派人完成任务 | B 完成委派任务,回到 A |
4.2 添加候选人
@Override
public void addCandidateUser(TaskParamsRequest paramsRequest) {
String key = "addCandidateUser:" + paramsRequest.getTaskId();
// 分布式锁防止重复操作
if (redisProvider.exists(key)) {
throw new BusinessException("请勿重复操作");
}
redisProvider.set(key, key, GlobalManager.LOCK_OUT_TIME);
try {
// 为任务添加候选人(可以多人)
for (Long userId : paramsRequest.getUserIds()) {
taskService.addCandidateUser(paramsRequest.getTaskId(), userId.toString());
}
} finally {
redisProvider.delete(key);
}
}
五、监听器机制详解
5.1 监听器类型
TaskListener - 任务监听器
监听任务的生命周期事件:
- create: 任务创建时触发
- assignment: 任务指派时触发
- complete: 任务完成时触发
- delete: 任务删除时触发
ExecutionListener - 执行监听器
监听流程执行过程的事件:
- start: 节点开始执行
- end: 节点执行结束
- take: 顺序流被选取时
5.2 监听器实现示例
示例 1:任务创建监听器(自动初始化数据)
@Component("WorkCreateListener")
public class WorkCreateListener implements TaskListener {
@Override
public void notify(DelegateTask delegateTask) {
// 获取流程实例 ID
String processInstanceId = delegateTask.getProcessInstanceId();
// 获取业务数据
PmisRequirementApplyDO applyDO = requirementApplyRepository.getOne(
new LambdaQueryWrapper<PmisRequirementApplyDO>()
.eq(PmisRequirementApplyDO::getInstanceId, processInstanceId)
);
// 初始化任务相关数据
// ...
}
}
示例 2:任务自动提交监听器
@Component("TaskAutoCommitListener")
public class TaskAutoCommitListener implements TaskListener {
@Override
public void notify(DelegateTask delegateTask) {
// 任务创建时自动完成(用于无需人工干预的节点)
taskService.complete(delegateTask.getId());
}
}
示例 3:结束事件监听器
@Component("EndEventBusinessListener")
public class EndEventBusinessListener implements ExecutionListener {
@Override
public void notify(DelegateExecution execution) {
// 流程结束时执行业务逻辑
String processInstanceId = execution.getProcessInstanceId();
// 更新业务状态
// 发送通知消息
// 记录审计日志
}
}
5.3 BPMN 中配置监听器
<userTask id="task1" name="产品审核">
<extensionElements>
<!-- 任务创建监听器 -->
<activiti:listener event="create" class="cn.ufood.WorkCreateListener"/>
<!-- 任务完成监听器 -->
<activiti:listener event="complete" class="cn.ufood.TaskAutoCommitListener"/>
</extensionElements>
</userTask>
<endEvent id="end">
<extensionElements>
<!-- 结束事件监听器 -->
<activiti:listener class="cn.ufood.EndEventBusinessListener"/>
</extensionElements>
</endEvent>
六、高级特性
6.1 流程定义状态管理
@Override
@Transactional(rollbackFor = Exception.class)
public Boolean updateProcDefState(Map<String, Object> data) {
String definitionId = data.get("definitionId").toString();
String description = data.get("description").toString();
// 更新原因记录
definitionMapper.updateDescriptionById(definitionId, description);
ProcessDefinition processDefinition = repositoryService
.createProcessDefinitionQuery()
.processDefinitionId(definitionId)
.singleResult();
if (processDefinition.isSuspended()) {
// 挂起 -> 激活
// 参数 2: 是否级联(影响该定义的所有实例)
// 参数 3: 激活时间(null=立即)
repositoryService.activateProcessDefinitionById(
definitionId, true, null
);
} else {
// 激活 -> 挂起
repositoryService.suspendProcessDefinitionById(
definitionId, true, null
);
}
return true;
}
6.2 流程图生成与导出
@GetMapping("/getDeploymentBpmn")
public CommonResponse<String> getDeploymentBpmn(String configId, HttpServletResponse response) {
// 1. 获取部署信息
DemandDeploymentDO deploymentDO = demandDeploymentRepository.getOne(...);
// 2. 获取 BpmnModel
ProcessDefinition processDefinition = repositoryService
.createProcessDefinitionQuery()
.deploymentId(deploymentDO.getDeploymentId().toString())
.singleResult();
BpmnModel bpmnModel = repositoryService.getBpmnModel(processDefinition.getId());
// 3. 生成流程图图片
DefaultProcessDiagramGenerator diagramGenerator = new DefaultProcessDiagramGenerator();
// 根据操作系统设置字体(解决中文乱码)
String font = System.getProperty("os.name").toLowerCase().startsWith("win") ? "宋体" : "SimSun";
try (InputStream in = diagramGenerator.generateDiagram(
bpmnModel,
"jpg",
new ArrayList<>(), // 高亮已完成的节点
new ArrayList<>(), // 高亮正在进行的活动
font, font, font, // 分别设置活动、任务、标签字体
this.getClass().getClassLoader(),
1.0 // 缩放比例
)) {
BufferedImage image = ImageIO.read(in);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ImageIO.write(image, "jpg", byteArrayOutputStream);
String base64String = Base64.getEncoder()
.encodeToString(byteArrayOutputStream.toByteArray());
return CommonResponse.success("data:image/JPG;base64," + base64String);
}
}
6.3 流程复制功能
@Override
@Transactional(rollbackFor = Exception.class)
public CommonResponse<String> copyProcessDefinition(Long configId) {
// 1. 复制公共配置
PublicDemandConfigDO byId = publicDemandConfigRepository.getById(configId);
byId.setId(null);
byId.setName("复制-" + byId.getName());
publicDemandConfigRepository.save(byId);
Long newConfigId = byId.getId();
// 2. 获取原部署的 BPMN
DemandDeploymentDO deploymentDO = demandDeploymentRepository.lambdaQuery()
.eq(DemandDeploymentDO::getConfigId, configId)
.orderByDesc(GeneralDO::getCreateTime)
.last("limit 1").one();
ProcessDefinition processDefinition = repositoryService
.createProcessDefinitionQuery()
.deploymentId(deploymentDO.getDeploymentId().toString())
.list().stream().findFirst()
.orElseThrow(() -> new BusinessException("未找到流程"));
InputStream model = repositoryService.getProcessModel(processDefinition.getId());
// 3. 重新部署流程
Deployment deployment = repositoryService.createDeployment()
.addInputStream(ActConstant.WK_DEPLOYMENT_RESOURCE_DEFAULT_NAME, model)
.name(byId.getProjectId() + ":" + newConfigId)
.deploy();
// 4. 复制按钮配置和表单配置
// ...(详细代码略)
return CommonResponse.success("复制成功");
}
七、并发控制与性能优化
7.1 分布式锁保护关键操作
@PostMapping("/deploymentByXML")
public CommonResponse<String> deploymentByXML(@RequestBody ProcessDefinitionParamsRequest paramsRequest) {
// 使用 Redisson 分布式锁防止并发部署
return redissonLockUtil.lock(
5L, // 等待锁时间(秒)
60L, // 锁持有时间(秒)
TimeUnit.SECONDS,
() -> processDefinitionService.deploymentByXML(paramsRequest),
() -> CommonResponse.error("部署失败!加锁失败"),
WorkFlowUtils.lockKey("deployment") // 锁的 key
);
}
7.2 防重复提交控制
@PostMapping("/pmisCompleteTask")
public CommonResponse pmisCompleteTask(@RequestBody TaskParamsRequest paramsRequest) {
String key = "pmisCompleteTask:" + paramsRequest.getTaskId();
// Redis 防重复提交
if (redisProvider.exists(key)) {
throw new BusinessException("请勿重复操作");
}
redisProvider.set(key, key, GlobalManager.LOCK_OUT_TIME);
try {
activitiTaskService.pmisCompleteTask(paramsRequest, false);
} finally {
redisProvider.delete(key);
}
return CommonResponse.success("提交成功");
}
7.3 线程池配置
@Bean("workFlowTaskExecutor")
public ThreadPoolTaskExecutor workFlowAsync() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); // 核心线程数
executor.setMaxPoolSize(30); // 最大线程数
executor.setKeepAliveSeconds(1); // 空闲线程存活时间
executor.setQueueCapacity(300); // 队列容量
executor.setThreadNamePrefix("workFlowTaskExecutor-");
executor.setAllowCoreThreadTimeOut(true);
executor.initialize();
return executor;
}
八、最佳实践总结
8.1 流程设计原则
- 单一职责: 每个流程只负责一个完整的业务流程
- 版本管理: 流程修改后生成新版本,保留历史版本
- 监听器轻量化: 监听器只做简单数据处理,复杂业务异步处理
- 错误处理: 在监听器中做好异常捕获和日志记录
8.2 开发注意事项
- 事务控制: 流程操作必须加
@Transactional保证数据一致性 - 并发安全: 任务处理、部署等关键操作需要加分布式锁
- 权限验证: 任务处理前必须验证当前用户是否为办理人
- 历史数据: 合理设置历史级别(推荐 AUDIT),避免数据过大
8.3 性能优化建议
- 流程定义缓存: 使用 Redis 缓存流程定义,减少数据库查询
- 异步执行器: 开启异步执行器处理定时任务和回调
- 分页查询: 任务列表、历史数据查询必须分页
- 索引优化: 对常用查询字段建立索引(如 businessKey)
九、学习路线建议
9.1 入门阶段
- 了解 BPMN 2.0 规范基础(节点、网关、顺序流)
- 学习 Activiti 核心 API(RepositoryService/RuntimeService/TaskService)
- 掌握流程部署和实例启动的基本操作
9.2 进阶阶段
- 深入学习监听器机制(TaskListener/ExecutionListener)
- 掌握任务分配策略(指派/转办/委派)
- 理解流程变量和表达式(UEL 表达式)
9.3 高级阶段
- 研究流程引擎源码和数据库表结构
- 掌握流程引擎性能优化和调优
- 学习复杂场景处理(驳回/会签/或签/自由流)
十、核心数据库表
Activiti 默认使用以下表前缀(ACT_):
| 表前缀 | 说明 | 主要表 |
|---|---|---|
| ACT_RE | Repository - 流程定义和部署 | ACT_RE_PROCDEF(流程定义), ACT_RE_DEPLOYMENT(部署) |
| ACT_RU | Runtime - 运行时数据 | ACT_RU_TASK(任务), ACT_RU_EXECUTION(执行) |
| ACT_HI | History - 历史数据 | ACT_HI_TASKINST(历史任务), ACT_HI_PROCINST(历史实例) |
| ACT_ID | Identity - 用户和组 | ACT_ID_USER(用户), ACT_ID_GROUP(组) |
| ACT_GE | General - 通用数据 | ACT_GE_BYTEARRAY(二进制数据), ACT_GE_PROPERTY(属性) |
文档版本: v1.0
编写日期: 2026 年 3 月 18 日
适用项目: pmis-backend-system