Administrator
发布于 2026-03-19 / 1 阅读
0
0

Activiti 工作流引擎深度解析

Activiti 工作流引擎深度解析

一、项目流程引擎概述

1.1 技术选型

  • 引擎版本: Activiti 7.1.0
  • 集成方式: Spring Boot + Activiti
  • 数据库支持: MySQL (通过 MyBatis-Plus)
  • 缓存优化: Redis + Kryo 序列化 (可选配置)
  • 分布式锁: Redisson

1.2 核心服务组件

// 1. RepositoryService - 流程定义管理
- 流程部署 (deployment)
- 流程定义查询 (process definition query)
- BPMN 模型读取 (getBpmnModel)
- 资源文件导出 (exportFile)

// 2. RuntimeService - 流程实例运行控制
- 启动流程实例 (startProcessInstanceById)
- 流程实例查询 (createProcessInstanceQuery)
- 流程变量管理

// 3. TaskService - 任务管理
- 任务查询 (createTaskQuery)
- 任务完成 (complete)
- 任务分配 (claim/assign/delegate)
- 候选人管理 (addCandidateUser)

// 4. HistoryService - 历史数据查询
- 历史任务查询 (createHistoricTaskInstanceQuery)
- 历史流程实例查询 (createHistoricProcessInstanceQuery)

二、流程图设计到部署全流程

2.1 流程图设计方式

方式一:XML 直接部署

@PostMapping("/deploymentByXML")
public CommonResponse<String> deploymentByXML(@RequestBody ProcessDefinitionParamsRequest paramsRequest) {
    // 1. BPMN XML 转换为 BpmnModel 对象
    BpmnModel bpmnModel = WorkFlowUtils.bpmnXmlConverter(paramsRequest.getXml());
    
    // 2. 校验 BPMN 模型合法性
    WorkFlowUtils.processValidator(bpmnModel);
    
    // 3. 创建部署
    Deployment deploy = repositoryService.createDeployment()
        .addString(ActConstant.WK_DEPLOYMENT_RESOURCE_DEFAULT_NAME, paramsRequest.getXml())
        .name(paramsRequest.getProjectId() + ":" + paramsRequest.getConfigId())
        .deploy();
    
    // 4. 保存业务关联关系
    DemandDeploymentDO deploymentDO = new DemandDeploymentDO();
    deploymentDO.setDeploymentId(Long.valueOf(deploy.getId()));
    deploymentDO.setConfigId(demandConfigDO.getId());
    demandDeploymentRepository.save(deploymentDO);
}

方式二:文件上传部署

@PostMapping("/uploadStreamAndDeployment")
public CommonResponse<String> uploadStreamAndDeployment(@RequestParam MultipartFile file) {
    InputStream fileInputStream = multipartFile.getInputStream();
    String extension = FilenameUtils.getExtension(fileName);
    
    DeploymentBuilder deploymentBuilder = repositoryService.createDeployment();
    
    if ("zip".equals(extension)) {
        // ZIP 包部署(包含 bpmn 和 png)
        ZipInputStream zip = new ZipInputStream(fileInputStream);
        deployment = deploymentBuilder.addZipInputStream(zip).deploy();
    } else {
        // 单个 BPMN 文件
        deployment = deploymentBuilder.addInputStream(fileName, fileInputStream).deploy();
    }
}

2.2 BPMN XML 示例结构

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL">
  <process id="requirementFlow" name="需求审批流程">
    
    <!-- 开始节点 -->
    <startEvent id="start" name="开始"/>
    
    <!-- 用户任务节点 -->
    <userTask id="task1" name="产品审核" activiti:assignee="${productManager}">
      <!-- 监听器配置 -->
      <extensionElements>
        <activiti:listener event="create" class="cn.ufood.pmis.system.workflow.listeners.ProductDemandListeners"/>
        <activiti:listener event="complete" class="cn.ufood.pmis.system.workflow.listeners.TaskAutoCommitListener"/>
      </extensionElements>
    </userTask>
    
    <!-- 排他网关 -->
    <exclusiveGateway id="gateway1" name="审批判断"/>
    
    <!-- 顺序流 -->
    <sequenceFlow id="flow1" sourceRef="start" targetRef="task1"/>
    <sequenceFlow id="flow2" sourceRef="task1" targetRef="gateway1"/>
    
    <!-- 结束节点 -->
    <endEvent id="end" name="结束">
      <extensionElements>
        <activiti:listener class="cn.ufood.pmis.system.workflow.listeners.EndEventBusinessListener"/>
      </endEvent>
    </endEvent>
    
  </process>
