RocketMQ 实战指南 - 订单超时自动取消
一、环境准备
1.1 RocketMQ 安装
# 下载 RocketMQ
wget https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
# 解压
unzip rocketmq-all-5.0.0-bin-release.zip
cd rocketmq-all-5.0.0-bin-release
# 启动 NameServer(Windows 使用 runserver.cmd)
nohup sh bin/mqnamesrv &
# 启动 Broker(Windows 使用 runbroker.cmd)
nohup sh bin/mqbroker -n localhost:9876 &
# 启动控制台(可选)
docker run -d --name rocketmq-console \
-p 8080:8080 \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=localhost:9876" \
apache/rocketmq-dashboard
1.2 Maven 依赖
在业务模块的 pom.xml 中添加:
<dependency>
<groupId>com.mars</groupId>
<artifactId>mars-rocketmq</artifactId>
<version>1.0.0</version>
</dependency>
二、配置文件
2.1 application.yml 配置
rocketmq:
# NameServer 地址(生产环境配置多个,用分号分隔)
name-server: 127.0.0.1:9876
# 生产者配置
producer:
group: mars-order-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
# 某些网络环境需要关闭 VIP 通道
vip-channel-enabled: false
# 消费者配置
consumer:
# 订单延迟主题
order-delay-topic: ORDER_DELAY_TOPIC
# 订单延迟消费者组
order-delay-group: mars-order-delay-consumer-group
三、实战场景:订单超时自动取消
3.1 业务场景说明
需求:用户下单后 30 分钟未付款,系统自动取消订单
技术方案:
- 用户下单时,发送一条延迟消息到 RocketMQ(延迟 30 分钟)
- 消费者监听延迟消息,30 分钟后收到消息
- 检查订单状态,如果仍未付款则取消订单
3.2 消息实体类
package com.mars.biz.order.entity;
import lombok.Data;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 订单消息体
*/
@Data
public class OrderMessage implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 订单 ID
*/
private Long orderId;
/**
* 订单编号
*/
private String orderNo;
/**
* 用户 ID
*/
private Long userId;
/**
* 订单金额
*/
private BigDecimal amount;
/**
* 下单时间
*/
private LocalDateTime createTime;
}
3.3 订单服务实现
package com.mars.biz.order.service.impl;
import com.mars.biz.order.entity.Order;
import com.mars.biz.order.entity.OrderMessage;
import com.mars.biz.order.mapper.OrderMapper;
import com.mars.biz.order.service.OrderService;
import com.mars.rocketmq.service.RocketMQService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 订单服务实现
*/
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private RocketMQService rocketMQService;
/**
* 创建订单并发送延迟消息
*
* @param userId 用户 ID
* @param amount 订单金额
* @return 订单 ID
*/
@Override
@Transactional(rollbackFor = Exception.class)
public Long createOrder(Long userId, BigDecimal amount) {
// 1. 创建订单
Order order = new Order();
order.setOrderNo("ORD" + System.currentTimeMillis());
order.setUserId(userId);
order.setAmount(amount);
order.setStatus(0); // 0-待付款
order.setCreateTime(LocalDateTime.now());
orderMapper.insert(order);
log.info("订单创建成功,orderId: {}, orderNo: {}", order.getId(), order.getOrderNo());
// 2. 构建订单消息
OrderMessage message = new OrderMessage();
message.setOrderId(order.getId());
message.setOrderNo(order.getOrderNo());
message.setUserId(userId);
message.setAmount(amount);
message.setCreateTime(order.getCreateTime());
// 3. 发送延迟消息(延迟级别 5 = 10 分钟,实际根据需求调整)
// RocketMQ 延迟级别:1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
boolean success = rocketMQService.sendDelayMessage(
"ORDER_DELAY_TOPIC",
JSON.toJSONString(message),
5 // 测试用 10 分钟,生产环境根据需要设置
);
if (success) {
log.info("订单延迟消息发送成功,orderId: {}", order.getId());
} else {
log.error("订单延迟消息发送失败,orderId: {}", order.getId());
// 这里可以记录到数据库,后续补偿
}
return order.getId();
}
}
3.4 延迟消息消费者
package com.mars.biz.order.consumer;
import com.alibaba.fastjson.JSON;
import com.mars.biz.order.entity.Order;
import com.mars.biz.order.entity.OrderMessage;
import com.mars.biz.order.mapper.OrderMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
/**
* 订单延迟消息消费者
*/
@Slf4j
@Service
@RocketMQMessageListener(
topic = "ORDER_DELAY_TOPIC",
consumerGroup = "mars-order-delay-consumer-group",
consumeMode = ConsumeMode.ORDERLY, // 顺序消费
maxReconsumeTimes = 3 // 最大重试次数
)
public class OrderDelayConsumer implements RocketMQListener<String> {
@Autowired
private OrderMapper orderMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public void onMessage(String messageBody) {
try {
log.info("收到订单延迟消息:{}", messageBody);
// 1. 解析消息
OrderMessage orderMessage = JSON.parseObject(messageBody, OrderMessage.class);
Long orderId = orderMessage.getOrderId();
// 2. 查询订单
Order order = orderMapper.selectById(orderId);
if (order == null) {
log.warn("订单不存在,orderId: {}", orderId);
return;
}
// 3. 检查订单状态
if (order.getStatus() != 0) { // 不是待付款状态
log.info("订单已处理,无需取消,orderId: {}, status: {}", orderId, order.getStatus());
return;
}
// 4. 取消订单
order.setStatus(9); // 9-已取消
order.setCancelTime(LocalDateTime.now());
order.setRemark("超时未付款,系统自动取消");
orderMapper.updateById(order);
log.info("订单已自动取消,orderId: {}, orderNo: {}", orderId, order.getOrderNo());
// 5. 执行其他取消逻辑
// - 释放库存
// - 退还优惠券
// - 通知用户
} catch (Exception e) {
log.error("处理订单延迟消息异常,message: {}", messageBody, e);
throw e; // 抛出异常触发重试
}
}
}
3.5 手动取消订单(用户主动取消)
/**
* 用户取消订单
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void cancelOrder(Long orderId, Long userId) {
Order order = orderMapper.selectById(orderId);
if (order == null) {
throw new RuntimeException("订单不存在");
}
if (!order.getUserId().equals(userId)) {
throw new RuntimeException("无权操作该订单");
}
if (order.getStatus() != 0) {
throw new RuntimeException("订单状态不允许取消");
}
// 取消订单
order.setStatus(9);
order.setCancelTime(LocalDateTime.now());
order.setRemark("用户主动取消");
orderMapper.updateById(order);
log.info("订单已取消,orderId: {}", orderId);
// 注意:不需要发送延迟消息了,因为订单已经取消
// 消费者收到延迟消息时会检查订单状态,发现已取消就不会重复处理
}
四、高级用法
4.1 事务消息(保证最终一致性)
package com.mars.biz.order.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
public class OrderTransactionService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 使用事务消息保证订单创建和消息发送的最终一致性
*/
@Transactional(rollbackFor = Exception.class)
public Long createOrderWithTransaction(Long userId, BigDecimal amount) {
// 1. 创建订单(本地事务)
Order order = createOrderInternal(userId, amount);
// 2. 构建事务消息
Message<OrderMessage> message = MessageBuilder.withPayload(buildOrderMessage(order))
.setHeader("orderId", order.getId())
.build();
// 3. 发送事务消息
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
"order-tx-group",
"ORDER_DELAY_TOPIC",
message,
null
);
log.info("事务消息发送结果:{}", sendResult.getSendStatus());
return order.getId();
}
/**
* 事务回查接口(需要实现 TransactionListener)
* 当 RocketMQ 未收到事务确认时,会主动回查本地事务状态
*/
@Component
public class OrderTransactionListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务后回调
// 返回 COMMIT_MESSAGE:提交消息
// 返回 ROLLBACK_MESSAGE:回滚消息
// 返回 UNKNOW:等待事务回查
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 事务回查逻辑
// 查询订单是否存在,存在则提交
return LocalTransactionState.COMMIT_MESSAGE;
}
}
}
4.2 批量发送消息
/**
* 批量发送订单消息
*/
public void batchSendOrderMessages(List<OrderMessage> messages) {
List<Message<OrderMessage>> messageList = messages.stream()
.map(m -> MessageBuilder.withPayload(m).build())
.collect(Collectors.toList());
// 批量发送(总大小不超过 1MB)
rocketMQTemplate.syncSend("BATCH_ORDER_TOPIC", messageList);
}
4.3 消息过滤(Tag 过滤)
// 发送带 Tag 的消息
Message<OrderMessage> message = MessageBuilder.withPayload(orderMessage)
.setHeader(MessageConst.PROPERTY_TAGS, "CREATE") // Tag
.build();
rocketMQTemplate.syncSend("ORDER_TOPIC:CREATE", message);
// 消费者过滤 Tag
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
consumerGroup = "order-consumer-group",
selectorExpression = "CREATE" // 只消费 CREATE Tag 的消息
)
五、生产环境最佳实践
5.1 消息可靠性保证
- 同步发送 + 本地事务表
// 创建订单表
// 创建消息记录表(status: 0-待发送,1-已发送,2-发送失败)
@Transactional
public void createOrder(Order order) {
// 1. 插入订单
orderMapper.insert(order);
// 2. 插入消息记录
MessageRecord record = new MessageRecord();
record.setMessageId(UUID.randomUUID().toString());
record.setTopic("ORDER_DELAY_TOPIC");
record.setBody(JSON.toJSONString(order));
record.setStatus(0);
messageRecordMapper.insert(record);
}
// 定时任务扫描未发送的消息,重新发送
@Scheduled(fixedRate = 5000)
public void scanAndResendMessages() {
List<MessageRecord> records = messageRecordMapper.selectByStatus(0);
for (MessageRecord record : records) {
boolean success = rocketMQService.sendSyncMessage(
record.getTopic(),
record.getBody()
);
if (success) {
record.setStatus(1);
}
messageRecordMapper.updateById(record);
}
}
- 消费者幂等性
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Override
public void onMessage(String messageBody) {
OrderMessage orderMessage = JSON.parseObject(messageBody, OrderMessage.class);
// 1. 检查是否已处理(Redis 去重)
String key = "order:message:" + orderMessage.getOrderId();
Boolean processed = redisTemplate.opsForValue().setIfAbsent(
key,
"1",
Duration.ofDays(7)
);
if (Boolean.FALSE.equals(processed)) {
log.info("消息已处理,跳过,orderId: {}", orderMessage.getOrderId());
return;
}
// 2. 处理业务逻辑
// ...
}
5.2 监控和告警
@Configuration
public class RocketMQMonitorConfig {
@Bean
public RocketMQListener<String> monitorConsumer() {
return message -> {
// 监控消息消费情况
// 发送到监控系统或日志平台
};
}
}
5.3 延迟级别选择
| 级别 | 延迟时间 | 适用场景 |
|---|---|---|
| 1 | 1s | 即时通知 |
| 5 | 10m | 订单超时(测试) |
| 14 | 10m | 订单超时(生产) |
| 16 | 30m | 订单超时(标准) |
| 17 | 1h | 长时间任务 |
六、常见问题排查
Q1: 消息发送失败?
# 检查 NameServer 是否启动
telnet 127.0.0.1 9876
# 检查 Broker 是否启动
telnet 127.0.0.1 10911
# 查看生产者日志
logging.level.org.apache.rocketmq=DEBUG
Q2: 消费者不消费消息?
- 检查 Topic 和 ConsumerGroup 是否匹配
- 检查消费者是否启动成功
- 查看 RocketMQ Dashboard 消费进度
Q3: 消息重复消费?
- 实现消费者幂等性(Redis 去重、数据库唯一键)
- 使用事务保证
Q4: 延迟时间不准确?
- RocketMQ 的延迟级别是固定的,不能自定义精确时间
- 如需精确延迟,使用定时任务 + 数据库轮询
七、完整代码结构
mars-biz/
└── src/main/java/com/mars/biz/order/
├── consumer/
│ └── OrderDelayConsumer.java # 延迟消息消费者
├── entity/
│ ├── Order.java # 订单实体
│ └── OrderMessage.java # 订单消息体
├── mapper/
│ └── OrderMapper.java # 订单 Mapper
└── service/
├── OrderService.java # 订单服务接口
└── impl/
└── OrderServiceImpl.java # 订单服务实现
八、总结
使用 RocketMQ 实现订单超时取消的优势:
✅ 解耦:订单系统和超时逻辑分离
✅ 可靠:消息持久化,不会丢失
✅ 削峰:高峰期消息堆积,低峰期慢慢消费
✅ 扩展:可以轻松添加其他消费者(如通知用户)
相比定时任务的劣势:
❌ 架构复杂度增加
❌ 需要维护 RocketMQ 集群
❌ 延迟时间不精确(固定级别)
建议:
- 小项目:使用定时任务 + 数据库轮询
- 中大型项目:使用 RocketMQ 延迟消息