Administrator
发布于 2026-03-19 / 1 阅读
0
0

RocketMQ 实战指南 - 订单超时自动取消

RocketMQ 实战指南 - 订单超时自动取消

一、环境准备

1.1 RocketMQ 安装

# 下载 RocketMQ
wget https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip

# 解压
unzip rocketmq-all-5.0.0-bin-release.zip
cd rocketmq-all-5.0.0-bin-release

# 启动 NameServer(Windows 使用 runserver.cmd)
nohup sh bin/mqnamesrv &

# 启动 Broker(Windows 使用 runbroker.cmd)
nohup sh bin/mqbroker -n localhost:9876 &

# 启动控制台(可选)
docker run -d --name rocketmq-console \
  -p 8080:8080 \
  -e "JAVA_OPTS=-Drocketmq.namesrv.addr=localhost:9876" \
  apache/rocketmq-dashboard

1.2 Maven 依赖

在业务模块的 pom.xml 中添加:

<dependency>
    <groupId>com.mars</groupId>
    <artifactId>mars-rocketmq</artifactId>
    <version>1.0.0</version>
</dependency>

二、配置文件

2.1 application.yml 配置

rocketmq:
  # NameServer 地址(生产环境配置多个,用分号分隔)
  name-server: 127.0.0.1:9876
  
  # 生产者配置
  producer:
    group: mars-order-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 2
    # 某些网络环境需要关闭 VIP 通道
    vip-channel-enabled: false
  
  # 消费者配置
  consumer:
    # 订单延迟主题
    order-delay-topic: ORDER_DELAY_TOPIC
    # 订单延迟消费者组
    order-delay-group: mars-order-delay-consumer-group

三、实战场景:订单超时自动取消

3.1 业务场景说明

需求:用户下单后 30 分钟未付款,系统自动取消订单

技术方案

  1. 用户下单时,发送一条延迟消息到 RocketMQ(延迟 30 分钟)
  2. 消费者监听延迟消息,30 分钟后收到消息
  3. 检查订单状态,如果仍未付款则取消订单

3.2 消息实体类

package com.mars.biz.order.entity;

import lombok.Data;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * 订单消息体
 */
@Data
public class OrderMessage implements Serializable {
    
    private static final long serialVersionUID = 1L;
    
    /**
     * 订单 ID
     */
    private Long orderId;
    
    /**
     * 订单编号
     */
    private String orderNo;
    
    /**
     * 用户 ID
     */
    private Long userId;
    
    /**
     * 订单金额
     */
    private BigDecimal amount;
    
    /**
     * 下单时间
     */
    private LocalDateTime createTime;
}

3.3 订单服务实现

package com.mars.biz.order.service.impl;

import com.mars.biz.order.entity.Order;
import com.mars.biz.order.entity.OrderMessage;
import com.mars.biz.order.mapper.OrderMapper;
import com.mars.biz.order.service.OrderService;
import com.mars.rocketmq.service.RocketMQService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * 订单服务实现
 */
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private RocketMQService rocketMQService;
    
    /**
     * 创建订单并发送延迟消息
     * 
     * @param userId 用户 ID
     * @param amount 订单金额
     * @return 订单 ID
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Long createOrder(Long userId, BigDecimal amount) {
        // 1. 创建订单
        Order order = new Order();
        order.setOrderNo("ORD" + System.currentTimeMillis());
        order.setUserId(userId);
        order.setAmount(amount);
        order.setStatus(0); // 0-待付款
        order.setCreateTime(LocalDateTime.now());
        
        orderMapper.insert(order);
        log.info("订单创建成功,orderId: {}, orderNo: {}", order.getId(), order.getOrderNo());
        
        // 2. 构建订单消息
        OrderMessage message = new OrderMessage();
        message.setOrderId(order.getId());
        message.setOrderNo(order.getOrderNo());
        message.setUserId(userId);
        message.setAmount(amount);
        message.setCreateTime(order.getCreateTime());
        
        // 3. 发送延迟消息(延迟级别 5 = 10 分钟,实际根据需求调整)
        // RocketMQ 延迟级别:1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
        boolean success = rocketMQService.sendDelayMessage(
            "ORDER_DELAY_TOPIC",
            JSON.toJSONString(message),
            5  // 测试用 10 分钟,生产环境根据需要设置
        );
        
        if (success) {
            log.info("订单延迟消息发送成功,orderId: {}", order.getId());
        } else {
            log.error("订单延迟消息发送失败,orderId: {}", order.getId());
            // 这里可以记录到数据库,后续补偿
        }
        
        return order.getId();
    }
}

3.4 延迟消息消费者

package com.mars.biz.order.consumer;

import com.alibaba.fastjson.JSON;
import com.mars.biz.order.entity.Order;
import com.mars.biz.order.entity.OrderMessage;
import com.mars.biz.order.mapper.OrderMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;

/**
 * 订单延迟消息消费者
 */
