RocketMQ 快速开始 - 5 分钟上手
一、什么是 RocketMQ?
RocketMQ 是阿里巴巴开源的国产分布式消息中间件,具有高性能、高可靠、高实时的特点。
核心概念:
- Producer:消息生产者(发送消息)
- Consumer:消息消费者(接收消息)
- Topic:消息主题(消息的分类)
- Message:消息载体
二、快速集成步骤
步骤 1:添加依赖
在你的业务模块 pom.xml 中添加:
<dependency>
<groupId>com.mars</groupId>
<artifactId>mars-rocketmq</artifactId>
<version>1.0.0</version>
</dependency>
步骤 2:配置 RocketMQ
在 application.yml 中配置:
rocketmq:
name-server: 127.0.0.1:9876 # NameServer 地址
producer:
group: mars-producer-group # 生产者组名
vip-channel-enabled: false # 关闭 VIP 通道(避免某些网络问题)
步骤 3:发送消息
@Autowired
private RocketMQService rocketMQService;
// 发送一条消息
rocketMQService.sendSyncMessage("MY_TOPIC", "Hello RocketMQ!");
步骤 4:接收消息
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(
topic = "MY_TOPIC",
consumerGroup = "my-consumer-group"
)
public class MyConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到消息:" + message);
}
}
三、实战案例:订单超时自动取消
场景说明
用户下单后 30 分钟未付款,系统自动取消订单。
完整代码
1. 发送延迟消息(下单时)
@Service
public class OrderService {
@Autowired
private RocketMQService rocketMQService;
/**
* 创建订单
*/
@Transactional
public Long createOrder(Long userId, BigDecimal amount) {
// 1. 创建订单
Order order = new Order();
order.setOrderNo("ORD" + System.currentTimeMillis());
order.setUserId(userId);
order.setAmount(amount);
order.setStatus(0); // 待付款
orderMapper.insert(order);
// 2. 发送延迟消息(延迟级别 16 = 30 分钟)
OrderMessage msg = new OrderMessage();
msg.setOrderId(order.getId());
msg.setOrderNo(order.getOrderNo());
rocketMQService.sendDelayMessage(
"ORDER_DELAY_TOPIC",
JSON.toJSONString(msg),
16 // 延迟 30 分钟
);
return order.getId();
}
}
2. 消费延迟消息(30 分钟后)
@Service
@RocketMQMessageListener(
topic = "ORDER_DELAY_TOPIC",
consumerGroup = "order-delay-consumer-group",
consumeMode = ConsumeMode.ORDERLY // 顺序消费
)
public class OrderDelayConsumer implements RocketMQListener<String> {
@Autowired
private OrderMapper orderMapper;
@Override
@Transactional
public void onMessage(String messageBody) {
OrderMessage msg = JSON.parseObject(messageBody, OrderMessage.class);
// 查询订单
Order order = orderMapper.selectById(msg.getOrderId());
// 如果订单仍是待付款状态,则取消
if (order != null && order.getStatus() == 0) {
order.setStatus(9); // 已取消
orderMapper.updateById(order);
System.out.println("订单已自动取消:" + order.getOrderNo());
}
}
}
四、常用延迟级别
| 级别 | 延迟时间 | 使用场景 |
|---|---|---|
| 1 | 1 秒 | 即时通知 |
| 5 | 10 分钟 | 短时间超时 |
| 14 | 10 分钟 | 支付超时(测试) |
| 16 | 30 分钟 | 支付超时(标准) |
| 17 | 1 小时 | 长时间任务 |
完整列表:1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
五、本地开发环境搭建
Windows 快速启动
下载 RocketMQ
下载地址:https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip启动 NameServer
cd rocketmq-all-5.0.0-bin-release bin\runserver.cmd启动 Broker(新窗口)
cd rocketmq-all-5.0.0-bin-release bin\runbroker.cmd访问控制台(可选)
docker run -d --name rocketmq-console \ -p 8080:8080 \ -e "JAVA_OPTS=-Drocketmq.namesrv.addr=host.docker.internal:9876" \ apache/rocketmq-dashboard 访问:http://localhost:8080
六、常见问题
Q1: 消息发送失败?
解决:
- 检查 NameServer 是否启动:
telnet 127.0.0.1 9876 - 检查配置是否正确
- 设置
vip-channel-enabled: false
Q2: 消费者收不到消息?
解决:
- 检查 Topic 是否匹配
- 检查 ConsumerGroup 是否唯一
- 查看 RocketMQ Dashboard 消费进度
Q3: 如何保证消息不重复消费?
方案:实现幂等性
// Redis 去重
String key = "order:" + orderId;
Boolean processed = redisTemplate.opsForValue().setIfAbsent(
key, "1", Duration.ofDays(7)
);
if (Boolean.FALSE.equals(processed)) {
return; // 已处理过
}
// 处理业务...
七、最佳实践
✅ 消息体尽量小(< 1MB)
✅ 重要消息用同步发送
✅ 消费者实现幂等性
✅ 生产环境配置多个 NameServer
✅ 使用日志记录消息轨迹
❌ 不要频繁创建销毁 Producer
❌ 不要在消费者中处理耗时操作
❌ 不要忽略异常处理
八、下一步学习
有问题? 查看 ROCKETMQ_PRACTICE.md 获取更多详细信息!