简介
角色介绍
- Producer:消息的发送者
- Consumer:消息的接收者
- Broker: 暂存和传输消息
- NameServer:管理Broker
- Topic:区分消息的种类;一个发送者可以发送消息或者多个topic;一个消息的接收者可以订阅一个或者Topic消息
- Message queue:相当于是Topic的分区;用于并行发送和接收消息

集群搭建
双主双从搭建
总体架构
消息高可用采用2m-2s-sync方式

集群工作流程
- 启动NameServer,NameServer起来后监听端口,等待Broker,Producer,Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
服务器环境
|
IP地址 |
角色 |
架构模式 |
备注 |
|
10.177.32.73 |
nameserver,brokerserver |
Master1,slave2 |
|
|
10.177.32.74 |
nameserver,brokerserver |
Master2,slave1 |
安装步骤
1. 更改host文件
vim /etc/hosts
# nameserver
192.168.2.12 rocketmq-nameserver1
192.168.2.13 rocketmq-nameserver2
# broker
192.168.2.12 rocketmq-master1
192.168.2.12 rocketmq-slave2
192.168.2.13 rocketmq-master2
192.168.2.13 rocketmq-slave1
配置完毕后,重启网卡
systemctl restart NetworkManager
2. 更改防火墙设置
# 关闭防火墙
systemctl stop firewalld.service
# 查看防火墙的状态
firewall-cmd –state
# 禁止firewall开机启动
systemctl disable firewalld.service
3. 更改环境变量