</definitions>

2.3 流程部署关键配置

ActivitiConfiguration 配置类

@Configuration
public class ActivitiConfiguration implements ProcessEngineConfigurationConfigurer {
    
    @Override
    public void configure(SpringProcessEngineConfiguration config) {
        // 1. 是否自动创建表
        config.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE);
        
        // 2. 异步执行器开关
        config.setAsyncExecutorActivate(false);
        
        // 3. 启用历史记录
        config.setDbHistoryUsed(true);
        
        // 4. 历史级别设置(AUDIT=记录审计信息)
        config.setHistoryLevel(HistoryLevel.AUDIT);
        
        // 5. 自定义 ID 生成器
        config.setIdGenerator(customIdGenerator);
    }
}

三、流程实例流转机制

3.1 启动流程实例

@Service
public class ActProcessInstanceBiz implements IActProcessInstanceBiz {
    
    @Resource
    private RepositoryService repositoryService;
    
    @Resource
    private RuntimeService runtimeService;
    
    public String startProcess(ProcessStartParamsRequest paramsRequest) {
        // 1. 查询业务数据
        PmisRequirementSortDO sortDO = requirementSortRepository.getById(paramsRequest.getRequirementId());
        
        // 2. 查找对应的流程部署
        DemandDeploymentDO deploymentDO = demandDeploymentRepository.getOne(
            new LambdaQueryWrapper<DemandDeploymentDO>()
                .eq(DemandDeploymentDO::getConfigId, sortDO.getConfigId())
                .orderByDesc(GeneralDO::getCreateTime)
                .last(" limit 1")
        );
        
        // 3. 获取最新版本的流程定义
        ProcessDefinition processDefinition = repositoryService
            .createProcessDefinitionQuery()
            .deploymentId(deploymentDO.getDeploymentId().toString())
            .singleResult();
        
        // 4. 启动流程实例(businessKey = 业务主键)
        ProcessInstance instance = runtimeService.startProcessInstanceById(
            processDefinition.getId(), 
            sortDO.getId().toString()  // businessKey
        );
        
        return instance.getId();
    }
}

3.2 任务处理流程

完成任务

@Override
@Transactional(rollbackFor = Exception.class)
public void completeTask(TaskDisposeParamsRequest paramsRequest, Boolean isRollback) {
    // 1. 查询任务
    Task task = taskService.createTaskQuery()
        .taskId(paramsRequest.getTaskId())
        .singleResult();
    
    if (Objects.isNull(task)) {
        throw new BusinessException("任务不存在或您不是当前审批人");
    }
    
    if (task.isSuspended()) {
        throw new BusinessException("当前任务已被挂起");
    }
    
    // 2. 执行业务逻辑验证
    
    // 3. 提交任务(流程自动流转到下一节点)
    taskService.complete(task.getId());
}

回滚任务(退回上一个节点)

@Override
@Transactional(rollbackFor = Exception.class)
public void backTask(TaskDisposeParamsRequest paramsRequest) {
    String processInstanceId = paramsRequest.getInstanceId();
    
    // 1. 获取所有历史任务(按创建时间升序)
    List<HistoricTaskInstance> hisTaskList = historyService
        .createHistoricTaskInstanceQuery()
        .processInstanceId(processInstanceId)
        .orderByTaskCreateTime()
        .asc()
        .list();
    
    // 2. 获取第一个任务和当前任务
    HistoricTaskInstance startTask = hisTaskList.get(0);
    HistoricTaskInstance currentTask = hisTaskList.get(hisTaskList.size() - 1);
    
    // 3. 获取 BPMN 模型
    BpmnModel bpmnModel = repositoryService.getBpmnModel(currentTask.getProcessDefinitionId());
    
    // 4. 获取活动节点
    FlowNode startFlowNode = (FlowNode) bpmnModel.getMainProcess()
        .getFlowElement(startTask.getTaskDefinitionKey());
    FlowNode currentFlowNode = (FlowNode) bpmnModel.getMainProcess()
        .getFlowElement(currentTask.getTaskDefinitionKey());
    
    // 5. 保存原始流向
    List<SequenceFlow> originalSequenceFlowList = new ArrayList<>();
    originalSequenceFlowList.addAll(currentFlowNode.getOutgoingFlows());
    
    // 6. 清空并设置新流向(指向开始节点)
    currentFlowNode.getOutgoingFlows().clear();
    SequenceFlow newSequenceFlow = new SequenceFlow();
    newSequenceFlow.setId("newSequenceFlowId");
    newSequenceFlow.setSourceFlowElement(currentFlowNode);
    newSequenceFlow.setTargetFlowElement(startFlowNode);
    currentFlowNode.setOutgoingFlows(new ArrayList<>(Arrays.asList(newSequenceFlow)));
    
    // 7. 完成当前任务(流程会沿着新流向回到开始节点)
    taskService.complete(currentTask.getId());
    
    // 8. 重新设置开始任务的办理人
    Task nextTask = taskService.createTaskQuery()
        .processInstanceId(processInstanceId)
        .singleResult();
    if (nextTask != null) {
        taskService.setAssignee(nextTask.getId(), startTask.getAssignee());
    }
    
    // 9. 恢复原始流向
    currentFlowNode.setOutgoingFlows(originalSequenceFlowList);
}

