Mars RocketMQ 使用指南
简介
mars-rocketmq 是基于 Apache RocketMQ 的国产分布式消息中间件基础设施模块,提供消息发送、消费等功能。
快速开始
1. 添加依赖
在你的模块 pom.xml 中添加:
<dependency>
<groupId>com.mars</groupId>
<artifactId>mars-rocketmq</artifactId>
<version>1.0.0</version>
</dependency>
2. 配置 RocketMQ
在 application.yml 中添加配置:
rocketmq:
name-server: 127.0.0.1:9876 # NameServer 地址
producer:
group: mars-producer-group # 生产者组名称
send-message-timeout: 3000
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
vip-channel-enabled: false # 某些环境需要关闭 VIP 通道
3. 使用示例
3.1 发送消息
@Autowired
private RocketMQService rocketMQService;
// 发送同步消息
boolean result = rocketMQService.sendSyncMessage("my-topic", "Hello RocketMQ");
// 发送异步消息
rocketMQService.sendAsyncMessage("my-topic", "Async Message");
// 发送顺序消息
rocketMQService.sendOrderlyMessage("order-topic", "Order created", "order-id-123");
// 发送延迟消息(delayLevel: 1-18)
rocketMQService.sendDelayMessage("delay-topic", "Delayed message", 3);
3.2 消费消息
创建消费者类:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(
topic = "my-topic",
consumerGroup = "my-consumer-group"
)
public class MyConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到消息:" + message);
// 处理业务逻辑
}
}
高级用法
1. 发送复杂对象
// 发送 Java 对象
MyObject obj = new MyObject();
rocketMQTemplate.syncSend("object-topic", obj);
// 发送 Message 对象(可设置 headers 等属性)
Message<String> msg = MessageBuilder
.withPayload("message body")
.setHeader("key", "value")
.build();
rocketMQTemplate.syncSend("header-topic", msg);
2. 事务消息
@Service
public class TransactionProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendTransactionMessage() {
rocketMQTemplate.sendMessageInTransaction(
"tx-producer-group",
"tx-topic",
MessageBuilder.withPayload("transaction message").build(),
null
);
}
}
3. 批量发送
List<String> messages = Arrays.asList("msg1", "msg2", "msg3");
rocketMQTemplate.syncSend("batch-topic", messages);
RocketMQ 延迟级别
RocketMQ 支持 18 个延迟级别:
| 级别 | 延迟时间 | 级别 | 延迟时间 |
|---|---|---|---|
| 1 | 1s | 10 | 6m |
| 2 | 5s | 11 | 7m |
| 3 | 10s | 12 | 8m |
| 4 | 30s | 13 | 9m |
| 5 | 1m | 14 | 10m |
| 6 | 2m | 15 | 20m |
| 7 | 3m | 16 | 30m |
| 8 | 4m | 17 | 1h |
| 9 | 5m | 18 | 2h |
注意事项
- NameServer 高可用:生产环境建议配置多个 NameServer
- 消息可靠性:重要业务建议使用同步消息 + 本地事务表
- 消息幂等性:消费者需要实现幂等性处理
- Topic 和 Group:提前在 RocketMQ 控制台创建或通过代码自动创建
- VIP 通道:某些网络环境需要关闭 VIP 通道(vip-channel-enabled: false)
参考资料
常见问题
Q: 消息发送失败?
A: 检查 NameServer 地址是否正确,网络是否连通,生产者组是否已创建。
Q: 消费者不消费消息?
A: 检查消费者组是否正确,Topic 是否存在,消费者是否启动成功。
Q: 如何查看消息投递情况?
A: 使用 RocketMQ 控制台(rocketmq-dashboard)查看消息轨迹。