在spring-boot中使用rocketmq大致需要4步,如下:

rocketmq使用流程
一.在项目中加入rocketmq-spring-boot-starter的依赖包
项目源码地址:
https://github.com/apache/rocketmq-spring
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.4</version>
</dependency>
二.在项目配置中加入RocketMQ的配置
rocketmq的配置订分为3块:

rocketmq配置
1.rocketmq server的配置
- rocketmq.name-server: rocketMQ的nameServer服务器地址(多个地址用;隔开)
- access-channel : RocketMQ服务的类型,有2个可选值: LOCAL: 本地(自建的RocketMQ服务);CLOUD: 使用云广商的RocketMQ服务
2.rocketmq producer相关的配置
非必须的配置
- group : 生产者组的名称
- send-message-timeout : 消息发送的超时时间(毫秒)
- compress-message-body-threshold : 对消息体进行压缩的阀值(即当消息内容达到多大时启用压缩)
- retry-times-when-send-failed : 当消息发送失败时重试次数
- retry-times-when-send-async-failed : 异步消息发送失败重试次数
- retry-next-server : 当发送失败时是否尝试换一个消息服务器进行重试
- max-message-size : 消息内容的最大大小(B)
- access-key : 云广商RocketMQ的access-key(类似于用户名)
- secret-key : 云广商RocketMQ的密码
- enable-msg-trace : 是否启用消息追踪
- customized-trace-topic :自定义消息追踪的topic
3.rocketmq consumer相关的配置
非必须的配置
- group : 消费者组的名称
- topic : 消费的topic
- message-model : 消息类型(集群消息CLUSTERING或广播消息BROADCAST)
- selector-type : 消息子分类类型(TAG或SQL92)
- selector-expression : 消息子分类表达式(当为TAG时即为TAG的名称;SQL92表明使用SQL表达式)
- access-key : 云广商RocketMQ的access-key(类似于用户名)
- secret-key : 云广商RocketMQ的密码
- pull-batch-size : 批量拉撤销息的数量
- enable-msg-trace : 是否启用消息追踪
- customized-trace-topic :自定义消息追踪的topic
- listeners : 消息消费监听容器配置(最外层的key为group的名称,里层的key为topic名称,值为是否监听)
三.消息生产者使用
消息生产通过RocketMQTemplate来实现

rocketmq生产者
1.设置属性
在发送消息前可以设置一些属性:
- 生产者的配置
- 类似于第二步中的生产者相关的配置
- 发送相关配置
- setAsyncSenderExecutor : 设置异步线程池
- setMessageQueueSelector : 设置顺序消息的选择器算法
- setMessageConverter : 设置消息格式转换器
2.发送消息
- send : 异步发送消息
- convertAndSend : 异步发送消息
- syncSend : 同步发送消息
String tag = "tag_order_paid";
OrderPaidEvent event = new OrderPaidEvent();
event.setOrderId(request.getOrderId());
//普通消息
rocketMQTemplate.convertAndSend("topic_order_normal:"+tag, event);
(1)发送延时消息:
rocketMQTemplate.syncSend("topic:tag", MessageBuilder.withPayload(orderId).build(), 10, 1);
- 第一个参数发送的topic和tag组成的目标
- 第二个参数为消息内容:需要通过MessageBuilder的方法来组装
- 第三个参数为发送的超时时间
- 第四个参数为延时的等级
(2)发送顺序消息
//顺序消息(指定作为顺序消息的分区的键)
Message<OrderPaidEvent> msg = MessageBuilder.withPayload(event).build();
rocketMQTemplate.syncSendOrderly("topic_order_orderly:"+tag, msg, String.valueOf(event.getOrderId()));
(3)发送定时消息
//定时消息(以弥补延时消息不够灵活的问题)
long deliverTimeStamp = System.currentTimeMillis() + 24 * 60 * 60 * 1000;
org.apache.rocketmq.common.message.Message message = new
org.apache.rocketmq.common.message.Message("topic_order_delay", tag, JSONObject.toJSONString(event).getBytes());
message.setDeliverTimeMs(deliverTimeStamp);
rocketMQTemplate.getProducer().send(message);
(4)发送事务消息
//事务消息
Message<OrderPaidEvent> transactionMsg = MessageBuilder.withPayload(event).build();
rocketMQTemplate.sendMessageInTransaction("topic_order_transaction:"+tag, transactionMsg, null);
四.消息消费者使用

rocketmq消费者使用
1.使用步骤
(1).在监听类上使用消息监听注解RocketMQMessageListener
其主要属性如下:
- consumerGroup : 消费组名称
- topic : 消费的topic
- selectorType : 子分类类型(默认为TAG)
- selectorExpression : 子分类表达式(默认为*)
- messageModel : 消息模式(默认为集群消息:MessageModel.CLUSTERING;也可以是广播消息:MessageModel.BROADCASE)
- consumeThreadMax : 最大消费线程数
- maxReconsumeTimes : 最大消费次数
- consumeTimeout : 阻塞消费线程消费的最大时间(分钟)
- replyTimeout : 发送回复消息的超时时间
- accessKey : 云广商RocketMQ的access-key(类似于用户名),默认读取配置rocketmq.consumer.access-key
- secretKey : 云广商RocketMQ的密码,默认读取配置rocketmq.consumer.secret-key
- nameServer : RocketMQ注册中心地址,默认读取rocketmq.name-server
- accessChannel : RocketMQ服务的类型,有2个可选值: LOCAL: 本地(自建的RocketMQ服务);CLOUD: 使用云广商的RocketMQ服务
(2).实现RocketMQListener或RocketMQReplyListener
@Component
@RocketMQMessageListener(
topic = "topic_order_normal",
selectorExpression = "tag_order_paid",
consumerGroup = "GID_ORDER_PAID_COUPON_CONSUMER")
@Slf4j
public class OrderPaidCouponConsumer implements RocketMQListener<OrderPaidEvent> {
@Override
public void onMessage(OrderPaidEvent message) {
log.info("收到订单支付消息,给用户发送代金券, message={}", message);
}
}
注:这里通过如果不是异常,则消费都会自动确认(ACK);如果抛出异常则会再次尝试消费
© 版权声明
文章版权归作者所有,未经允许请勿转载。
相关文章
暂无评论...