四、任务分配策略

4.1 任务分配方式对比

方式 方法 说明 使用场景
指派 taskService.claim(taskId, userId) 领取任务,成为办理人 多人候选池中的任务领取
转办 taskService.setOwner(taskId, userId) 转移任务所有权 A 将任务完全交给 B 处理
委派 taskService.delegateTask(taskId, userId) 临时委托,完成后返回 A 临时委托 B 处理,最终回到 A
完成委派 taskService.resolveTask(taskId) 被委派人完成任务 B 完成委派任务,回到 A

4.2 添加候选人

@Override
public void addCandidateUser(TaskParamsRequest paramsRequest) {
    String key = "addCandidateUser:" + paramsRequest.getTaskId();
    
    // 分布式锁防止重复操作
    if (redisProvider.exists(key)) {
        throw new BusinessException("请勿重复操作");
    }
    
    redisProvider.set(key, key, GlobalManager.LOCK_OUT_TIME);
    try {
        // 为任务添加候选人(可以多人)
        for (Long userId : paramsRequest.getUserIds()) {
            taskService.addCandidateUser(paramsRequest.getTaskId(), userId.toString());
        }
    } finally {
        redisProvider.delete(key);
    }
}

五、监听器机制详解

5.1 监听器类型

TaskListener - 任务监听器

监听任务的生命周期事件:

  • create: 任务创建时触发
  • assignment: 任务指派时触发
  • complete: 任务完成时触发
  • delete: 任务删除时触发

ExecutionListener - 执行监听器

监听流程执行过程的事件:

  • start: 节点开始执行
  • end: 节点执行结束
  • take: 顺序流被选取时

5.2 监听器实现示例

示例 1:任务创建监听器(自动初始化数据)

@Component("WorkCreateListener")
public class WorkCreateListener implements TaskListener {
    
    @Override
    public void notify(DelegateTask delegateTask) {
        // 获取流程实例 ID
        String processInstanceId = delegateTask.getProcessInstanceId();
        
        // 获取业务数据
        PmisRequirementApplyDO applyDO = requirementApplyRepository.getOne(
            new LambdaQueryWrapper<PmisRequirementApplyDO>()
                .eq(PmisRequirementApplyDO::getInstanceId, processInstanceId)
        );
        
        // 初始化任务相关数据
        // ...
    }
}

示例 2:任务自动提交监听器

@Component("TaskAutoCommitListener")
public class TaskAutoCommitListener implements TaskListener {
    
    @Override
    public void notify(DelegateTask delegateTask) {
        // 任务创建时自动完成(用于无需人工干预的节点)
        taskService.complete(delegateTask.getId());
    }
}

示例 3:结束事件监听器

@Component("EndEventBusinessListener")
public class EndEventBusinessListener implements ExecutionListener {
    
    @Override
    public void notify(DelegateExecution execution) {
        // 流程结束时执行业务逻辑
        String processInstanceId = execution.getProcessInstanceId();
        
        // 更新业务状态
        // 发送通知消息
        // 记录审计日志
    }
}

5.3 BPMN 中配置监听器

<userTask id="task1" name="产品审核">
  <extensionElements>
    <!-- 任务创建监听器 -->
    <activiti:listener event="create" class="cn.ufood.WorkCreateListener"/>
    
    <!-- 任务完成监听器 -->
    <activiti:listener event="complete" class="cn.ufood.TaskAutoCommitListener"/>
  </extensionElements>
