rocketmq源码梳理之push模式消费者的建立

rocketmq push消费模式消费者的建立过程

示例代码

    //使用消费者组名初始化push模式消费者对象 消费者组名要唯一
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

    //设置消费起始位置 这里指定的是从头开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

    //设置订阅的topic 以及tag
    consumer.subscribe("TopicTest", "*");

    //设置消费消息后的回调函数 这里使用的不是有序消费
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    //启动消费者消费
    consumer.start();
    
    

代码的执行流程

1. 消费者对象的初始化
//使用消费者组名初始化push模式消费者对象 消费者组名要唯一
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

进入DefaultMqPushConsumer对象,可以看到声明了很多成员变量并初始化了默认值.整理如下:

变量名 变量类型 变量含义 默认值
messageModel MessageModel 消费消息的模式,有两种配置,集群和广播 集群,MessageModel.CLUSTERING
consumeFromWhere ConsumeFromWhere 从何处开始消费,配置值有从最开始、从最新、从指定时间戳 从最新处,ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
consumeTimestamp String 消费时间戳,yyyyMMddHHmmss格式 当前时间往前推30分钟
consumeThreadMin int 消费线程最小值 20
consumeThreadMax int 消费线程最大值 20
adjustThreadPoolNumsThreshold int 动态调整线程池数量的阈值 10000
consumeConcurrentlyMaxSpan int 并发消费最大数据量 2000
pullThresholdForQueue int 队列缓存的最大消息量 1000
pullThresholdSizeForQueue int 队列拉取的消息的大小阈值,单位为MB 100
pullThresholdForTopic int topic一次性拉取的最大消息量,例如改值设置1000时,而有10个队列的话,则每个队列最大只能拉取100个,相当于pullThresholdForQueue 设置了100 -1,不限制
pullInterval long 拉取间隔 0,拉取后不等待
consumeMessageBatchMaxSize int 消费的批量大小 1
pullBatchSize int 批量拉取大小 32
postSubscriptionWhenPull boolean 拉取的时候是否提交订阅信息 false
unitMode boolean 单元模式 false
maxReconsumeTimes int 最大重试次数,有序消费时,-1代表无穷大,无序消费时,-1代表16次 -1
suspendCurrentQueueTimeMillis long —- 1000
consumeTimeout long 消费超时时间,单位分钟 15
awaitTerminationMillisWhenShutdown long 停止时暂停等消费时间,0就是不等待 0

同时DefaultMqPushConsumer对象继承了ClientConfig对象,该对象也模式初始化了一些配置,整理如下

变量名 变量类型 变量含义 默认值
namesrvAddr String 元数据服务器地址 从系统参数rocketmq.namesrv.addr中获取,获取不到就从环境变量NAMESRV_ADDR中获取
clientIP String 客户端ip 通过RemotingUtil.getLocalAddress()获取
instanceName String 实例名称 从系统参数rocketmq.client.name中获取,获取不到就使用默认名称DEFAULT
clientCallbackExecutorThreads int 客户端回调执行的线程数 通过Runtime.getRuntime().availableProcessors()获取
accessChannel AccessChannel 访问通道,可选值有LOCAL和CLOUD AccessChannel.LOCAL
pollNameServerInterval long 从元数据服务器拉取信息间隔 30000,单位毫秒
heartbeatBrokerInterval long 心跳间隔时间 30000,单位毫秒
persistConsumerOffsetInterval long 消费位移持久化间隔 5000,单位毫秒
pullTimeDelayMillsWhenException long 出现异常后拉取延迟时间 1000,单位毫秒
unitMode boolean 单元模式 false
vipChannelEnabled boolean 是否开启vip通道 从系统参数com.rocketmq.sendMessageWithVIPChannel获取,获取不到默认值为false
useTLS boolean 是否使用tls安全协议 从系统参数tls.enable获取,获取不到默认值为false
mqClientApiTimeout long 客户端超时时间 3000,单位毫秒
language LanguageCode 代码语言 LanguageCode.JAVA

调用了另一个重载的构造方法,并设置了默认的再平衡策略AllocateMessageQueueAveragely
upload successful

重载的构造方法初始化了DefaultMQPushConsumerImpl对象
overwrote existing file

DefaultMQPushConsumerImpl对象初始化了设置了pullTimeDelayMillsWhenException,该参数为当出现异常时,拉取消息推迟多少秒,默认值从ClientConfig中获取的,为1000毫秒
upload successful
同时该类也默认初始化了不少配置,整理如下:

变量名 变量类型 变量含义 默认值
pullTimeDelayMillsWhenException long 出现异常后拉取延迟时间 3000,单位毫秒,但是在构造函数执行时,默认会重新赋值,改为ClientConfig中的默认值,为1000
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL long 出现流量控制后拉取延迟时间 50,单位毫秒
PULL_TIME_DELAY_MILLS_WHEN_SUSPEND long 当消费端暂停后拉取延迟时间 1000,单位毫秒
BROKER_SUSPEND_MAX_TIME_MILLIS long 节点暂停最大时间 15000,单位毫秒
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND long 当暂停时消费超时时间 30000,单位毫秒
rebalanceImpl RebalanceImpl 再平衡类 RebalancePushImpl,传入了当前对象构造
consumerStartTimestamp long 消费开始时间戳 System.currentTimeMillis(),当前时间
serviceState ServiceState 消费者当前状态,值有CREATE_JUST(创建未启动) 、RUNNING(正在运行)、SHUTDOWN_ALREADY(已经停止)、START_FAILED(启动失败) ServiceState.CREATE_JUST