保存,随后source /etc/profile
4. 创建存储路径
mkdir /data/app/rocketmq/store
mkdir /data/app/rocketmq/store/commitlog
mkdir /data/app/rocketmq/store/consumequeue
mkdir /data/app/rocketmq/store/index
mkdir /data/app/rocketmq/store-1
mkdir /data/app/rocketmq/store-1/commitlog
mkdir/data/app/rocketmq/store-1/consumequeue
mkdir /data/app/rocketmq/store-1/index
mkdir /data/app/rocketmq/store-2
mkdir /data/app/rocketmq/store-2/commitlog
mkdir/data/app/rocketmq/store-2/consumequeue
mkdir /data/app/rocketmq/store-2/index
mkdir /data/app/rocketmq/store-3
mkdir /data/app/rocketmq/store-3/commitlog
mkdir/data/app/rocketmq/store-3/consumequeuemkdir
/data/app/rocketmq/store-3/index
5. 更改配置文件
在10.177.32.73修改,master1节点:
vim /data/app/rocketmq/conf/2m-2s-sync/broker-a.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表明 Master,>0 表明 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,提议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,提议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
#diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/data/app/rocketmq/store
storePathCommitLog=/data/app/rocketmq/store/commitlog
storePathConsumeQueue=/data/app/rocketmq/store/consumequeue
storePathIndex=/data/app/rocketmq/store/index
storeCheckpoint=/data/app/rocketmq/store/checkpoint
abortFile=/data/app/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
在10.177.32.73修改,slave2节点:
vim /data/app/rocketmq/conf/2m-2s-sync/broker-b-s.properties
brokerClusterName=rocketmq-cluster
brokerName=broker-b
brokerId=1
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=11011
deleteWhen=04
fileReservedTime=48
storePathRootDir=/data/app/rocketmq/store-1
storePathCommitLog=/data/app/rocketmq/store-1/commitlog
storePathConsumeQueue=/data/app/rocketmq/store-1/consumequeue
storePathIndex=/data/app/rocketmq/store-1/index
storeCheckpoint=/data/app/rocketmq/store-1/checkpoint
abortFile=/data/app/rocketmq/store-1/abort
maxMessageSize=65536
brokerRole=SLAVEflushDiskType=SYNC_FLUSH
在10.177.32.74修改,master2节点:
vim /data/app/rocketmq/conf/2m-2s-sync/broker-b.properties
brokerClusterName=rocketmq-cluster
brokerName=broker-b
brokerId=0
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=48
storePathRootDir=/data/app/rocketmq/store-2
storePathCommitLog=/data/app/rocketmq/store-2/commitlog
storePathConsumeQueue=/data/app/rocketmq/store-2/consumequeue
storePathIndex=/data/app/rocketmq/store-2/index
storeCheckpoint=/data/app/rocketmq/store-2/checkpoint
abortFile=/data/app/rocketmq/store-2/abort
maxMessageSize=65536
brokerRole=SYNC_MASTERflushDiskType=SYNC_FLUSH
在10.177.32.74修改,slave2节点:
vim /data/app/rocketmq/conf/2m-2s-sync/broker-a-s.properties
brokerClusterName=rocketmq-cluster
brokerName=broker-a
brokerId=1
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=11011
deleteWhen=04
fileReservedTime=48
storePathRootDir=/data/app/rocketmq/store-3
storePathCommitLog=/data/app/rocketmq/store-3/commitlog
storePathConsumeQueue=/data/app/rocketmq/store-3/consumequeue
storePathIndex=/data/app/rocketmq/store-3/index
storeCheckpoint=/data/app/rocketmq/store-3/checkpoint
abortFile=/data/app/rocketmq/store-3/abort
maxMessageSize=65536
brokerRole=SLAVEflushDiskType=SYNC_FLUSH
6. 修改启动脚本
修改runbroker.sh
vi /data/app/rocketmq/bin/runbroker.sh
# 可按需根据内存大小进行jvm参数进行调整
JAVA_OPT=”${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m”
附录:
-Xms:堆内存的初始大小,默认为物理内存的1/64
-Xmx:堆内存的最大大小,默认为物理内存的1/4
-Xmn:堆内新生代的大小。通过这个值也可以得到老生代的大小,即-Xmx减去-Xmn
-Xss:设置每个线程可使用的内存大小,即栈的大小。在一样物理内存下,减小这个值能生成更多的线程,当然操作系统对一个进程内的线程数还是有限制的,不能无限生成。线程栈的大小是个双刃剑,如果设置过小,可能会出现栈溢出,特别是在该线程内有递归、大的循环时出现溢出的可能性更大,如果该值设置过大,就有影响到创建栈的数量,如果是多线程的应用,就会出现内存溢出的错误。
修改runserver.sh
vim /data/app/rocketmq/bin/runserver.sh
JAVA_OPT=”${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m”
服务启动
- 启动NameServer集群
分别在10.177.32.73和10.177.32.74启动nameServer
cd /data/app/rocketmq/bin
nohup sh mqnamesrv &

2. 启动broker集群
在10.177.32.73中启动master1和slave2节点
cd /data/app/rocketmq/bin
nohup sh mqbroker -c /data/app/rocketmq/conf/2m-2s-sync/broker-a.properties &
cd /data/app/rocketmq/bin
nohup sh mqbroker -c /data/appl/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
在10.177.32.74中启动master2和slave1节点
cd /data/app/rocketmq/bin
nohup sh mqbroker -c /data/app/rocketmq/conf/2m-2s-sync/broker-b.properties &
cd /data/app/rocketmq/bin
nohup sh mqbroker -c /data/appl/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
查看结果:

服务停止
./mqshutdown namesrv
./mqshutdown broker
# 再次查看jps,进程已经不存在了
对外开放端口
|
端口 |
用途 |
|
9876 |
namesrv端口 |
|
10911 |
broker-master端口 |
|
11011 |
broker-slave端口 |
|
10909 |
fastRemoting端口 |
提议优化项
1. 生产者发送超时

缘由分析: 默认设置的sendMessage时间为200ms,当前broker处理时间已超过其阈值。
优化项:
waitTimeMillsInSendQueue=3000 # 更改超时时间
osPageCacheBusyTimeOutMills=3000 # 更改系统缓存页时间
sendMessageThreadPoolNums: 64 # 发送消息最大线程数
useReentrantLockWhenPutMessage: true # 是否使用可重入锁