总结rocketmq的知识点
概念
削峰,异步,解耦。这就是消息队列最常用的三大场景。
- topic queue

一个topic消息可以分布在多个broker上,每个broker上同一个topic也会分步在多个queue上存储(conf/broker.conf中defaultTopicQueueNums参数设置默认queue数量)。
- tag
tag就是子主题,是topic的下一级消息分类,一个应用可以只创建一个topic,但是对于发送的不同类型的消息可以通过tag来区分。
- group
ProducerGroup:同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
ConsumerGroup:同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。广播消费模式下,相同ConsumerGroup的每个Consumer实例都接收全量的消息。
NameServer
NameServer的主要功能是为整个MQ集群提供服务协调与治理,具体就是记录维护Topic、Broker的信息,及监控Broker的运行状态。为client提供路由能力(具体实现和zk有区别,NameServer是没有leader和follower区别的,不进行数据同步,通过Broker轮训修改信息)。
NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。NameServer中的Broker、Topic等状态信息不会持久存储,都是由各个角色定时上报并 存储到内存中的(NameServer支持配置参数的持久化,一般用不到)。
NameServer就是一个轻量级的netty服务端,集群中必须先启动它,broker启动之后注册到NameServer上来,NameServer通过30s的心跳来检测broker状态。producer通过NameServer来找到broker。
正常情况下,如果Broker关闭,则会与NameServer断开长连接,Netty的通道关闭监听器会监听到连接断开事件,然后会将这个Broker信息剔除掉。 异常情况下,NameServer中有一个定时任务,每隔10秒扫描一下Broker表,如果某个Broker的心跳包最新时间戳距离当前时间超多120秒,也会判定Broker失效并将其移除。
NameServer在清除失活Broker之后,并没有主动通知生产者,生产者每隔30秒会请求NameServer并获取最新的路由表,那么就意味着,消息生产者总会有30秒的延时,无法实时感知Broker服务器的宕机。所以在这个30秒里,生产者依旧会向失活Broker发送消息,那么消息发送的高可用性如何保证呢?当消息生产者向Broker发送消息返回异常后,消息生产者会选择另外一个Broker上的消息队列,这样就规避了发生故障的Broker,结合重试机制,巧妙实现消息发送的高可用,同时由于不需要NameServer通知众多不固定的生产者,也降低了NameServer实现的复杂性。
NameServer是不主动推送会客户端的,而是由客户端拉取主题的最新路由信息。
由于NameServer之间相互独立,很明显,是一个AP设计(关于CAP理论)。
为什么不用zookeeper?因为不需要master选举, 用不到这些复杂的功能,只需要一个轻量级的元数据服务器就足够了。NameServer并没有提供类似Zookeeper的watcher机制, 而是采用了每30s心跳机制。
心跳机制
单个Broker跟所有Namesrv保持心跳请求,心跳间隔为30秒,心跳请求中包括当前Broker所有的Topic信息。Namesrv会反查Broer的心跳信息, 如果某个Broker在2分钟之内都没有心跳,则认为该Broker下线,调整Topic跟Broker的对应关系。但此时Namesrv不会主动通知Producer、Consumer有Broker宕机。
Consumer跟Broker是长连接,会每隔30秒发心跳信息到Broker。Broker端每10秒检查一次当前存活的Consumer,若发现某个Consumer 2分钟内没有心跳, 就断开与该Consumer的连接,并且向该消费组的其他实例发送通知,触发该消费者集群的负载均衡(rebalance)。
生产者每30秒从Namesrv获取Topic跟Broker的映射关系,更新到本地内存中。再跟Topic涉及的所有Broker建立长连接,每隔30秒发一次心跳。 在Broker端也会每10秒扫描一次当前注册的Producer,如果发现某个Producer超过2分钟都没有发心跳,则断开连接。
高性能、高可靠
RocketMQ的高性能在于顺序写盘(CommitLog)、零拷贝和跳跃读(尽量命中PageCache),高可靠性在于刷盘和Master/Slave,另外NameServer 全部挂掉不影响已经运行的Broker,Producer,Consumer。
消息存储机制
CommitLog
采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了
1,commitLog是保存消息元数据的地方,所有消息到达Broker后都会保存到commitLog文件。 这里需要强调的是所有topic的消息都会统一保存在commitLog中,举个例子:当前集群有TopicA, TopicB,这两个Toipc的消息会按照消息到达的先后顺序保存到同一个commitLog中,而不是每个Topic有自己独立的commitLog。
2,每个commitLog大小上限为1G,满1G之后会自动新建CommitLog文件做保存数据用。
3,CommitLog的清理机制:
按时间清理,rocketmq默认会清理3天前的commitLog文件; 按磁盘水位清理:当磁盘使用量到达磁盘容量75%,开始清理最老的commitLog文件。
4,文件地址:user.home/store/{user.home}/store/user.home/store/{commitlog}/${fileName}
ConsumerQueue
1,ConsumerQueue相当于CommitLog的索引文件,消费者消费时会先从ConsumerQueue中查找消息的在commitLog中的offset,再去CommitLog中找元数据。如果某个消息只在CommitLog中有数据,没在ConsumerQueue中, 则消费者无法消费。
2,consumerqueue的数据结构包含3部分:
- 消息在commitLog文件实际偏移量(commitLogOffset)
- 消息大小
- 消息tag的哈希值
3,文件地址:user.home/store/consumequeue/{user.home}/store/consumequeue/user.home/store/consumequeue/{topicName}/queueId/{queueId}/queueId/{fileName}
零拷贝
- CPU copy
- DMA copy
- sendFile java transferTo
- mmap java MappedByteBuffer
集群模式
-
单master,单节点
-
多master
-
多master,多slave,异步复制模式
异步复制的模式可能会丢失部分消息
-
多master,多slave,同步复制模式(推荐)
推荐多master+多slave,同步复制+异步刷盘的方式,slave全部复制master的数据之后才返回,slave和master各自异步进行数据刷盘
安装
安装启动name-server, broker
从github的release下面或者官网页面下载源码包,这里选择版本4.4.0。使用maven进行编译
mvn -Prelease-all -DskipTests clean install -U
# 将编译后的安装包copy到centos服务器上的/usr/local/rocket目录下:rocketmq-all-4.4.0\distribution\target\apache-rocketmq.tar.gz
# 默认情况下rocketmq的brocker启动需要很多内存,我们调整一下启动内存,否则启动会报内存不够用
vim bin/runserver.sh
vim bin/runbroker.sh
------------------------------------------------------------------------
-Xms256 -Xmx256 -Xmn128 -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m
------------------------------------------------------------------------
# 启动name server,默认的端口是9876
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
# 启动broker,启动时允许自动创建topic,并且指定配置文件
#【注意】配置文件中需要设置IP地址,否则外部机器访问不了,`brokerIP1 = 192.168.237.128`
# 配置文件conf/broker.conf的内容见下方
nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true -c conf/broker.conf &
tail -f ~/logs/rocketmqlogs/broker.log
# 查看进程
jps
# 查看集群机器列表
sh bin/mqadmin clusterList -n localhost:9876
# 查看topic的路由信息
sh bin/mqadmin topicRoute -n "localhost:9876" -t TopicTest
# 使用内置工具来发送和接收消息
export NAMESRV_ADDR=localhost:9876
# 发送
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
# 接收
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
# 停止broker
sh bin/mqshutdown broker
# 停止name server
sh bin/mqshutdown namesrv
broker配置
conf/broker.conf配置内容:
serverSelectorThreads=3
brokerRole=ASYNC_MASTER
serverSocketRcvBufSize=131072
osPageCacheBusyTimeOutMills=1000
shortPollingTimeMills=1000
clientSocketRcvBufSize=131072
clusterTopicEnable=true
brokerTopicEnable=true
autoCreateTopicEnable=true
maxErrorRateOfBloomFilter=20
maxMsgsNumBatch=64
cleanResourceInterval=10000
commercialBaseCount=1
maxTransferCountOnMessageInMemory=32
brokerFastFailureEnable=true
brokerClusterName=DefaultCluster
flushDiskType=ASYNC_FLUSH
commercialBigCount=1
mapedFileSizeCommitLog=1073741824
consumerFallbehindThreshold=17179869184
autoCreateSubscriptionGroup=true
transientStorePoolEnable=false
flushConsumerOffsetInterval=5000
waitTimeMillsInHeartbeatQueue=31000
diskMaxUsedSpaceRatio=88
flushCommitLogLeastPages=4
cleanFileForciblyEnable=true
slaveReadEnable=false
msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
expectConsumerNumUseFilter=32
traceTopicEnable=false
useEpollNativeSelector=false
enablePropertyFilter=false
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
deleteCommitLogFilesInterval=100
brokerName=broker-a
maxTransferBytesOnMessageInDisk=65536
mapedFileSizeConsumeQueue=300000
listenPort=10911
flushConsumeQueueLeastPages=2
pullMessageThreadPoolNums=20
useReentrantLockWhenPutMessage=false
flushIntervalConsumeQueue=1000
sendThreadPoolQueueCapacity=10000
debugLockEnable=false
haHousekeepingInterval=20000
diskFallRecorded=true
messageIndexEnable=true
clientAsyncSemaphoreValue=65535
clientCallbackExecutorThreads=2
putMsgIndexHightWater=600000
sendMessageThreadPoolNums=1
clientManagerThreadPoolQueueCapacity=1000000
serverSocketSndBufSize=131072
maxDelayTime=40
clientSocketSndBufSize=131072
namesrvAddr=localhost:9876
commercialEnable=true
maxHashSlotNum=5000000
heartbeatThreadPoolNums=2
transactionTimeOut=6000
maxMessageSize=65536
adminBrokerThreadPoolNums=16
defaultQueryMaxNum=32
forceRegister=true
maxTransferBytesOnMessageInMemory=262144
enableConsumeQueueExt=false
longPollingEnable=true
serverWorkerThreads=8
messageIndexSafe=false
deleteConsumeQueueFilesInterval=100
haSlaveFallbehindMax=268435456
serverCallbackExecutorThreads=0
flushCommitLogThoroughInterval=10000
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
commercialTimerCount=1
useTLS=false
redeleteHangedFileInterval=120000
flushIntervalCommitLog=500
rocketmqHome=/usr/local/rocketmq
queryMessageThreadPoolNums=10
messageStorePlugIn=
serverChannelMaxIdleTimeSeconds=120
maxIndexNum=20000000
filterDataCleanTimeSpan=86400000
filterServerNums=0
commitCommitLogLeastPages=4
waitTimeMillsInPullQueue=5000
haSendHeartbeatInterval=5000
clientChannelMaxIdleTimeSeconds=120
filterSupportRetry=false
flushDelayOffsetInterval=10000
duplicationEnable=false
offsetCheckInSlave=false
clientCloseSocketIfTimeout=false
transientStorePoolSize=5
waitTimeMillsInSendQueue=200
warmMapedFileEnable=false
endTransactionThreadPoolNums=12
flushCommitLogTimed=false
flushLeastPagesWhenWarmMapedFile=4096
clientWorkerThreads=4
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
endTransactionPoolQueueCapacity=100000
registerNameServerPeriod=30000
registerBrokerTimeoutMills=6000
accessMessageInMemoryMaxRatio=40
highSpeedMode=false
transactionCheckMax=15
checkCRCOnRecover=true
destroyMapedFileIntervalForcibly=120000
brokerIP2=172.19.0.1
brokerIP1=192.168.237.128
commitIntervalCommitLog=200
clientOnewaySemaphoreValue=65535
traceOn=true
clientManageThreadPoolNums=32
channelNotActiveInterval=60000
mappedFileSizeConsumeQueueExt=50331648
consumerManagerThreadPoolQueueCapacity=1000000
serverOnewaySemaphoreValue=256
haListenPort=10912
enableCalcFilterBitMap=false
clientPooledByteBufAllocatorEnable=false
aclEnable=false
storePathRootDir=/usr/local/rocketmq/store
syncFlushTimeout=5000
rejectTransactionMessage=false
commitCommitLogThoroughInterval=200
connectTimeoutMillis=3000
queryThreadPoolQueueCapacity=20000
regionId=DefaultRegion
consumerManageThreadPoolNums=32
disableConsumeIfConsumerReadSlowly=false
flushConsumerOffsetHistoryInterval=60000
fetchNamesrvAddrByAddressServer=false
haTransferBatchSize=32768
compressedRegister=false
storePathCommitLog=/usr/local/rocketmq/store/commitlog
commercialTransCount=1
transactionCheckInterval=60000
startAcceptSendRequestTimeStamp=0
serverPooledByteBufAllocatorEnable=true
serverAsyncSemaphoreValue=64
waitTimeMillsInTransactionQueue=3000
heartbeatThreadPoolQueueCapacity=50000
deleteWhen=04
abortFile=/usr/local/rocketmq/store/abort
bitMapLengthConsumeQueueExt=112
fastFailIfNoBufferInStorePool=false
defaultTopicQueueNums=4
flushConsumeQueueThoroughInterval=60000
notifyConsumerIdsChangedEnable=true
brokerPermission=6
fileReservedTime=48
storePathIndex=/usr/local/rocketmq/store/index
transferMsgByHeap=true
pullThreadPoolQueueCapacity=100000
brokerId=0
maxTransferCountOnMessageInDisk=8
常用命令
# 创建、更新topic
sh bin/mqadmin updateTopic -c DefaultCluster -n 192.168.237.128:9876 -t MyTopic1
# 删除topic
sh bin/mqadmin deleteTopic -c DefaultCluster -n 192.168.237.128:9876 -t MyTopic1
# 创建、更新订阅组
sh bin/mqadmin updateSubGroup -c DefaultCluster -g MySubGroup -n 192.168.237.128:9876
# 删除订阅组
sh bin/mqadmin deleteSubGroup -c DefaultCluster -n 192.168.237.128:9876 -g MySubGroup
# 查看topic的路由信息
sh bin/mqadmin topicRoute -n 192.168.237.128:9876 -t MyTopic
# 查看topic状态信息
sh bin/mqadmin topicStatus -n 192.168.237.128:9876 -t MyTopic
# 查看topic的cluster信息
sh bin/mqadmin topicClusterList -n 192.168.237.128:9876 -t MyTopic
# 更新配置,-k:key, -v:value
sh bin/mqadmin updateBrokerConfig -c DefaultCluster -n 192.168.237.128:9876 -k listenPort -v 10911
# 更新topic权限,-p
sh bin/mqadmin updateTopicPerm -c DefaultCluster -n 192.168.237.128:9876 -p 6 -t MyTopic1
# 查看broker运行状态信息
sh bin/mqadmin brokerStatus -b 192.168.237.128:10911 -n 192.168.237.128:9876
安装console
clone rocketmq-externals项目,maven编译rocketmq-console项目
mvn clean package -Dmaven.test.skip=true
将target/rocketmq-console-ng-1.0.1.jar拷贝到centos7服务器上,设置环境变量指定name-server,指定端口并启动,也可以在maven编译前修改application.properties文件中的配置
export NAMESRV_ADDR="localhost:9876"
java -Dserver.port=9100 -jar rocketmq-console-ng-1.0.1.jar
# 后台启动
# nohup java -Dserver.port=9100 -jar rocketmq-console-ng-1.0.1.jar >> console.log 2>&1 &
# 后台启动后查看日志
# tail -f console.log
通过原生RocketMQ的api生产、消费消息
简单示例
根据官网Simple Example来进行测试,示例开始之前首先按照顺序启动name-server,broker,console。
- 引入pom依赖
引入pom依赖,只要一个就可以,版本号与rocket服务端版本号保持一致,这里用4.4.0
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
- 创建producer
producer分成几种:同步发送,异步发送,单向发送(不用broker确认消息)
//同步发送
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("MyProducerGroup");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.237.128:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
//异步发送
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("producerGroup1");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.237.128:9876");
//Launch the instance.
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
//等待异步回调执行完成之后再关闭producer
TimeUnit.SECONDS.sleep(100);
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
//单向发送
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("producer1");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.237.128:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
- 创建consumer
实际业务必须要保证consumer端的幂等性。
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup1");
// Specify name server addresses.
consumer.setNamesrvAddr("192.168.237.128:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
- 测试
启动consumer、producer,观察输出结果,在console中可以查看发送的消息,点开消息详情,可以重发
顺序消息
rocketmq的producer端确保需要顺序消费的消息投递到同一个queue,这样consumer端就可以顺序消费了。哪些消息应该投递到哪个queue是由producer自定义的。
- producer
public class OrderedProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
MQProducer producer = new DefaultMQProducer("example_group_name");
((DefaultMQProducer) producer).setNamesrvAddr("192.168.237.128:9876");
//Launch the instance.
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
//orderId相同的消息发送到相同的queue中,保证顺序消费,比如同一个订单的创建、支付、取消等消息放到同一个queue里面来保证消费者可以顺序消费
int orderId = i % 10;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//最后一个参数arg传递orderId,broker会对这个arg进行hash来决定这条消息存储到哪个queue中
//MessageQueueSelector.select方法来自定义queue的选择规则
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
//server shutdown
producer.shutdown();
}
}
- consumer
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setNamesrvAddr("192.168.237.128:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
//一般使用MessageListenerOrderly监听器来消费顺序消息,如果使用MessageListenerConcurrently的话必须使用单线程
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
**rocketmq怎么保证队列完全顺序消费? - 中间件兴趣圈的回答 - 知乎
消息广播
默认情况下,多个consumer同时订阅一个topic的一个tag时,producer生产的消息会被负载均衡到各个consumer上进行消费,如果在consumer端设置了广播属性,那producer生产的消息会通过广播的形式让所有的consumer都接收到所有的消息。
- producer
public class BroadcastProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("192.168.237.128:9876");
producer.start();
for (int i = 0; i < 10; i++){
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
- consumer
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setNamesrvAddr("192.168.237.128:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//set to broadcast mode
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}
消息延迟
- producer
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("192.168.237.128:9876");
// Launch producer
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
// 延时等级是在broker的配置文件中配置的,例如下面,等级3就是10s
// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}
- consumer
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// Instantiate message consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
consumer.setNamesrvAddr("192.168.237.128:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// Subscribe topics
consumer.subscribe("TestTopic", "*");
// Register message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Launch consumer
consumer.start();
}
}
消息批量推送
为了提高消息发送的性能,可以将多条消息组装成一个批次一次发送给broker,同一批次的消息需要满足以下条件:
- 具有相同的topic
- 具有相同waitStoreMsgOK
- 不能设置消息延迟
- 消息总大小不能大于1MB
如果批次里面的消息不确定是否大于1MB,可以将消息拆分成多个批次来发送,参考Batch Example。
消息过滤
- consumer可以通过tag来过滤需要消费的消息
consumer.subscribe(String topic, String subExpression)方法的subExpression参数可以指定一个或多个tag,例如tag1 || tag2 || tag3- 通过
consumer.subscribe方法中的MessageSelector.byTag("tag")实现。
- 对于复杂过滤场景,可以通过producer端给消息设置自定义属性,然后consumer端使用类似SQL的语法过滤,示例:
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
应用日志输出至rocketmq
可以将log4j,log4j2,logback日志直接输出到rocketmq,只需要配置相应的LogAppender就可以了,具体参考官方配置文件示例。
事务消息
rocketmq的事务消息主要是采用2PC(2-phase-commit)的方式来提交消息,保证producer本地事务和消息发送的一致性(原子性)。

事务发起方首先发送 prepare 消息到 MQ。
在发送 prepare 消息成功后执行本地事务。
根据本地事务执行结果返回 commit 或者是 rollback。
如果消息是 rollback,MQ 将删除该 prepare 消息不进行下发,如果是 commit 消息,MQ 将会把这个消息发送给 consumer 端。
如果执行本地事务过程中,执行端挂掉,或者超时,Broker 将会不停的询问其同组的其他 producer 来获取状态(Broker调用producer检查本地事务的方法来检查producer本地事务状态)。
Consumer 端的消费成功机制由 MQ 保证。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
//consumer
public class TransactionConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group_name");
consumer.setNamesrvAddr("192.168.237.128:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("TopicTest-1", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
//业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;
//producer
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("example_producer_group_name");
producer.setNamesrvAddr("192.168.237.128:9876");
//由于本地回调监听跟消息的发送会并发进行,所以可以使用线程池来执行操作
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicTest-1", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
TimeUnit.SECONDS.sleep(10000);
producer.shutdown();
}
}
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
//执行本地事务以消息回查的逻辑写在这个listener里面
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* 执行本地事务
* @param msg
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
//执行本地事务后如果返回UNKNOW或者执行超时,broker会调用回查方法checkLocalTransaction
return LocalTransactionState.UNKNOW;
}
/**
* broker进行消息回查
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
//回查时如果返回UNKNOW或者超时的消息,broker会重试16次,第一次6s后重试,以后隔60s重试一次
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
消息重试
发送端重试
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("MyProducerGroup");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.237.128:9876");
//设置重试次数16次,默认2次
producer.setRetryTimesWhenSendFailed(16);
//Launch the instance.
producer.start();
for (int i = 0; i < 10; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
//设置超时时间5s,超时没有发送成功则重试
SendResult sendResult = producer.send(msg, 5000L);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
消费端重试-异常重试
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import william.rmq.common.constant.RocketMQConstant;
import javax.annotation.PostConstruct;
import java.util.List;
@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
@PostConstruct
public void start() {
try {
consumer.setNamesrvAddr(namesrvAddr);
//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//订阅主题
consumer.subscribe("DefaultCluster", "*");
//注册消息监听器
consumer.registerMessageListener(this);
//启动消费端
consumer.start();
log.info("Message Consumer Start...");
System.err.println("Message Consumer Start...");
} catch (MQClientException e) {
log.error("Message Consumer Start Error!!",e);
}
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt message = msgs.get(0);
try {
//逐条消费
String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId() + ",topic: " + message.getTopic() + ",tags: " + message.getTags() + ",messageBody: " + messageBody);
//模拟业务异常
int i = 1 / 0;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("Consume Message Error!!", e);
//抛出异常时,返回ConsumeConcurrentlyStatus.RECONSUME_LATER,尝试重试。当重试指定次数后返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
int reconsumeTimes = message.getReconsumeTimes();
System.err.println("Now Retry Times: " + reconsumeTimes);
if (reconsumeTimes >= RocketMQConstant.MAX_RETRY_TIMES) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
消费端重试-超时重试
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import william.rmq.common.constant.RocketMQConstant;
import javax.annotation.PostConstruct;
import java.util.List;
@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
@PostConstruct
public void start() {
try {
consumer.setNamesrvAddr(namesrvAddr);
//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//设置消费超时时间(分钟)
consumer.setConsumeTimeout(RocketMQConstant.CONSUMER_TIMEOUT_MINUTES);
//订阅主题
consumer.subscribe("DefaultCluster", "*");
//注册消息监听器
consumer.registerMessageListener(this);
//启动消费端
consumer.start();
log.info("Message Consumer Start...");
System.err.println("Message Consumer Start...");
} catch (MQClientException e) {
log.error("Message Consumer Start Error!!",e);
}
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt message = msgs.get(0);
try {
//逐条消费
String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId() + ",topic: " +
message.getTopic() + ",tags: " + message.getTags() + ",messageBody: " + messageBody);
//模拟耗时操作2分钟,大于设置的消费超时时间
Thread.sleep(1000L * 60 * 2);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("Consume Message Error!!", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
死信队列
集成springboot
代码示例演示在同一个springboot项目中发送、消费消息。
pom依赖
<properties>
<springboot.version>2.0.3.RELEASE</springboot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>${springboot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
application.properties添加rocketmq配置
server.port=9666
rocketmq.name-server=192.168.237.128:9876
rocketmq.producer.group=my-group
入口类就是普通的springboot启动类
@SpringBootApplication
public class RocketSpringBootApplication {
public static void main(String[] args) {
SpringApplication.run(RocketSpringBootApplication.class,args);
}
}
添加controller,注入RocketMQTemplate来演示发送消息,RocketMQTemplate对象会根据配置自动注入
@RestController
public class SendMsgController {
@Autowired
RocketMQTemplate rocketMQTemplate;
/**
* 同步发送消息
* @return
*/
@GetMapping("/sendSync")
public String sendSync(){
rocketMQTemplate.convertAndSend("test-topic","this is sync message.");
return "ok";
}
/**
* 异步发送消息
* @return
*/
@GetMapping("/sendAsync")
public String sendAsync(){
rocketMQTemplate.asyncSend("test-topic", "this is async message.",new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("传输成功");
}
@Override
public void onException(Throwable e) {
System.out.println("传输失败");
}
});
return "ok";
}
/**
* 只负责发送消息,不等待broker应答,适用于可靠性不高的消息发送场景
* @return
*/
@GetMapping("/sendOneway")
public String sendOneway(){
rocketMQTemplate.sendOneWay("test-topic","this is oneway message.");
return "ok";
}
/**
* 发送延时消息
* @return
*/
@GetMapping("/sendDelayedMsg")
public String sendDelayedMsg(){
Message message = MessageBuilder.withPayload("this is delayed message.").build();
/**
* 延时级别是conf/broker.conf中配置的,默认的配置如下:
* messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 这里指定delayLevel=2也就是延时5s
*/
rocketMQTemplate.syncSend("test-topic",message, 1000, 2);
return "ok";
}
/**
* 发送consumer可以根据hashKey顺序消费的消息
* consumer端通过@RocketMQMessageListener(topic = "test-topic", consumerGroup = "group2",consumeMode = ConsumeMode.ORDERLY)指定顺序消费消息
* @return
*/
@GetMapping("/sendOrderly")
public String sendOrderly(){
rocketMQTemplate.asyncSendOrderly("test-topic", "this is ordered message", "hashKey", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
return "ok";
}
/**
* 发送事务消息
* @return
*/
@GetMapping("/sendTx")
public String sendTx(){
Message message = MessageBuilder.withPayload("this is transaction message").build();
System.out.println(Thread.currentThread().getName());
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("group1", "test-topic", message,"test");
System.out.println(result.getTransactionId());
return "ok";
}
}
//配合测试发送事务消息,需要加一个本地事务监听器类
@RocketMQTransactionListener(txProducerGroup="group1")
public class TransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// local transaction process, return bollback, commit or unknown
System.out.println("executeLocalTransaction:"+JSON.toJSONString(msg));
// 返回UNKNOWN之后,broker过几秒会调用checkLocalTransaction回查消息状态,官方文档描述第一次回查在6s之后,之后每次都是60s回查一次,一共查15次,还是返回UNKNOWN或者超时的话需要人工介入
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println("checkLocalTransaction:"+JSON.toJSONString(msg));
return RocketMQLocalTransactionState.COMMIT;
}
}
用来消费消息的类
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "group2", consumeMode = ConsumeMode.CONCURRENTLY,messageModel=MessageModel.CLUSTERING)
//@RocketMQMessageListener(topic = "test-topic", consumerGroup = "group2", selectorExpression="tag1",selectorType = SelectorType.TAG)
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("received message:"+ message);
}
}
集成springcloud
参考官方github示例文档以及博客spring-cloud-alibaba-rocketmq。
最佳实践
- producer端:
- 一个应用尽可能只使用一个 Topic,消息子类型用 tags 来标识,tags可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。
- 每个消息在业务层面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过Topic,key来查询返条消息内容,以及消息被谁消费。由于是哈> 希索引,请务必保证 key尽可能唯一,这样可以避免潜在的哈希冲突。
- 消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。
- 对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。
- 某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。
- consumer端:
- Consumer 数量要小于等于queue的总数量,由于Topic下的queue会被相对均匀的分配给Consumer,如果 Consumer 超过queue的数量,那多余的 Consumer 将没有queue可以消费消息。
- 消费过程要做到幂等(即消费端去重),RocketMQ为了保证性能并不支持严格的消息去重。
- 尽量使用批量方式消费,RocketMQ消费端采用pull方式拉取消息,通过consumeMessageBatchMaxSize参数可以增加单次拉取的消息数量,可以很大程度上提高消费吞吐量。另外,提高消费并行度也可以通过增> 加Consumer处理线程的方式,对应参数consumeThreadMin和consumeThreadMax。
- 消息发送成功或者失败,要打印消息日志。
- 线上建议关闭autoCreateTopicEnable配置
该配置用于在Topic不存在时自动创建,会造成的问题是自动新建的Topic只会存在于一台broker上,后续所有对该Topic的请求都会局限在单台broker上,造成单点压力。
- broker master宕机情况是否会丢消息
broker master宕机,虽然理论上来说不能向该broker写入但slave仍然能支持消费,但受限于rocketmq的网络连接机制,默认情况下最多需要30秒,消费者才会发现该情况,这个时间可通过修改参数>
pollNameServerInteval来缩短。这个时间段内,发往该broker的请求都是失败的,而且该broker的消息无法消费,因为此时消费者不知道该broker已经挂掉。 直到消费者得到master宕机通知后,才会转向> slave进行消费,但是slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是消息最终不会丢,一旦master恢复,未同步过去的消息仍然会被消费掉。
问题
如何保证消息不丢失?
producer端
同步阻塞发送,设置超时时间,设置重试次数
发送事务消息
broker端
CommitLog同步刷盘-异步刷盘
1主N从,同步复制-异步复制
consumer端
异常重试
超时重试
死信队列