至此DefaultMQPushConsumer对象初始化完毕,该环节主要是初始化一些配置以及关联对象的一些配置

2. 个性化配置
//设置消费起始位置 这里指定的是从头开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

对于一些配置的设置只是改变了下配置值,如上面的设置起始位置,在具体用到的时候会生效.可设置的配置可以关注下上面梳理的各个对象初始化的配置.

3. 配置订阅信息和回调函数
//设置订阅的topic 以及tag
consumer.subscribe("TopicTest", "*");
//设置消费消息后的回调函数 这里使用的不是有序消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
});

upload successful
继续调用了DefaultMQPushConsumerImpl对象的subscribe方法,另外此处通过调用withNamespace方法对topic名称进行包装,就是给topic名称前再加上命名空间的值,如命名空间为aaa,topic为test,此处就会包装成aaa%topic,通过配置命名空间,可以在不改动代码的情况下,切换mq的消费环境

upload successful
首先使用FilterAPI.buildSubscriptionData(topic, subExpression)生成订阅对象SubscriptionData.该对象保存了订阅的topic,以及配置的过滤表达式,如果指定了tag,将会把tag放入tagSet中,并且将tag的hash值放入codeSet中用以过滤.
将subscriptionData放入rebalanceImpl的subscriptionInner成员变量中,这是一个map,key为topic,值为订阅对象
此时mQClientFactory对象还未初始化,所以此时还不执行

upload successful
然后将消费到消息后的回调函数复制给DefaultMQPushConsumer和DefaultMQPushConsumerImpl对象的messageListener对象中

4. 启动消费者
//启动消费者消费
consumer.start();

upload successful
跟之前的topic一样,给消费者组名加上namespace值
然后执行defaultMQPushConsumerImpl的start方法

upload successful
该方法根据当前服务的状态执行不同的逻辑,由于是刚创建,状态值为CREATE_JUST,所以执行这个分支的逻辑.
首先将服务状态修改为START_FAILED再执行后续逻辑,以防止启动异常后重复启动
checkConfig检查必要的配置以及配置格式是否正确
copySubscription里主要是为当前消费者组增加retry消费者组
当消费模式是集群时,调用changeInstanceNameToPID把默认的实例名改为使用进程id

upload successful
初始化MQClientInstance对象,并以clientId为key,放入factoryTable中.该对象主要用netty与服务端通信

upload successful
初始化了nettyClientConfig对象,主要配置netty连接的相关配置
初始化了ClientRemotingProcessor对象,处理服务端的一些响应逻辑
初始化了MQClientAPIImpl,将clientRemotingProcessor的响应逻辑按照响应编码注册到了netty中
初始化了MQAdminImpl对象
初始化了PullMessageService对象
初始化了RebalanceService对象
初始化了defaultMQProducer对象
初始化了consumerStatsManager对象,可打印状态信息

紧接着主线的初始化
给自平衡对象设置消费者组名,消费模式,再平衡策略,以及通信客户端

紧接着初始化了PullAPIWrapper对象
紧接着初始化了OffsetStore对象,当下消费模式为集群模式,所以使用了RemoteBrokerOffsetStore.

紧接着根据回调函数类型,来设置消费是否有序,当下为无序消费,ConsumeMessageConcurrentlyService,调用该类的start方法,会启动一个定期执行的线程池,用以清理过期消息

以consumerGroup为key,当前消费者对象为值放入MQClientInstance的consumerTable中

然后调用MQClientInstance的start方法

overwrote existing file
首先获取元数据服务器地址
然后调用mqClientApiImpl的start方法与mq服务端建立netty连接
然后启动部分定时任务
然后启动拉取线程
然后启动再平衡线程
然后启动默认的producer线程
然后修改服务状态为Running
至此初始化结束.

后面都是各个线程执行操作.核心就是以下几个线程

RebalanceService

upload successful
定期再平衡
PullMessageService定期拉取数据
fetchNameServerAddr定期获取元数据地址
updateTopicRouteInfoFromNameServer定期获取topic路由
cleanOfflineBroker定期清理下线节点
sendHeartbeatToAllBrokerWithLock定期发送心跳线程
persistAllConsumerOffset持久化所有消费位移
adjustThreadPool定期调整线程池

下面重点介绍这些线程的作用

各线程作用

RebalanceService

upload successful

等待间隔waitInterval,取自系统参数rocketmq.client.rebalance.waitInterval,如果未设置,默认值为20ms

upload successful
如果hasNotified为true,则不等待直接返回
否则等待interval时间再返回