@Slf4j
@Service
@RocketMQMessageListener(
    topic = "ORDER_DELAY_TOPIC",
    consumerGroup = "mars-order-delay-consumer-group",
    consumeMode = ConsumeMode.ORDERLY, // 顺序消费
    maxReconsumeTimes = 3 // 最大重试次数
)
public class OrderDelayConsumer implements RocketMQListener<String> {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void onMessage(String messageBody) {
        try {
            log.info("收到订单延迟消息:{}", messageBody);
            
            // 1. 解析消息
            OrderMessage orderMessage = JSON.parseObject(messageBody, OrderMessage.class);
            Long orderId = orderMessage.getOrderId();
            
            // 2. 查询订单
            Order order = orderMapper.selectById(orderId);
            if (order == null) {
                log.warn("订单不存在,orderId: {}", orderId);
                return;
            }
            
            // 3. 检查订单状态
            if (order.getStatus() != 0) { // 不是待付款状态
                log.info("订单已处理,无需取消,orderId: {}, status: {}", orderId, order.getStatus());
                return;
            }
            
            // 4. 取消订单
            order.setStatus(9); // 9-已取消
            order.setCancelTime(LocalDateTime.now());
            order.setRemark("超时未付款,系统自动取消");
            
            orderMapper.updateById(order);
            log.info("订单已自动取消,orderId: {}, orderNo: {}", orderId, order.getOrderNo());
            
            // 5. 执行其他取消逻辑
            // - 释放库存
            // - 退还优惠券
            // - 通知用户
            
        } catch (Exception e) {
            log.error("处理订单延迟消息异常,message: {}", messageBody, e);
            throw e; // 抛出异常触发重试
        }
    }
}

3.5 手动取消订单(用户主动取消)

/**
 * 用户取消订单
 */
@Override
@Transactional(rollbackFor = Exception.class)
public void cancelOrder(Long orderId, Long userId) {
    Order order = orderMapper.selectById(orderId);
    
    if (order == null) {
        throw new RuntimeException("订单不存在");
    }
    
    if (!order.getUserId().equals(userId)) {
        throw new RuntimeException("无权操作该订单");
    }
    
    if (order.getStatus() != 0) {
        throw new RuntimeException("订单状态不允许取消");
    }
    
    // 取消订单
    order.setStatus(9);
    order.setCancelTime(LocalDateTime.now());
    order.setRemark("用户主动取消");
    orderMapper.updateById(order);
    
    log.info("订单已取消,orderId: {}", orderId);
    
    // 注意:不需要发送延迟消息了,因为订单已经取消
    // 消费者收到延迟消息时会检查订单状态,发现已取消就不会重复处理
}

四、高级用法

4.1 事务消息(保证最终一致性)

package com.mars.biz.order.service.impl;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
public class OrderTransactionService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 使用事务消息保证订单创建和消息发送的最终一致性
     */
    @Transactional(rollbackFor = Exception.class)
    public Long createOrderWithTransaction(Long userId, BigDecimal amount) {
        // 1. 创建订单(本地事务)
        Order order = createOrderInternal(userId, amount);
        
        // 2. 构建事务消息
        Message<OrderMessage> message = MessageBuilder.withPayload(buildOrderMessage(order))
            .setHeader("orderId", order.getId())
            .build();
        
        // 3. 发送事务消息
        SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
            "order-tx-group",
            "ORDER_DELAY_TOPIC",
            message,
            null
        );
        
        log.info("事务消息发送结果:{}", sendResult.getSendStatus());
        
        return order.getId();
    }
    
    /**
     * 事务回查接口(需要实现 TransactionListener)
     * 当 RocketMQ 未收到事务确认时,会主动回查本地事务状态
     */
    @Component
    public class OrderTransactionListener implements TransactionListener {
        
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // 执行本地事务后回调
            // 返回 COMMIT_MESSAGE:提交消息
            // 返回 ROLLBACK_MESSAGE:回滚消息
            // 返回 UNKNOW:等待事务回查
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            // 事务回查逻辑
            // 查询订单是否存在,存在则提交
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}

4.2 批量发送消息

/**
 * 批量发送订单消息
 */
public void batchSendOrderMessages(List<OrderMessage> messages) {
    List<Message<OrderMessage>> messageList = messages.stream()
        .map(m -> MessageBuilder.withPayload(m).build())
        .collect(Collectors.toList());
    
    // 批量发送(总大小不超过 1MB)
    rocketMQTemplate.syncSend("BATCH_ORDER_TOPIC", messageList);
}

4.3 消息过滤(Tag 过滤)

