rocketmq源码梳理之push模式消费者的建立
rocketmq push消费模式消费者的建立过程
示例代码
//使用消费者组名初始化push模式消费者对象 消费者组名要唯一
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
//设置消费起始位置 这里指定的是从头开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置订阅的topic 以及tag
consumer.subscribe("TopicTest", "*");
//设置消费消息后的回调函数 这里使用的不是有序消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者消费
consumer.start();
代码的执行流程
1. 消费者对象的初始化
//使用消费者组名初始化push模式消费者对象 消费者组名要唯一
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
进入DefaultMqPushConsumer对象,可以看到声明了很多成员变量并初始化了默认值.整理如下:
变量名 | 变量类型 | 变量含义 | 默认值 |
---|---|---|---|
messageModel | MessageModel | 消费消息的模式,有两种配置,集群和广播 | 集群,MessageModel.CLUSTERING |
consumeFromWhere | ConsumeFromWhere | 从何处开始消费,配置值有从最开始、从最新、从指定时间戳 | 从最新处,ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; |
consumeTimestamp | String | 消费时间戳,yyyyMMddHHmmss格式 | 当前时间往前推30分钟 |
consumeThreadMin | int | 消费线程最小值 | 20 |
consumeThreadMax | int | 消费线程最大值 | 20 |
adjustThreadPoolNumsThreshold | int | 动态调整线程池数量的阈值 | 10000 |
consumeConcurrentlyMaxSpan | int | 并发消费最大数据量 | 2000 |
pullThresholdForQueue | int | 队列缓存的最大消息量 | 1000 |
pullThresholdSizeForQueue | int | 队列拉取的消息的大小阈值,单位为MB | 100 |
pullThresholdForTopic | int | topic一次性拉取的最大消息量,例如改值设置1000时,而有10个队列的话,则每个队列最大只能拉取100个,相当于pullThresholdForQueue 设置了100 | -1,不限制 |
pullInterval | long | 拉取间隔 | 0,拉取后不等待 |
consumeMessageBatchMaxSize | int | 消费的批量大小 | 1 |
pullBatchSize | int | 批量拉取大小 | 32 |
postSubscriptionWhenPull | boolean | 拉取的时候是否提交订阅信息 | false |
unitMode | boolean | 单元模式 | false |
maxReconsumeTimes | int | 最大重试次数,有序消费时,-1代表无穷大,无序消费时,-1代表16次 | -1 |
suspendCurrentQueueTimeMillis | long | —- | 1000 |
consumeTimeout | long | 消费超时时间,单位分钟 | 15 |
awaitTerminationMillisWhenShutdown | long | 停止时暂停等消费时间,0就是不等待 | 0 |
同时DefaultMqPushConsumer对象继承了ClientConfig对象,该对象也模式初始化了一些配置,整理如下
变量名 | 变量类型 | 变量含义 | 默认值 |
---|---|---|---|
namesrvAddr | String | 元数据服务器地址 | 从系统参数rocketmq.namesrv.addr中获取,获取不到就从环境变量NAMESRV_ADDR中获取 |
clientIP | String | 客户端ip | 通过RemotingUtil.getLocalAddress()获取 |
instanceName | String | 实例名称 | 从系统参数rocketmq.client.name中获取,获取不到就使用默认名称DEFAULT |
clientCallbackExecutorThreads | int | 客户端回调执行的线程数 | 通过Runtime.getRuntime().availableProcessors()获取 |
accessChannel | AccessChannel | 访问通道,可选值有LOCAL和CLOUD | AccessChannel.LOCAL |
pollNameServerInterval | long | 从元数据服务器拉取信息间隔 | 30000,单位毫秒 |
heartbeatBrokerInterval | long | 心跳间隔时间 | 30000,单位毫秒 |
persistConsumerOffsetInterval | long | 消费位移持久化间隔 | 5000,单位毫秒 |
pullTimeDelayMillsWhenException | long | 出现异常后拉取延迟时间 | 1000,单位毫秒 |
unitMode | boolean | 单元模式 | false |
vipChannelEnabled | boolean | 是否开启vip通道 | 从系统参数com.rocketmq.sendMessageWithVIPChannel获取,获取不到默认值为false |
useTLS | boolean | 是否使用tls安全协议 | 从系统参数tls.enable获取,获取不到默认值为false |
mqClientApiTimeout | long | 客户端超时时间 | 3000,单位毫秒 |
language | LanguageCode | 代码语言 | LanguageCode.JAVA |
调用了另一个重载的构造方法,并设置了默认的再平衡策略AllocateMessageQueueAveragely
重载的构造方法初始化了DefaultMQPushConsumerImpl对象
DefaultMQPushConsumerImpl对象初始化了设置了pullTimeDelayMillsWhenException,该参数为当出现异常时,拉取消息推迟多少秒,默认值从ClientConfig中获取的,为1000毫秒
同时该类也默认初始化了不少配置,整理如下:
变量名 | 变量类型 | 变量含义 | 默认值 |
---|---|---|---|
pullTimeDelayMillsWhenException | long | 出现异常后拉取延迟时间 | 3000,单位毫秒,但是在构造函数执行时,默认会重新赋值,改为ClientConfig中的默认值,为1000 |
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL | long | 出现流量控制后拉取延迟时间 | 50,单位毫秒 |
PULL_TIME_DELAY_MILLS_WHEN_SUSPEND | long | 当消费端暂停后拉取延迟时间 | 1000,单位毫秒 |
BROKER_SUSPEND_MAX_TIME_MILLIS | long | 节点暂停最大时间 | 15000,单位毫秒 |
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND | long | 当暂停时消费超时时间 | 30000,单位毫秒 |
rebalanceImpl | RebalanceImpl | 再平衡类 | RebalancePushImpl,传入了当前对象构造 |
consumerStartTimestamp | long | 消费开始时间戳 | System.currentTimeMillis(),当前时间 |
serviceState | ServiceState | 消费者当前状态,值有CREATE_JUST(创建未启动) 、RUNNING(正在运行)、SHUTDOWN_ALREADY(已经停止)、START_FAILED(启动失败) | ServiceState.CREATE_JUST |
至此DefaultMQPushConsumer对象初始化完毕,该环节主要是初始化一些配置以及关联对象的一些配置
2. 个性化配置
//设置消费起始位置 这里指定的是从头开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
对于一些配置的设置只是改变了下配置值,如上面的设置起始位置,在具体用到的时候会生效.可设置的配置可以关注下上面梳理的各个对象初始化的配置.
3. 配置订阅信息和回调函数
//设置订阅的topic 以及tag
consumer.subscribe("TopicTest", "*");
//设置消费消息后的回调函数 这里使用的不是有序消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
继续调用了DefaultMQPushConsumerImpl对象的subscribe方法,另外此处通过调用withNamespace方法对topic名称进行包装,就是给topic名称前再加上命名空间的值,如命名空间为aaa,topic为test,此处就会包装成aaa%topic,通过配置命名空间,可以在不改动代码的情况下,切换mq的消费环境
首先使用FilterAPI.buildSubscriptionData(topic, subExpression)生成订阅对象SubscriptionData.该对象保存了订阅的topic,以及配置的过滤表达式,如果指定了tag,将会把tag放入tagSet中,并且将tag的hash值放入codeSet中用以过滤.
将subscriptionData放入rebalanceImpl的subscriptionInner成员变量中,这是一个map,key为topic,值为订阅对象
此时mQClientFactory对象还未初始化,所以此时还不执行
然后将消费到消息后的回调函数复制给DefaultMQPushConsumer和DefaultMQPushConsumerImpl对象的messageListener对象中
4. 启动消费者
//启动消费者消费
consumer.start();
跟之前的topic一样,给消费者组名加上namespace值
然后执行defaultMQPushConsumerImpl的start方法
该方法根据当前服务的状态执行不同的逻辑,由于是刚创建,状态值为CREATE_JUST,所以执行这个分支的逻辑.
首先将服务状态修改为START_FAILED再执行后续逻辑,以防止启动异常后重复启动
checkConfig检查必要的配置以及配置格式是否正确
copySubscription里主要是为当前消费者组增加retry消费者组
当消费模式是集群时,调用changeInstanceNameToPID把默认的实例名改为使用进程id
初始化MQClientInstance对象,并以clientId为key,放入factoryTable中.该对象主要用netty与服务端通信
初始化了nettyClientConfig对象,主要配置netty连接的相关配置
初始化了ClientRemotingProcessor对象,处理服务端的一些响应逻辑
初始化了MQClientAPIImpl,将clientRemotingProcessor的响应逻辑按照响应编码注册到了netty中
初始化了MQAdminImpl对象
初始化了PullMessageService对象
初始化了RebalanceService对象
初始化了defaultMQProducer对象
初始化了consumerStatsManager对象,可打印状态信息
紧接着主线的初始化
给自平衡对象设置消费者组名,消费模式,再平衡策略,以及通信客户端
紧接着初始化了PullAPIWrapper对象
紧接着初始化了OffsetStore对象,当下消费模式为集群模式,所以使用了RemoteBrokerOffsetStore.
紧接着根据回调函数类型,来设置消费是否有序,当下为无序消费,ConsumeMessageConcurrentlyService,调用该类的start方法,会启动一个定期执行的线程池,用以清理过期消息
以consumerGroup为key,当前消费者对象为值放入MQClientInstance的consumerTable中
然后调用MQClientInstance的start方法
首先获取元数据服务器地址
然后调用mqClientApiImpl的start方法与mq服务端建立netty连接
然后启动部分定时任务
然后启动拉取线程
然后启动再平衡线程
然后启动默认的producer线程
然后修改服务状态为Running
至此初始化结束.
后面都是各个线程执行操作.核心就是以下几个线程
RebalanceService
定期再平衡
PullMessageService定期拉取数据
fetchNameServerAddr定期获取元数据地址
updateTopicRouteInfoFromNameServer定期获取topic路由
cleanOfflineBroker定期清理下线节点
sendHeartbeatToAllBrokerWithLock定期发送心跳线程
persistAllConsumerOffset持久化所有消费位移
adjustThreadPool定期调整线程池
下面重点介绍这些线程的作用
各线程作用
RebalanceService
等待间隔waitInterval,取自系统参数rocketmq.client.rebalance.waitInterval,如果未设置,默认值为20ms
如果hasNotified为true,则不等待直接返回
否则等待interval时间再返回
遍历consumerTable,执行每一个消费者对象的doRebalance方法,当前为Push模式的消费者,所以执行DefaultMQPushConsumerImpl的doRebalance方法
最终调用的RebalanceImpl的doRebalance方法,当前是无序
获取订阅信息,根据订阅信息获取topic,然后调用rebalanceByTopic方法,该方法根据消费模式执行不同的再平衡逻辑,当前消费模式是集群
获取当前topic的队列信息
获取当前消费者组的各个消费者id
根据再平衡策略将队列分配给各个消费者组,当前默认分配策略是平均分配
算法如下:
假如有11个队列,4个消费者,平分的话每个消费者能分两个,余数为3.余数的3个再根据消费者顺序按顺序分配给前3个消费者.这样4个消费者分配方案就是3332
给当前消费者分配好消费队列后,与之前分配的队列进行比较.
如果之前分配的队列不在新的分配队列里了,则将其移除
如果是新添加的队列,则根据消费模式去获取消费的位移,根据配置的消费起始,去计算位移.例如当前配置的是从最开始处获取位移
先从本地获取位移,本地有位移以本地为主,本地要是没位移则如果刚开始新建设置为-1,如果已经设置了-1则从0开始消费
如果位移是大于等于0的,则新建一个PullRequest对象以当前位移开始去拉数据.
如果产生了队列的变化,则执行messageQueueChanged方法通知给服务端
根据当前时间戳更新队列的版本号,根据当前队列数量计算pullThresholdForTopic和pullThresholdSizeForTopic
然后通过发送心跳包将订阅信息更新给服务端.
至此再平衡结束
PullMessageService
从pullRequestQueue中获取pullRequest对象,然后去拉取消息
判断当前ProcessQueue是否还是该消费者消费,如果不是直接返回
判断当前消费对象是否是暂停状态,如果是延迟放入队列直接返回
根据当前ProcessQueue缓存的消息数量和消息大小判断是否超过了配置的大小,超过则进行流控延迟放入队列直接返回
判断当前ProcessQueue的跨度是否超过配置值,超过也进行流控延迟放入队列直接返回
获取订阅信息,如果订阅信息为空延迟放入队列直接返回
初始化回调函数,用于处理消费到的消息,具体消费逻辑后续再说
使用netty客户端拉取消息
可以看到将调用信息封装成了一个ResponseFuture对象,并且在netty调用回调里调用成功和失败通过ResponseFuture执行了不同的逻辑.
在netty客户端对象NettyRemotingClient初始化的时候,也有初始化了一个定时线程
该线程会每秒执行一次,获取ResponseFuture对象,判断消息超时后会同样调用回调逻辑.
此处再看回调函数里的处理
当有响应结果才会去调用Onsuccess方法,其他都是调用OnException
当OnSuccess时会根据获取的响应结果的状态执行不同的逻辑
当是Found时,即是拉取到了消息,首先获取下次拉取消息的开始位移,留待后续拉取,然后将拉取到的消息调用consumeMessageService.submitConsumeRequest进行处理,目前消费方式不是有序,使用的是ConsumeMessageConcurrentlyService对象,该方法内部会将拉取到的消息封装成ConsumeRequest对象,该对象是个线程,然后内部处理会调用一开始初始化Consumer对象传入的lisener的cousumerMessage方法进行处理
当是NO_NEW_MSG和NO_MATCHED_MSG时,即本次没拉取到消息,则立即再次拉取
当时OFFSET_ILLEGAL时,即位移是不合法的,则移除当前的ProcessQueue,当再平衡时会重新分配,重新分配的时候会重新获取最新位移
当OnException时,会打印对应的异常信息,并延迟放入队列等待重新拉取
fetchNameServerAddr
定期获取NameServer线程
如果NameServer是空,该线程会定期访问配置的接口获取nameserver信息
可以做到在运行过程中切换mq环境
updateTopicRouteInfoFromNameServer
定期根据当前消费者所订阅的topic从NameServer中获取元数据信息,包括broker节点信息,队列信息等
cleanOfflineBroker
定期清理下线节点
sendHeartbeatToAllBrokerWithLock
定期与节点发送心跳信息,将当前消费者的信息发送给broker端
persistAllConsumerOffset
定期持久化消费位移到内存offsetStore对象里
adjustThreadPool
定期调整线程池大小
内存dump里需要关注的对象
MqClientInstance
与mq服务端交互的客户端对象,以instanceName确定唯一,每个instanceName会单独建立一个对象
DefaultMQPushConsumerImpl
push消费具体处理逻辑的对象,包括调用再平衡,请求消息,处理消息等等
OffsetStore
集群模式消费的位移存在RemoteOffsetStore中,在拉取消息的时候会先从服务端同步位移,同步不到位移的时候才会从头开始消费
rocketmq源码梳理之push模式消费者的建立