upload successful
遍历consumerTable,执行每一个消费者对象的doRebalance方法,当前为Push模式的消费者,所以执行DefaultMQPushConsumerImpl的doRebalance方法

upload successful
最终调用的RebalanceImpl的doRebalance方法,当前是无序

upload successful
获取订阅信息,根据订阅信息获取topic,然后调用rebalanceByTopic方法,该方法根据消费模式执行不同的再平衡逻辑,当前消费模式是集群

upload successful
获取当前topic的队列信息
获取当前消费者组的各个消费者id
根据再平衡策略将队列分配给各个消费者组,当前默认分配策略是平均分配

upload successful
算法如下:
假如有11个队列,4个消费者,平分的话每个消费者能分两个,余数为3.余数的3个再根据消费者顺序按顺序分配给前3个消费者.这样4个消费者分配方案就是3332

给当前消费者分配好消费队列后,与之前分配的队列进行比较.
如果之前分配的队列不在新的分配队列里了,则将其移除
如果是新添加的队列,则根据消费模式去获取消费的位移,根据配置的消费起始,去计算位移.例如当前配置的是从最开始处获取位移

upload successful
先从本地获取位移,本地有位移以本地为主,本地要是没位移则如果刚开始新建设置为-1,如果已经设置了-1则从0开始消费

如果位移是大于等于0的,则新建一个PullRequest对象以当前位移开始去拉数据.

如果产生了队列的变化,则执行messageQueueChanged方法通知给服务端
根据当前时间戳更新队列的版本号,根据当前队列数量计算pullThresholdForTopic和pullThresholdSizeForTopic
然后通过发送心跳包将订阅信息更新给服务端.
至此再平衡结束

PullMessageService

upload successful

从pullRequestQueue中获取pullRequest对象,然后去拉取消息

upload successful
判断当前ProcessQueue是否还是该消费者消费,如果不是直接返回
判断当前消费对象是否是暂停状态,如果是延迟放入队列直接返回
根据当前ProcessQueue缓存的消息数量和消息大小判断是否超过了配置的大小,超过则进行流控延迟放入队列直接返回
判断当前ProcessQueue的跨度是否超过配置值,超过也进行流控延迟放入队列直接返回
获取订阅信息,如果订阅信息为空延迟放入队列直接返回
初始化回调函数,用于处理消费到的消息,具体消费逻辑后续再说
使用netty客户端拉取消息

upload successful
可以看到将调用信息封装成了一个ResponseFuture对象,并且在netty调用回调里调用成功和失败通过ResponseFuture执行了不同的逻辑.

在netty客户端对象NettyRemotingClient初始化的时候,也有初始化了一个定时线程

upload successful

upload successful
该线程会每秒执行一次,获取ResponseFuture对象,判断消息超时后会同样调用回调逻辑.

此处再看回调函数里的处理

upload successful
当有响应结果才会去调用Onsuccess方法,其他都是调用OnException

upload successful
当OnSuccess时会根据获取的响应结果的状态执行不同的逻辑
当是Found时,即是拉取到了消息,首先获取下次拉取消息的开始位移,留待后续拉取,然后将拉取到的消息调用consumeMessageService.submitConsumeRequest进行处理,目前消费方式不是有序,使用的是ConsumeMessageConcurrentlyService对象,该方法内部会将拉取到的消息封装成ConsumeRequest对象,该对象是个线程,然后内部处理会调用一开始初始化Consumer对象传入的lisener的cousumerMessage方法进行处理
当是NO_NEW_MSG和NO_MATCHED_MSG时,即本次没拉取到消息,则立即再次拉取
当时OFFSET_ILLEGAL时,即位移是不合法的,则移除当前的ProcessQueue,当再平衡时会重新分配,重新分配的时候会重新获取最新位移

当OnException时,会打印对应的异常信息,并延迟放入队列等待重新拉取

fetchNameServerAddr

定期获取NameServer线程
如果NameServer是空,该线程会定期访问配置的接口获取nameserver信息

upload successful
可以做到在运行过程中切换mq环境

updateTopicRouteInfoFromNameServer

定期根据当前消费者所订阅的topic从NameServer中获取元数据信息,包括broker节点信息,队列信息等

cleanOfflineBroker

定期清理下线节点

sendHeartbeatToAllBrokerWithLock

定期与节点发送心跳信息,将当前消费者的信息发送给broker端

persistAllConsumerOffset

定期持久化消费位移到内存offsetStore对象里

adjustThreadPool

定期调整线程池大小

内存dump里需要关注的对象

MqClientInstance

与mq服务端交互的客户端对象,以instanceName确定唯一,每个instanceName会单独建立一个对象

DefaultMQPushConsumerImpl

push消费具体处理逻辑的对象,包括调用再平衡,请求消息,处理消息等等

OffsetStore

集群模式消费的位移存在RemoteOffsetStore中,在拉取消息的时候会先从服务端同步位移,同步不到位移的时候才会从头开始消费

upload successful

upload successful

作者

windwest

发布于

2022-01-06

更新于

2022-01-17

许可协议

评论