// 发送带 Tag 的消息
Message<OrderMessage> message = MessageBuilder.withPayload(orderMessage)
    .setHeader(MessageConst.PROPERTY_TAGS, "CREATE") // Tag
    .build();
rocketMQTemplate.syncSend("ORDER_TOPIC:CREATE", message);

// 消费者过滤 Tag
@RocketMQMessageListener(
    topic = "ORDER_TOPIC",
    consumerGroup = "order-consumer-group",
    selectorExpression = "CREATE" // 只消费 CREATE Tag 的消息
)

五、生产环境最佳实践

5.1 消息可靠性保证

  1. 同步发送 + 本地事务表
// 创建订单表
// 创建消息记录表(status: 0-待发送,1-已发送,2-发送失败)
@Transactional
public void createOrder(Order order) {
    // 1. 插入订单
    orderMapper.insert(order);
    
    // 2. 插入消息记录
    MessageRecord record = new MessageRecord();
    record.setMessageId(UUID.randomUUID().toString());
    record.setTopic("ORDER_DELAY_TOPIC");
    record.setBody(JSON.toJSONString(order));
    record.setStatus(0);
    messageRecordMapper.insert(record);
}

// 定时任务扫描未发送的消息,重新发送
@Scheduled(fixedRate = 5000)
public void scanAndResendMessages() {
    List<MessageRecord> records = messageRecordMapper.selectByStatus(0);
    for (MessageRecord record : records) {
        boolean success = rocketMQService.sendSyncMessage(
            record.getTopic(), 
            record.getBody()
        );
        if (success) {
            record.setStatus(1);
        }
        messageRecordMapper.updateById(record);
    }
}
  1. 消费者幂等性
@Autowired
private RedisTemplate<String, String> redisTemplate;

@Override
public void onMessage(String messageBody) {
    OrderMessage orderMessage = JSON.parseObject(messageBody, OrderMessage.class);
    
    // 1. 检查是否已处理(Redis 去重)
    String key = "order:message:" + orderMessage.getOrderId();
    Boolean processed = redisTemplate.opsForValue().setIfAbsent(
        key, 
        "1", 
        Duration.ofDays(7)
    );
    
    if (Boolean.FALSE.equals(processed)) {
        log.info("消息已处理,跳过,orderId: {}", orderMessage.getOrderId());
        return;
    }
    
    // 2. 处理业务逻辑
    // ...
}

5.2 监控和告警

@Configuration
public class RocketMQMonitorConfig {
    
    @Bean
    public RocketMQListener<String> monitorConsumer() {
        return message -> {
            // 监控消息消费情况
            // 发送到监控系统或日志平台
        };
    }
}

5.3 延迟级别选择

级别 延迟时间 适用场景
1 1s 即时通知
5 10m 订单超时(测试)
14 10m 订单超时(生产)
16 30m 订单超时(标准)
17 1h 长时间任务

六、常见问题排查

Q1: 消息发送失败?

# 检查 NameServer 是否启动
telnet 127.0.0.1 9876

# 检查 Broker 是否启动
telnet 127.0.0.1 10911

# 查看生产者日志
logging.level.org.apache.rocketmq=DEBUG

Q2: 消费者不消费消息?

  • 检查 Topic 和 ConsumerGroup 是否匹配
  • 检查消费者是否启动成功
  • 查看 RocketMQ Dashboard 消费进度

Q3: 消息重复消费?

  • 实现消费者幂等性(Redis 去重、数据库唯一键)
  • 使用事务保证

Q4: 延迟时间不准确?

  • RocketMQ 的延迟级别是固定的,不能自定义精确时间
  • 如需精确延迟,使用定时任务 + 数据库轮询

七、完整代码结构

mars-biz/
└── src/main/java/com/mars/biz/order/
    ├── consumer/
    │   └── OrderDelayConsumer.java          # 延迟消息消费者
    ├── entity/
    │   ├── Order.java                       # 订单实体
    │   └── OrderMessage.java                # 订单消息体
    ├── mapper/
    │   └── OrderMapper.java                 # 订单 Mapper
    └── service/
        ├── OrderService.java                # 订单服务接口
        └── impl/
            └── OrderServiceImpl.java        # 订单服务实现

八、总结

使用 RocketMQ 实现订单超时取消的优势:

解耦:订单系统和超时逻辑分离
可靠:消息持久化,不会丢失
削峰:高峰期消息堆积,低峰期慢慢消费
扩展:可以轻松添加其他消费者(如通知用户)

相比定时任务的劣势:

❌ 架构复杂度增加
❌ 需要维护 RocketMQ 集群
❌ 延迟时间不精确(固定级别)

建议

  • 小项目:使用定时任务 + 数据库轮询
  • 中大型项目:使用 RocketMQ 延迟消息


评论