在 Spring Boot 中实现 MQTT 消息的负载均衡处理,核心目标是让多个消费者实例(如多台服务器或同一服务器的多个进程)合理分担 MQTT 消息的处理压力,避免单节点过载。实现方式需结合 MQTT 协议特性(如主题订阅、QoS 等级)和负载均衡策略,以下是具体方案:
一、MQTT 负载均衡的核心思路
MQTT 协议本身不直接支持负载均衡,但可通过 “消费端订阅策略” 和 “消息分发机制” 实现:
- 同一主题的消息只被一个消费者处理(类似队列模式,避免重复消费)。
- 不同主题的消息由不同消费者分担(按主题分片,分散压力)。
需依赖 MQTT broker(如 EMQX、Mosquitto)的特性,配合客户端订阅策略实现。

二、基于共享订阅(Shared Subscription)的负载均衡(推荐)
主流 MQTT broker(如 EMQX、HiveMQ)支持 共享订阅 功能,通过特殊的主题格式让多个消费者共享同一主题的消息,broker 会自动将消息分发给其中一个消费者,实现负载均衡。
1. 共享订阅的主题格式
共享订阅的主题以 $share/<group>/ 为前缀,格式如下:
|
$share/<group-name>/<actual-topic> |
- <group-name>:消费者组名称,同一组内的消费者共享消息(同一消息只被组内一个消费者接收)。
- <actual-topic>:实际要订阅的主题(如 sensor/temp)。
例如:$share/group1/sensor/temp,订阅该主题的所有消费者(属于 group1 组)会分担 sensor/temp 主题的消息。

2. Spring Boot 集成实现(以 EMQX 为例)
(1)引入依赖
使用
org.eclipse.paho:org.eclipse.paho.client.mqttv3 客户端:
|
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency> |
(2)配置 MQTT 客户端(连接 broker)
创建配置类,初始化 MQTT 客户端并订阅共享主题:
|
@Configuration public class MqttConfig { @Value(“${mqtt.broker-url}”) private String brokerUrl; // 如 tcp://emqx-host:1883 @Value(“${mqtt.client-id}”) private String clientId; // 每个实例的 clientId 必须唯一(如加随机数或主机名) @Value(“${mqtt.username}”) private String username; @Value(“${mqtt.password}”) private String password; @Bean public MqttClient mqttClient() throws MqttException { // 配置连接参数 MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); options.setCleanSession(true); // 断开后不保留会话(按需调整) // 创建客户端(clientId 必须唯一,否则会被 broker 踢下线) MqttClient client = new MqttClient(brokerUrl, clientId + “-” + UUID.randomUUID()); client.connect(options); // 订阅共享主题(关键:使用 $share 前缀) String sharedTopic = “$share/group1/sensor/#”; // 共享组 group1,订阅 sensor 下所有主题 client.subscribe(sharedTopic, 1, new MqttMessageListener()); // QoS 1 return client; } } |
(3)实现消息监听器(处理消息)
|
public class MqttMessageListener implements IMqttMessageListener { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String payload = new String(message.getPayload(), StandardCharsets.UTF_8); System.out.println(“接收消息:主题=” + topic + “,内容=” + payload + “,处理实例=” + // 业务处理逻辑(如入库、调用接口等) } } |
(4)关键注意事项
- clientId 必须唯一:每个消费者实例的 clientId 不能重复,否则 broker 会认为是同一客户端,导致连接被覆盖。
- 共享组名称一致:同一负载均衡组的消费者必须使用一样的 <group-name>(如 group1),否则不会共享消息。
- broker 支持:确保 MQTT broker 支持共享订阅(EMQX、HiveMQ 支持,Mosquitto 需开启 allow_anonymous 并配置共享订阅插件)。
三、基于主题分片的负载均衡(适用于不支持共享订阅的场景)
若 broker 不支持共享订阅(如旧版 Mosquitto),可通过 主题分片 手动实现负载均衡:
- 将一个大主题拆分为多个子主题(如 sensor/temp/0、sensor/temp/1、sensor/temp/2)。
- 每个消费者实例负责订阅部分子主题(如实例 1 订阅 0 和 1,实例 2 订阅 2 和 3)。
- 生产者发送消息时,通过哈希算法将消息均匀分发到不同子主题。
实现示例
(1)生产者发送消息(按哈希分片)
|
@Service public class MqttProducer { @Autowired private MqttClient mqttClient; private static final int SHARD_COUNT = 4; // 4 个子主题 public void sendMessage(String data) throws MqttException { // 按数据哈希值分片(确保消息均匀分发) int shard = Math.abs(data.hashCode() % SHARD_COUNT); String topic = “sensor/temp/” + shard; // 子主题:sensor/temp/0 ~ 3 mqttClient.publish(topic, data.getBytes(), 1, false); } } |
(2)消费者订阅子主题(按实例分配)
假设有 2 个消费者实例,实例 1 订阅 0、1,实例 2 订阅 2、3:
|
// 实例 1 配置 @Bean public MqttClient mqttClient1() throws MqttException { // … 连接配置省略 client.subscribe(“sensor/temp/0”, 1, new MqttMessageListener()); client.subscribe(“sensor/temp/1”, 1, new MqttMessageListener()); return client; } // 实例 2 配置 @Bean public MqttClient mqttClient2() throws MqttException { // … 连接配置省略 client.subscribe(“sensor/temp/2”, 1, new MqttMessageListener()); client.subscribe(“sensor/temp/3”, 1, new MqttMessageListener()); return client; } |
(3)优缺点
- 优点:不依赖 broker 特性,兼容性强。
- 缺点:需手动维护主题分片和消费者分配,扩容时需重新调整订阅关系,灵活性低。
四、进阶优化:结合消息重试与监控
- 消息重试机制:
若消费者处理消息失败(如业务异常),可通过 MQTT 的 QoS 机制(如 QoS 1 确保消息至少送达一次),或在消费端实现本地重试队列(如用 Redis 缓存失败消息,定时重试)。
- 负载均衡监控:
通过 EMQX Dashboard 或自定义指标(如 Prometheus)监控各消费者的消息处理量、延迟,动态调整消费者实例数量或分片策略。
- 避免单点故障:
确保 MQTT broker 集群部署(如 EMQX 集群),同时消费者实例多节点部署,防止单节点宕机导致消息丢失。
总结
- 优先选择共享订阅:依赖 broker 实现,配置简单、灵活性高,适合大多数场景(推荐 EMQX 作为 broker)。
- 主题分片作为备选:适用于 broker 不支持共享订阅的场景,但需手动维护分片逻辑。
- 核心原则:确保消息均匀分发、避免重复消费、支持动态扩容。






希望得到大家的点赞,欢迎大家评论区评论。48小时后评选获奖赠送一本mqtt编程书籍(3本)