大白话RocketMQ系列之当RocketMQ遇上Spring Boot

在spring-boot中使用rocketmq大致需要4步,如下:

大白话RocketMQ系列之当RocketMQ遇上Spring Boot

rocketmq使用流程

一.在项目中加入rocketmq-spring-boot-starter的依赖包

项目源码地址:
https://github.com/apache/rocketmq-spring

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.3.4</version>
</dependency>

二.在项目配置中加入RocketMQ的配置

rocketmq的配置订分为3块:

大白话RocketMQ系列之当RocketMQ遇上Spring Boot

rocketmq配置

1.rocketmq server的配置

  • rocketmq.name-server: rocketMQ的nameServer服务器地址(多个地址用;隔开)
  • access-channel : RocketMQ服务的类型,有2个可选值: LOCAL: 本地(自建的RocketMQ服务);CLOUD: 使用云广商的RocketMQ服务

2.rocketmq producer相关的配置

非必须的配置

  • group : 生产者组的名称
  • send-message-timeout : 消息发送的超时时间(毫秒)
  • compress-message-body-threshold : 对消息体进行压缩的阀值(即当消息内容达到多大时启用压缩)
  • retry-times-when-send-failed : 当消息发送失败时重试次数
  • retry-times-when-send-async-failed : 异步消息发送失败重试次数
  • retry-next-server : 当发送失败时是否尝试换一个消息服务器进行重试
  • max-message-size : 消息内容的最大大小(B)
  • access-key : 云广商RocketMQ的access-key(类似于用户名)
  • secret-key : 云广商RocketMQ的密码
  • enable-msg-trace : 是否启用消息追踪
  • customized-trace-topic :自定义消息追踪的topic

3.rocketmq consumer相关的配置

非必须的配置

  • group : 消费者组的名称
  • topic : 消费的topic
  • message-model : 消息类型(集群消息CLUSTERING或广播消息BROADCAST)
  • selector-type : 消息子分类类型(TAG或SQL92)
  • selector-expression : 消息子分类表达式(当为TAG时即为TAG的名称;SQL92表明使用SQL表达式)
  • access-key : 云广商RocketMQ的access-key(类似于用户名)
  • secret-key : 云广商RocketMQ的密码
  • pull-batch-size : 批量拉撤销息的数量
  • enable-msg-trace : 是否启用消息追踪
  • customized-trace-topic :自定义消息追踪的topic
  • listeners : 消息消费监听容器配置(最外层的key为group的名称,里层的key为topic名称,值为是否监听)

三.消息生产者使用

消息生产通过RocketMQTemplate来实现

大白话RocketMQ系列之当RocketMQ遇上Spring Boot

rocketmq生产者

1.设置属性

在发送消息前可以设置一些属性:

  • 生产者的配置
    • 类似于第二步中的生产者相关的配置
  • 发送相关配置
    • setAsyncSenderExecutor : 设置异步线程池
    • setMessageQueueSelector : 设置顺序消息的选择器算法
    • setMessageConverter : 设置消息格式转换器

2.发送消息

  • send : 异步发送消息
  • convertAndSend : 异步发送消息
  • syncSend : 同步发送消息
String tag = "tag_order_paid";
OrderPaidEvent event = new OrderPaidEvent();
event.setOrderId(request.getOrderId());
//普通消息
rocketMQTemplate.convertAndSend("topic_order_normal:"+tag, event);

(1)发送延时消息:

rocketMQTemplate.syncSend("topic:tag", MessageBuilder.withPayload(orderId).build(), 10, 1);
  • 第一个参数发送的topic和tag组成的目标
  • 第二个参数为消息内容:需要通过MessageBuilder的方法来组装
  • 第三个参数为发送的超时时间
  • 第四个参数为延时的等级

(2)发送顺序消息

//顺序消息(指定作为顺序消息的分区的键)
Message<OrderPaidEvent> msg = MessageBuilder.withPayload(event).build();
rocketMQTemplate.syncSendOrderly("topic_order_orderly:"+tag, msg, String.valueOf(event.getOrderId()));

(3)发送定时消息

//定时消息(以弥补延时消息不够灵活的问题)
long deliverTimeStamp = System.currentTimeMillis() + 24 * 60 * 60 * 1000;
org.apache.rocketmq.common.message.Message message = new
                org.apache.rocketmq.common.message.Message("topic_order_delay", tag, JSONObject.toJSONString(event).getBytes());
message.setDeliverTimeMs(deliverTimeStamp);
rocketMQTemplate.getProducer().send(message);

(4)发送事务消息

 //事务消息
        Message<OrderPaidEvent> transactionMsg = MessageBuilder.withPayload(event).build();
        rocketMQTemplate.sendMessageInTransaction("topic_order_transaction:"+tag, transactionMsg, null);

四.消息消费者使用

大白话RocketMQ系列之当RocketMQ遇上Spring Boot

rocketmq消费者使用

1.使用步骤

(1).在监听类上使用消息监听注解RocketMQMessageListener

其主要属性如下:

  • consumerGroup : 消费组名称
  • topic : 消费的topic
  • selectorType : 子分类类型(默认为TAG)
  • selectorExpression : 子分类表达式(默认为*)
  • messageModel : 消息模式(默认为集群消息:MessageModel.CLUSTERING;也可以是广播消息:MessageModel.BROADCASE)
  • consumeThreadMax : 最大消费线程数
  • maxReconsumeTimes : 最大消费次数
  • consumeTimeout : 阻塞消费线程消费的最大时间(分钟)
  • replyTimeout : 发送回复消息的超时时间
  • accessKey : 云广商RocketMQ的access-key(类似于用户名),默认读取配置rocketmq.consumer.access-key
  • secretKey : 云广商RocketMQ的密码,默认读取配置rocketmq.consumer.secret-key
  • nameServer : RocketMQ注册中心地址,默认读取rocketmq.name-server
  • accessChannel : RocketMQ服务的类型,有2个可选值: LOCAL: 本地(自建的RocketMQ服务);CLOUD: 使用云广商的RocketMQ服务

(2).实现RocketMQListener或RocketMQReplyListener

@Component
@RocketMQMessageListener(
        topic = "topic_order_normal",
        selectorExpression = "tag_order_paid",
        consumerGroup = "GID_ORDER_PAID_COUPON_CONSUMER")
@Slf4j
public class OrderPaidCouponConsumer implements RocketMQListener<OrderPaidEvent> {
    @Override
    public void onMessage(OrderPaidEvent message) {
        log.info("收到订单支付消息,给用户发送代金券, message={}", message);
    }
}

注:这里通过如果不是异常,则消费都会自动确认(ACK);如果抛出异常则会再次尝试消费

© 版权声明

相关文章

暂无评论

您必须登录才能参与评论!
立即登录
none
暂无评论...