Administrator
发布于 2026-03-19 / 1 阅读
0
0

Mars RocketMQ 使用指南

Mars RocketMQ 使用指南

简介

mars-rocketmq 是基于 Apache RocketMQ 的国产分布式消息中间件基础设施模块,提供消息发送、消费等功能。

快速开始

1. 添加依赖

在你的模块 pom.xml 中添加:

<dependency>
    <groupId>com.mars</groupId>
    <artifactId>mars-rocketmq</artifactId>
    <version>1.0.0</version>
</dependency>

2. 配置 RocketMQ

application.yml 中添加配置:

rocketmq:
  name-server: 127.0.0.1:9876  # NameServer 地址
  producer:
    group: mars-producer-group  # 生产者组名称
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 2
    vip-channel-enabled: false  # 某些环境需要关闭 VIP 通道

3. 使用示例

3.1 发送消息

@Autowired
private RocketMQService rocketMQService;

// 发送同步消息
boolean result = rocketMQService.sendSyncMessage("my-topic", "Hello RocketMQ");

// 发送异步消息
rocketMQService.sendAsyncMessage("my-topic", "Async Message");

// 发送顺序消息
rocketMQService.sendOrderlyMessage("order-topic", "Order created", "order-id-123");

// 发送延迟消息(delayLevel: 1-18)
rocketMQService.sendDelayMessage("delay-topic", "Delayed message", 3);

3.2 消费消息

创建消费者类:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
    topic = "my-topic",
    consumerGroup = "my-consumer-group"
)
public class MyConsumer implements RocketMQListener<String> {
    
    @Override
    public void onMessage(String message) {
        System.out.println("收到消息:" + message);
        // 处理业务逻辑
    }
}

高级用法

1. 发送复杂对象

// 发送 Java 对象
MyObject obj = new MyObject();
rocketMQTemplate.syncSend("object-topic", obj);

// 发送 Message 对象(可设置 headers 等属性)
Message<String> msg = MessageBuilder
    .withPayload("message body")
    .setHeader("key", "value")
    .build();
rocketMQTemplate.syncSend("header-topic", msg);

2. 事务消息

@Service
public class TransactionProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendTransactionMessage() {
        rocketMQTemplate.sendMessageInTransaction(
            "tx-producer-group",
            "tx-topic",
            MessageBuilder.withPayload("transaction message").build(),
            null
        );
    }
}

3. 批量发送

List<String> messages = Arrays.asList("msg1", "msg2", "msg3");
rocketMQTemplate.syncSend("batch-topic", messages);

RocketMQ 延迟级别

RocketMQ 支持 18 个延迟级别:

级别 延迟时间 级别 延迟时间
1 1s 10 6m
2 5s 11 7m
3 10s 12 8m
4 30s 13 9m
5 1m 14 10m
6 2m 15 20m
7 3m 16 30m
8 4m 17 1h
9 5m 18 2h

注意事项

  1. NameServer 高可用:生产环境建议配置多个 NameServer
  2. 消息可靠性:重要业务建议使用同步消息 + 本地事务表
  3. 消息幂等性:消费者需要实现幂等性处理
  4. Topic 和 Group:提前在 RocketMQ 控制台创建或通过代码自动创建
  5. VIP 通道:某些网络环境需要关闭 VIP 通道(vip-channel-enabled: false)

参考资料

常见问题

Q: 消息发送失败?

A: 检查 NameServer 地址是否正确,网络是否连通,生产者组是否已创建。

Q: 消费者不消费消息?

A: 检查消费者组是否正确,Topic 是否存在,消费者是否启动成功。

Q: 如何查看消息投递情况?

A: 使用 RocketMQ 控制台(rocketmq-dashboard)查看消息轨迹。


评论