前面咱们把接口的性能、健壮性、安全性都拉满了,但如果遇到 “图书借阅后发送短信通知”“批量导入 10 万本图书”“高并发下单减库存” 这些场景,同步处理会导致接口卡顿 —— 比如发送短信要 3 秒,用户就得等 3 秒才能看到借阅成功;批量导入 10 万条数据,接口直接超时。
Spring Boot 整合 RabbitMQ(消息队列)就是解决这类问题的 “神器”,核心是异步处理 + 系统解耦 + 削峰填谷,像奶茶店的 “外卖订单系统”:前台接单后不用等奶茶制作完成,直接把订单传给后台(消息队列),后台异步制作、配送,前台可以继续接下一单,效率翻倍。
今天全程聚焦 Spring Boot 生态,以图书管理系统为案例,手把手实现 “RabbitMQ 消息发送 / 接收、延迟队列、死信队列”,解决异步通知、批量处理、高并发削峰等企业级场景,新手也能直接复制代码落地!
一、先搞懂:RabbitMQ 是什么?(奶茶店类比)
1. 核心概念(消息队列 = 外卖订单系统)
| RabbitMQ 概念 | 类比奶茶店外卖系统 | 作用说明(Spring Boot 中) |
|---|---|---|
| 生产者(Producer) | 前台收银员(发订单) | 发送消息的应用(如 Spring Boot 系统发送 “借阅通知”) |
| 消费者(Consumer) | 后厨制作员 + 配送员(处理订单) | 接收并处理消息的应用(如专门发送短信的服务) |
| 队列(Queue) | 外卖订单货架(存订单) | 存储消息的容器(消息暂存地,确保不丢失) |
| 交换机(Exchange) | 订单分流台(按类型分订单) | 接收生产者消息,按规则路由到对应队列(避免队列直连) |
| 路由键(Routing Key) | 订单类型标签(如 “堂食”“外卖”) | 交换机路由消息的依据(如 “book.borrow” 路由到借阅通知队列) |
| 绑定(Binding) | 分流规则(外卖订单→外卖货架) | 绑定交换机和队列,指定路由规则 |
2. 为什么要用 RabbitMQ?(企业 3 大核心需求)
异步解耦:图书借阅后,不用同步发送短信、记录日志,只需发送消息到 RabbitMQ,其他服务异步处理,接口响应时间从 3 秒→50ms;削峰填谷:1 万用户并发借阅,消息队列缓存所有请求,消费者按能力慢慢处理,避免数据库被瞬间压垮(像奶茶店高峰期订单先存货架,后厨按节奏制作);可靠投递:支持消息持久化、确认机制,确保消息不丢失(比如短信没发送成功,会重新尝试,不会漏通知)。
3. Spring Boot 整合优势
开箱即用:依赖自动配置,不用手动写复杂连接代码;注解驱动:
spring-boot-starter-amqp注解快速实现消息消费,
@RabbitListener简化消息发送;无缝适配:支持消息序列化(JSON)、队列绑定、异常重试等企业级特性。
AmqpTemplate
二、实操 1:Spring Boot 整合 RabbitMQ(基础消息发送 / 接收)
咱们以 “图书借阅后发送短信通知” 为场景,一步步实现基础的消息生产和消费,环境搭建 + 代码落地全程不超过 30 分钟。
步骤 1:环境准备(安装 RabbitMQ)
本地测试(Windows):
安装 Erlang(RabbitMQ 依赖),官网下载对应版本;下载 RabbitMQ 安装包,双击安装,启动服务(命令行执行);访问
rabbitmq-server start,默认账号密码
http://localhost:15672,登录 RabbitMQ 管理界面。
guest/guest
服务器部署(Linux):用 Docker 一键启动:
bash
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq-book rabbitmq:3-management
步骤 2:加依赖(Spring Boot AMQP 核心依赖)
xml
<!-- Spring Boot整合RabbitMQ依赖(生态内官方支持) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.10</version>
</dependency>
<!-- JSON序列化依赖(消息以JSON格式传输,默认是Java序列化,不推荐) -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.32</version>
</dependency>
步骤 3:配置 RabbitMQ(application-dev.yml)
yaml
spring:
rabbitmq:
host: localhost # RabbitMQ地址(服务器填IP)
port: 5672 # 通信端口(管理界面是15672)
username: guest # 用户名(默认guest)
password: guest # 密码(默认guest)
virtual-host: / # 虚拟主机(默认/)
publisher-confirm-type: correlated # 开启生产者确认(确保消息发送到交换机)
publisher-returns: true # 开启消息返回(交换机路由失败时回调)
listener:
simple:
acknowledge-mode: manual # 手动确认消息(消费成功后手动签收,避免消息丢失)
concurrency: 2 # 消费者并发数(2个线程同时消费)
max-concurrency: 5 # 最大并发数(高峰期最多5个线程)
prefetch: 1 # 每次拉取1条消息,处理完再拉取下一条(避免消息堆积)
# 自定义队列、交换机、路由键配置(方便代码中引用)
rabbitmq:
queue:
borrow-notice: borrow_notice_queue # 借阅通知队列
exchange:
book-exchange: book_topic_exchange # 图书服务交换机(Topic类型,支持通配符路由)
routing-key:
borrow-notice: book.borrow.notice # 借阅通知路由键
步骤 4:配置 RabbitMQ 队列 / 交换机 / 绑定(Config 类)
新建包,创建
config,定义队列、交换机并绑定:
RabbitMQConfig.java
java
运行
package com.example.springbootdemo.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ队列、交换机、绑定配置(Spring Boot启动时自动创建)
*/
@Configuration
public class RabbitMQConfig {
// 读取配置文件中的队列、交换机、路由键
@Value("${rabbitmq.queue.borrow-notice}")
private String borrowNoticeQueue;
@Value("${rabbitmq.exchange.book-exchange}")
private String bookExchange;
@Value("${rabbitmq.routing-key.borrow-notice}")
private String borrowNoticeRoutingKey;
/**
* 1. 声明借阅通知队列(持久化队列,避免服务重启消息丢失)
*/
@Bean
public Queue borrowNoticeQueue() {
// durable=true:队列持久化;exclusive=false:不排他;autoDelete=false:不自动删除
return QueueBuilder.durable(borrowNoticeQueue).build();
}
/**
* 2. 声明Topic类型交换机(支持通配符路由,灵活匹配路由键)
*/
@Bean
public TopicExchange bookExchange() {
return ExchangeBuilder.topicExchange(bookExchange).durable(true).build();
}
/**
* 3. 绑定队列和交换机(指定路由键)
*/
@Bean
public Binding borrowNoticeBinding() {
// 交换机→路由键→队列:消息携带"book.borrow.notice"路由键,会被路由到borrowNoticeQueue
return BindingBuilder.bind(borrowNoticeQueue())
.to(bookExchange())
.with(borrowNoticeRoutingKey);
}
}
步骤 5:消息生产者(发送借阅通知消息)
在层添加消息发送逻辑,图书借阅成功后发送消息到 RabbitMQ:
service
java
运行
package com.example.springbootdemo.service;
import com.alibaba.fastjson.JSONObject;
import com.example.springbootdemo.entity.Book;
import com.example.springbootdemo.entity.BorrowRecord;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.UUID;
@Service
public class BorrowService {
@Resource
private RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.exchange.book-exchange}")
private String bookExchange;
@Value("${rabbitmq.routing-key.borrow-notice}")
private String borrowNoticeRoutingKey;
// 图书借阅业务+发送消息
public boolean borrowBook(Long bookId, Long userId) {
// 1. 核心业务逻辑(创建借阅记录、减库存,已简化)
BorrowRecord record = new BorrowRecord();
record.setBookId(bookId);
record.setUserId(userId);
// borrowRecordMapper.insert(record); // 保存借阅记录
// bookMapper.decrementStock(bookId); // 减库存
// 2. 构造消息内容(可传用户ID、图书名、借阅时间等)
JSONObject msg = new JSONObject();
msg.put("userId", userId);
msg.put("bookId", bookId);
msg.put("bookName", "Java实战"); // 实际从数据库查询
msg.put("borrowTime", System.currentTimeMillis());
// 3. 发送消息到RabbitMQ(指定交换机、路由键、消息内容)
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 消息唯一ID(用于确认)
rabbitTemplate.convertAndSend(
bookExchange,
borrowNoticeRoutingKey,
msg.toJSONString(),
correlationData
);
System.out.println("借阅通知消息发送成功,消息ID:" + correlationData.getId());
return true;
}
}
步骤 6:消息消费者(接收并处理借阅通知)
新建包,创建
consumer,监听队列并处理消息(发送短信 / 邮件):
BorrowNoticeConsumer.java
java
运行
package com.example.springbootdemo.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 借阅通知消费者(处理RabbitMQ消息,发送短信通知)
*/
@Component
public class BorrowNoticeConsumer {
@Value("${rabbitmq.queue.borrow-notice}")
private String borrowNoticeQueue;
/**
* @RabbitListener(queues = "..."):监听指定队列
*/
@RabbitListener(queues = "${rabbitmq.queue.borrow-notice}")
public void handleBorrowNotice(String msg, Channel channel, Message message) throws IOException {
try {
// 1. 打印消息内容(实际场景:解析消息,调用短信API发送通知)
System.out.println("收到借阅通知消息:" + msg);
// 模拟发送短信(调用第三方短信接口)
// smsService.sendSms(userId, "您已成功借阅图书《Java实战》,请于30天内归还");
// 2. 手动确认消息(acknowledge-mode=manual时必须手动签收,否则消息会重新入队)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("消息处理成功,已确认签收");
} catch (Exception e) {
// 3. 消息处理失败,重新入队(或投递到死信队列)
System.err.println("消息处理失败,重新入队:" + e.getMessage());
// requeue=true:重新入队;false:丢弃(或投递死信队列)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
步骤 7:测试基础消息流程
启动 RabbitMQ 服务和 Spring Boot 项目;调用接口(可通过 Controller 暴露);观察日志:
borrowBook(1L, 1001L)
生产者日志:;消费者日志:
借阅通知消息发送成功,消息ID:xxx;RabbitMQ 管理界面:
收到借阅通知消息:{"userId":1001,"bookId":1,"bookName":"Java实战","borrowTime":xxx}队列的 “Ready” 数为 0(消息已被消费),“Total” 数增加 1。
borrow_notice_queue
🔍 核心效果:
借阅接口响应时间从 “同步发送短信 3 秒”→“异步发送消息 50ms”,用户体验大幅提升,且短信发送失败不会影响借阅核心业务。
三、实操 2:进阶功能 —— 延迟队列(图书超期提醒)
企业高频场景:图书借阅 30 天后未归还,自动发送 “超期提醒” 短信。用 RabbitMQ 延迟队列实现 “定时消息”,不用写定时任务,更灵活高效。
步骤 1:配置延迟队列(修改 RabbitMQConfig)
延迟队列核心:通过 “死信交换机 + TTL(消息过期时间)” 实现,消息过期后自动路由到死信队列,消费者监听死信队列处理延迟任务。
java
运行
@Configuration
public class RabbitMQConfig {
// 新增:延迟队列相关配置
@Value("${rabbitmq.queue.delay-notice}")
private String delayNoticeQueue; // 延迟队列(临时存储未过期消息)
@Value("${rabbitmq.queue.dead-letter}")
private String deadLetterQueue; // 死信队列(接收过期消息)
@Value("${rabbitmq.exchange.dead-letter}")
private String deadLetterExchange; // 死信交换机
@Value("${rabbitmq.routing-key.dead-letter}")
private String deadLetterRoutingKey; // 死信路由键
/**
* 1. 声明死信队列(超期提醒实际消费队列)
*/
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(deadLetterQueue).build();
}
/**
* 2. 声明死信交换机
*/
@Bean
public TopicExchange deadLetterExchange() {
return ExchangeBuilder.topicExchange(deadLetterExchange).durable(true).build();
}
/**
* 3. 绑定死信队列和死信交换机
*/
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with(deadLetterRoutingKey);
}
/**
* 4. 声明延迟队列(设置TTL+绑定死信交换机)
*/
@Bean
public Queue delayNoticeQueue() {
return QueueBuilder.durable(delayNoticeQueue)
// 设置消息过期时间(30天=2592000000毫秒,测试时可设30秒=30000)
.ttl(30000)
// 消息过期后路由到死信交换机
.deadLetterExchange(deadLetterExchange)
// 死信路由键(过期消息携带该路由键)
.deadLetterRoutingKey(deadLetterRoutingKey)
.build();
}
}
步骤 2:发送延迟消息(借阅时发送超期提醒)
修改,借阅成功后同时发送 “即时通知” 和 “30 天后超期提醒”:
BorrowService
java
运行
@Service
public class BorrowService {
// 新增延迟队列配置
@Value("${rabbitmq.queue.delay-notice}")
private String delayNoticeQueue;
public boolean borrowBook(Long bookId, Long userId) {
// 1. 核心业务逻辑(略)
// 2. 发送即时借阅通知(之前的代码,略)
// 3. 发送延迟消息(30天后超期提醒)
JSONObject delayMsg = new JSONObject();
delayMsg.put("userId", userId);
delayMsg.put("bookId", bookId);
delayMsg.put("bookName", "Java实战");
CorrelationData delayCorrelationData = new CorrelationData(UUID.randomUUID().toString());
// 延迟消息直接发送到延迟队列(无需指定交换机,直接发队列)
rabbitTemplate.convertAndSend(
delayNoticeQueue,
delayMsg.toJSONString(),
delayCorrelationData
);
System.out.println("超期提醒延迟消息发送成功,30秒后执行(测试用)");
return true;
}
}
步骤 3:监听死信队列(处理超期提醒)
新增死信队列消费者:
java
运行
@Component
public class DeadLetterConsumer {
@RabbitListener(queues = "${rabbitmq.queue.dead-letter}")
public void handleDeadLetter(String msg, Channel channel, Message message) throws IOException {
try {
System.out.println("收到超期提醒消息(延迟30秒执行):" + msg);
// 实际场景:发送超期提醒短信
// smsService.sendSms(userId, "您借阅的图书《Java实战》已超期,请尽快归还");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
步骤 4:测试延迟队列
调用借阅接口后,观察日志:
即时打印 “超期提醒延迟消息发送成功”;30 秒后打印 “收到超期提醒消息”,延迟效果生效。
四、实操 3:企业级场景落地 —— 批量导入图书异步处理
批量导入 10 万本图书时,同步处理会导致接口超时,用 RabbitMQ 异步处理,前端发起导入后即可返回,后台异步解析文件、插入数据库。
核心流程:
前端上传 Excel 文件,后端接收后生成 “导入任务 ID”,返回给前端;后端将 “文件路径 + 任务 ID” 作为消息发送到 RabbitMQ;消费者监听队列,异步解析 Excel、批量插入数据库,更新任务状态;前端通过任务 ID 查询导入进度(如 “已导入 3 万条 / 共 10 万条”)。
关键代码(简化版):
java
运行
// 1. 生产者(接收文件后发送消息)
@PostMapping("/book/batch/import")
public Result<String> batchImport(MultipartFile file) throws IOException {
// 保存文件到服务器(如D:/uploads/xxx.xlsx)
String filePath = "D:/uploads/" + file.getOriginalFilename();
file.transferTo(new File(filePath));
// 生成任务ID(前端查询进度用)
String taskId = UUID.randomUUID().toString();
// 保存任务状态(初始:导入中)
redisTemplate.opsForValue().set("import:task:" + taskId, "PROCESSING");
// 发送消息到RabbitMQ
JSONObject msg = new JSONObject();
msg.put("taskId", taskId);
msg.put("filePath", filePath);
rabbitTemplate.convertAndSend(bookExchange, "book.import", msg.toJSONString());
return Result.success("导入任务已发起,任务ID:" + taskId);
}
// 2. 消费者(异步解析Excel)
@RabbitListener(queues = "${rabbitmq.queue.book-import}")
public void handleBatchImport(String msg, Channel channel, Message message) throws IOException {
JSONObject json = JSONObject.parseObject(msg);
String taskId = json.getString("taskId");
String filePath = json.getString("filePath");
try {
// 解析Excel(用EasyExcel等工具)
List<Book> bookList = ExcelUtil.readExcel(filePath, Book.class);
int total = bookList.size();
int successCount = 0;
// 批量插入数据库(每1000条一批)
for (int i = 0; i < bookList.size(); i += 1000) {
int end = Math.min(i + 1000, bookList.size());
bookService.saveBatch(bookList.subList(i, end));
successCount = end;
// 更新进度(前端查询时获取)
redisTemplate.opsForValue().set("import:progress:" + taskId, successCount + "/" + total);
}
// 更新任务状态(成功)
redisTemplate.opsForValue().set("import:task:" + taskId, "SUCCESS");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 更新任务状态(失败)
redisTemplate.opsForValue().set("import:task:" + taskId, "FAIL:" + e.getMessage());
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
五、避坑总结:Spring Boot+RabbitMQ 的 6 个新手坑
队列 / 交换机未绑定,消息路由失败:
坑:生产者发送消息到交换机,但交换机未绑定队列,消息丢失;解决:确保配置正确,或在 RabbitMQ 管理界面手动绑定,开启
Binding回调排查路由失败原因。
publisher-returns
消息未持久化,服务重启消息丢失:
坑:队列或消息发送时未设置持久化,RabbitMQ 重启后消息丢失;解决:队列和交换机都设
durable=false,消息发送时设置
durable=true。
MessageProperties.PERSISTENT_TEXT_PLAIN
手动确认消息未签收,消息重复消费:
坑:但未调用
acknowledge-mode=manual,消息会一直留在队列,重启消费者后重复消费;解决:处理成功调用
basicAck,失败根据场景调用
basicAck(重新入队或丢弃)。
basicNack
消费者并发数设置不合理:
坑:并发数设太大(如 100)导致数据库连接耗尽,设太小(如 1)导致消息堆积;解决:根据数据库性能设置(4 核 8G 服务器设 5-10),配合避免消息积压。
prefetch=1
消息序列化格式不统一:
坑:生产者用 Java 序列化,消费者用 JSON 反序列化,导致解析失败;解决:统一用 JSON 序列化,配置 RabbitMQ 消息转换器:
java
运行
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
延迟队列 TTL 设置错误,消息提前过期:
坑:延迟队列未绑定死信交换机,或 TTL 单位设错(如把 30 秒写成 30);解决:确保延迟队列配置和
deadLetterExchange,TTL 单位是毫秒(30 秒 = 30000)。
deadLetterRoutingKey
总结:RabbitMQ 让 Spring Boot 系统 “更抗造”
Spring Boot+RabbitMQ 的核心价值是 “异步解耦 + 削峰填谷”—— 把非核心业务(如通知、日志)异步化,让核心接口更快;把高并发请求缓存到队列,避免系统被瞬间压垮。
就像奶茶店的外卖系统:前台不用管后台制作,效率翻倍;高峰期订单先存货架,后厨按节奏处理,不会乱套。企业级项目中,RabbitMQ 几乎是 “标配”,解决异步通知、批量处理、高并发削峰等场景,配合之前的性能、安全优化,你的 Spring Boot 系统会更稳定、更抗造。
📦 给你的实战包:我整理了 “Spring Boot+RabbitMQ 实战全量包”,包含:① 队列 / 交换机配置完整代码;② 生产者 / 消费者代码;③ 延迟队列 + 批量导入场景实现;④ RabbitMQ 管理界面操作手册 + Postman 测试脚本,你不用手动写代码,解压后导入 IDEA 就能运行,需要的评论区留言。