Administrator
发布于 2026-03-19 / 1 阅读
0
0

RocketMQ 快速开始 - 5 分钟上手

RocketMQ 快速开始 - 5 分钟上手

一、什么是 RocketMQ?

RocketMQ 是阿里巴巴开源的国产分布式消息中间件,具有高性能、高可靠、高实时的特点。

核心概念

  • Producer:消息生产者(发送消息)
  • Consumer:消息消费者(接收消息)
  • Topic:消息主题(消息的分类)
  • Message:消息载体

二、快速集成步骤

步骤 1:添加依赖

在你的业务模块 pom.xml 中添加:

<dependency>
    <groupId>com.mars</groupId>
    <artifactId>mars-rocketmq</artifactId>
    <version>1.0.0</version>
</dependency>

步骤 2:配置 RocketMQ

application.yml 中配置:

rocketmq:
  name-server: 127.0.0.1:9876  # NameServer 地址
  producer:
    group: mars-producer-group  # 生产者组名
    vip-channel-enabled: false  # 关闭 VIP 通道(避免某些网络问题)

步骤 3:发送消息

@Autowired
private RocketMQService rocketMQService;

// 发送一条消息
rocketMQService.sendSyncMessage("MY_TOPIC", "Hello RocketMQ!");

步骤 4:接收消息

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
    topic = "MY_TOPIC",
    consumerGroup = "my-consumer-group"
)
public class MyConsumer implements RocketMQListener<String> {
    
    @Override
    public void onMessage(String message) {
        System.out.println("收到消息:" + message);
    }
}

三、实战案例:订单超时自动取消

场景说明

用户下单后 30 分钟未付款,系统自动取消订单。

完整代码

1. 发送延迟消息(下单时)

@Service
public class OrderService {
    
    @Autowired
    private RocketMQService rocketMQService;
    
    /**
     * 创建订单
     */
    @Transactional
    public Long createOrder(Long userId, BigDecimal amount) {
        // 1. 创建订单
        Order order = new Order();
        order.setOrderNo("ORD" + System.currentTimeMillis());
        order.setUserId(userId);
        order.setAmount(amount);
        order.setStatus(0); // 待付款
        
        orderMapper.insert(order);
        
        // 2. 发送延迟消息(延迟级别 16 = 30 分钟)
        OrderMessage msg = new OrderMessage();
        msg.setOrderId(order.getId());
        msg.setOrderNo(order.getOrderNo());
        
        rocketMQService.sendDelayMessage(
            "ORDER_DELAY_TOPIC",
            JSON.toJSONString(msg),
            16  // 延迟 30 分钟
        );
        
        return order.getId();
    }
}

2. 消费延迟消息(30 分钟后)

@Service
@RocketMQMessageListener(
    topic = "ORDER_DELAY_TOPIC",
    consumerGroup = "order-delay-consumer-group",
    consumeMode = ConsumeMode.ORDERLY  // 顺序消费
)
public class OrderDelayConsumer implements RocketMQListener<String> {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Override
    @Transactional
    public void onMessage(String messageBody) {
        OrderMessage msg = JSON.parseObject(messageBody, OrderMessage.class);
        
        // 查询订单
        Order order = orderMapper.selectById(msg.getOrderId());
        
        // 如果订单仍是待付款状态,则取消
        if (order != null && order.getStatus() == 0) {
            order.setStatus(9); // 已取消
            orderMapper.updateById(order);
            
            System.out.println("订单已自动取消:" + order.getOrderNo());
        }
    }
}

四、常用延迟级别

级别 延迟时间 使用场景
1 1 秒 即时通知
5 10 分钟 短时间超时
14 10 分钟 支付超时(测试)
16 30 分钟 支付超时(标准)
17 1 小时 长时间任务

完整列表:1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h

五、本地开发环境搭建

Windows 快速启动

  1. 下载 RocketMQ

    下载地址:https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
    
  2. 启动 NameServer

    cd rocketmq-all-5.0.0-bin-release
    bin\runserver.cmd
    
  3. 启动 Broker(新窗口)

    cd rocketmq-all-5.0.0-bin-release
    bin\runbroker.cmd
    
  4. 访问控制台(可选)

    docker run -d --name rocketmq-console \
      -p 8080:8080 \
      -e "JAVA_OPTS=-Drocketmq.namesrv.addr=host.docker.internal:9876" \
      apache/rocketmq-dashboard
    
    访问:http://localhost:8080
    

六、常见问题

Q1: 消息发送失败?

解决

  • 检查 NameServer 是否启动:telnet 127.0.0.1 9876
  • 检查配置是否正确
  • 设置 vip-channel-enabled: false

Q2: 消费者收不到消息?

解决

  • 检查 Topic 是否匹配
  • 检查 ConsumerGroup 是否唯一
  • 查看 RocketMQ Dashboard 消费进度

Q3: 如何保证消息不重复消费?

方案:实现幂等性

// Redis 去重
String key = "order:" + orderId;
Boolean processed = redisTemplate.opsForValue().setIfAbsent(
    key, "1", Duration.ofDays(7)
);
if (Boolean.FALSE.equals(processed)) {
    return; // 已处理过
}
// 处理业务...

七、最佳实践

消息体尽量小(< 1MB)
重要消息用同步发送
消费者实现幂等性
生产环境配置多个 NameServer
使用日志记录消息轨迹

不要频繁创建销毁 Producer
不要在消费者中处理耗时操作
不要忽略异常处理

八、下一步学习


有问题? 查看 ROCKETMQ_PRACTICE.md 获取更多详细信息!


评论