</userTask>

<endEvent id="end">
  <extensionElements>
    <!-- 结束事件监听器 -->
    <activiti:listener class="cn.ufood.EndEventBusinessListener"/>
  </extensionElements>
</endEvent>

六、高级特性

6.1 流程定义状态管理

@Override
@Transactional(rollbackFor = Exception.class)
public Boolean updateProcDefState(Map<String, Object> data) {
    String definitionId = data.get("definitionId").toString();
    String description = data.get("description").toString();
    
    // 更新原因记录
    definitionMapper.updateDescriptionById(definitionId, description);
    
    ProcessDefinition processDefinition = repositoryService
        .createProcessDefinitionQuery()
        .processDefinitionId(definitionId)
        .singleResult();
    
    if (processDefinition.isSuspended()) {
        // 挂起 -> 激活
        // 参数 2: 是否级联(影响该定义的所有实例)
        // 参数 3: 激活时间(null=立即)
        repositoryService.activateProcessDefinitionById(
            definitionId, true, null
        );
    } else {
        // 激活 -> 挂起
        repositoryService.suspendProcessDefinitionById(
            definitionId, true, null
        );
    }
    
    return true;
}

6.2 流程图生成与导出

@GetMapping("/getDeploymentBpmn")
public CommonResponse<String> getDeploymentBpmn(String configId, HttpServletResponse response) {
    // 1. 获取部署信息
    DemandDeploymentDO deploymentDO = demandDeploymentRepository.getOne(...);
    
    // 2. 获取 BpmnModel
    ProcessDefinition processDefinition = repositoryService
        .createProcessDefinitionQuery()
        .deploymentId(deploymentDO.getDeploymentId().toString())
        .singleResult();
    
    BpmnModel bpmnModel = repositoryService.getBpmnModel(processDefinition.getId());
    
    // 3. 生成流程图图片
    DefaultProcessDiagramGenerator diagramGenerator = new DefaultProcessDiagramGenerator();
    
    // 根据操作系统设置字体(解决中文乱码)
    String font = System.getProperty("os.name").toLowerCase().startsWith("win") ? "宋体" : "SimSun";
    
    try (InputStream in = diagramGenerator.generateDiagram(
            bpmnModel, 
            "jpg",
            new ArrayList<>(),  // 高亮已完成的节点
            new ArrayList<>(),  // 高亮正在进行的活动
            font, font, font,   // 分别设置活动、任务、标签字体
            this.getClass().getClassLoader(), 
            1.0                 // 缩放比例
    )) {
        BufferedImage image = ImageIO.read(in);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ImageIO.write(image, "jpg", byteArrayOutputStream);
        String base64String = Base64.getEncoder()
            .encodeToString(byteArrayOutputStream.toByteArray());
        return CommonResponse.success("data:image/JPG;base64," + base64String);
    }
}

6.3 流程复制功能

@Override
@Transactional(rollbackFor = Exception.class)
public CommonResponse<String> copyProcessDefinition(Long configId) {
    // 1. 复制公共配置
    PublicDemandConfigDO byId = publicDemandConfigRepository.getById(configId);
    byId.setId(null);
    byId.setName("复制-" + byId.getName());
    publicDemandConfigRepository.save(byId);
    Long newConfigId = byId.getId();
    
    // 2. 获取原部署的 BPMN
    DemandDeploymentDO deploymentDO = demandDeploymentRepository.lambdaQuery()
        .eq(DemandDeploymentDO::getConfigId, configId)
        .orderByDesc(GeneralDO::getCreateTime)
        .last("limit 1").one();
    
    ProcessDefinition processDefinition = repositoryService
        .createProcessDefinitionQuery()
        .deploymentId(deploymentDO.getDeploymentId().toString())
        .list().stream().findFirst()
        .orElseThrow(() -> new BusinessException("未找到流程"));
    
    InputStream model = repositoryService.getProcessModel(processDefinition.getId());
    
    // 3. 重新部署流程
    Deployment deployment = repositoryService.createDeployment()
        .addInputStream(ActConstant.WK_DEPLOYMENT_RESOURCE_DEFAULT_NAME, model)
        .name(byId.getProjectId() + ":" + newConfigId)
        .deploy();
    
    // 4. 复制按钮配置和表单配置
    // ...(详细代码略)
    
    return CommonResponse.success("复制成功");
}

七、并发控制与性能优化

7.1 分布式锁保护关键操作

@PostMapping("/deploymentByXML")
public CommonResponse<String> deploymentByXML(@RequestBody ProcessDefinitionParamsRequest paramsRequest) {
    // 使用 Redisson 分布式锁防止并发部署
    return redissonLockUtil.lock(
        5L,                    // 等待锁时间(秒)
        60L,                   // 锁持有时间(秒)
        TimeUnit.SECONDS,
        () -> processDefinitionService.deploymentByXML(paramsRequest),
        () -> CommonResponse.error("部署失败!加锁失败"),
        WorkFlowUtils.lockKey("deployment")  // 锁的 key
    );
}

7.2 防重复提交控制

@PostMapping("/pmisCompleteTask")
public CommonResponse pmisCompleteTask(@RequestBody TaskParamsRequest paramsRequest) {
    String key = "pmisCompleteTask:" + paramsRequest.getTaskId();
    
    // Redis 防重复提交
    if (redisProvider.exists(key)) {
        throw new BusinessException("请勿重复操作");
    }
    
    redisProvider.set(key, key, GlobalManager.LOCK_OUT_TIME);
    try {
        activitiTaskService.pmisCompleteTask(paramsRequest, false);
    } finally {
        redisProvider.delete(key);
    }
    
    return CommonResponse.success("提交成功");
}

7.3 线程池配置

@Bean("workFlowTaskExecutor")
public ThreadPoolTaskExecutor workFlowAsync() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);      // 核心线程数
    executor.setMaxPoolSize(30);       // 最大线程数
    executor.setKeepAliveSeconds(1);   // 空闲线程存活时间
    executor.setQueueCapacity(300);    // 队列容量
    executor.setThreadNamePrefix("workFlowTaskExecutor-");
    executor.setAllowCoreThreadTimeOut(true);
    executor.initialize();
    return executor;
}

八、最佳实践总结

8.1 流程设计原则

  1. 单一职责: 每个流程只负责一个完整的业务流程
  2. 版本管理: 流程修改后生成新版本,保留历史版本
  3. 监听器轻量化: 监听器只做简单数据处理,复杂业务异步处理
  4. 错误处理: 在监听器中做好异常捕获和日志记录

8.2 开发注意事项

  1. 事务控制: 流程操作必须加 @Transactional 保证数据一致性
  2. 并发安全: 任务处理、部署等关键操作需要加分布式锁
  3. 权限验证: 任务处理前必须验证当前用户是否为办理人
  4. 历史数据: 合理设置历史级别(推荐 AUDIT),避免数据过大

8.3 性能优化建议

  1. 流程定义缓存: 使用 Redis 缓存流程定义,减少数据库查询
  2. 异步执行器: 开启异步执行器处理定时任务和回调
  3. 分页查询: 任务列表、历史数据查询必须分页
  4. 索引优化: 对常用查询字段建立索引(如 businessKey)

九、学习路线建议

9.1 入门阶段

  1. 了解 BPMN 2.0 规范基础(节点、网关、顺序流)
  2. 学习 Activiti 核心 API(RepositoryService/RuntimeService/TaskService)
  3. 掌握流程部署和实例启动的基本操作

9.2 进阶阶段

  1. 深入学习监听器机制(TaskListener/ExecutionListener)
  2. 掌握任务分配策略(指派/转办/委派)
  3. 理解流程变量和表达式(UEL 表达式)

9.3 高级阶段

  1. 研究流程引擎源码和数据库表结构
  2. 掌握流程引擎性能优化和调优
  3. 学习复杂场景处理(驳回/会签/或签/自由流)

十、核心数据库表

Activiti 默认使用以下表前缀(ACT_):

表前缀 说明 主要表
ACT_RE Repository - 流程定义和部署 ACT_RE_PROCDEF(流程定义), ACT_RE_DEPLOYMENT(部署)
ACT_RU Runtime - 运行时数据 ACT_RU_TASK(任务), ACT_RU_EXECUTION(执行)
ACT_HI History - 历史数据 ACT_HI_TASKINST(历史任务), ACT_HI_PROCINST(历史实例)
ACT_ID Identity - 用户和组 ACT_ID_USER(用户), ACT_ID_GROUP(组)
ACT_GE General - 通用数据 ACT_GE_BYTEARRAY(二进制数据), ACT_GE_PROPERTY(属性)

文档版本: v1.0
编写日期: 2026 年 3 月 18 日
适用项目: pmis-backend-system


评论