rocketmq源码梳理之服务端的处理
MqClientInstance维护了与服务端的连接,并且生产者和消费者都通过它与服务端进行交互,那么客户端请求的各个功能服务端那边是如何处理的呢?
客户端建立连接代码
NettyRemotingClient类,与nameSrv通过netty建立连接
NettyEncoder负责消息的编码
NettyDecoder负责消息的解码
IdleStateHandler负责空闲连接的处理
NettyConnectManageHandler负责连接事件的回调处理
NettyClientHandler负责对响应消息的处理
服务端监听连接代码
NettyRemotingServer类,监听连接
NettyEncoder负责消息的编码
NettyDecoder负责消息的解码
IdleStateHandler负责空闲连接的处理
NettyConnectManageHandler负责连接事件的回调处理
NettyServerHandler负责对响应消息的处理
一、从namesrv获取topic的路由信息
客户端有一个定时器定期调用,由pollNameServerInterval控制调用周期,默认为30s,调用代码如下
调用对象GetRouteInfoRequestHeader,只有一个成员变量topic
调用编码GET_ROUTEINFO_BY_TOPIC,值为105
客户端通过NettyEncoder生成报文并发送请求到namesrv,namesrv的NettyDecoder收到报文并解析,解析成功后由NettyServerHandler进行处理
此处收到的请求报文,所以执行processRequestCommand方法
由于namesrv只注册了defaultRequestProcessor,所以此处由defaultRequestProcessor进行处理,并且新建了一个Runnable对象,异步执行请求
并且defaultRequestProcessor继承了AsyncNettyRequestProcessor类,所以执行这个分支代码.
并且defaultRequestProcessor的isRejectRquest方法固定返回false,所以不会进行流控
然后将处理任务提交到线程池中进行处理,处理逻辑执行defaultRequestProcessor的processRequest方法
此处根据请求的编码走具体的处理分支,此处调用getRouteInfoByTopic方法
TopicRouteData对象里有四个成员变量
orderTopicConf (暂不知道什么作用)
queueDatas topic的所有队列信息
brokerDatas topic队列所属的所有broker信息
filterServerTable (暂不知道有啥用)
根据topic从topicQueueTable中获取topic下的所有队列,然后遍历队列信息,获取到topic所在的brokerName,然后通过brokerName从brokerAddrTable中获取broker的信息.最后再遍历broker,通过brokerAddr从filterServerTable中获取filterServer的信息
从kvConfig中获取orderTopicConf的信息
这几个作用后面再去梳理
获取到信息后写回到客户端,此时是Response消息,所以客户端执行processResponseCommand方法
根据请求的序列号获取到ResponseFuture对象,设置响应码,然后如果有回调函数则调用回调方法,没有就取消阻塞
此处getRouteInfoByTopic是同步调用,调用完后调用waitResponse方法阻塞等待返回,要么等待超时时间后抛出异常,要么等待响应结果回来调用putResponse唤醒
最后获取到topicRouteData信息后,与本地之前的数据比较判断是否有更改,如果队列信息或者Broker信息有更改,则会去更新每一个生产者和消费者所持有的队列和broker信息
二、定期发送心跳到namesrv
sendHeartbeatToAllBroker
客户端有一个定时器定期调用,由heartbeatBrokerInterval控制调用周期,默认为30s,调用代码如下
首先调用prepareHeartbeatData方法生成心跳的信息数据HeartbeatData
心跳信息包括了当前客户端的id、客户端所有的消费者信息、客户端所有的生产者信息
如果当前客户端没有任何的消费者和生产者,则直接返回,不用发送心跳
遍历下当前客户端交互的所有broker地址,进行遍历,如果当前客户端无消费者,且broker不是master(brokerId为0的是master),那也不用发送心跳,因为只能往master发送消息,slave只能消费消息
然后调用sendHearbeat方法将心跳信息发送给broker,此处也是同步调用,超时时间取的mqClientApiTimeout,默认为3秒
requestCode为HEART_BEAT,值为34
然后通过传入的brokerAddr,与broker建立连接
这个方法可以看出,当addr为空时,就获取或者创建与namesrv的连接,而addr有值时,就获取或者建立与addr的连接
然后就是broker端收到请求后的处理,borker端由ClientManageProcessor类负责心跳请求的处理
执行hearbeat方法
首先根据心跳信息传过来的客户端消费者信息,判断当前broker是否有订阅组配置对象,如果有,则会调用createTopicInSendMessageBackMethod方法,以%retry%加上消费者组名,新建一个topicConfig对象(不知道创建topic干嘛),如果之前已经创建过则不做处理直接返回,如果没有创建过,则将TopicConfig对象放入topicConfigTable里,并且调用registerBrokerAll通知给所有broker,此处forceRegister默认传的true,代表一定会通知所有broker,如果该值是false的话,方法内部还会调用needRegister方法,请求Namesrv,请求编码是QUERY_DATA_VERSION,值是322(判断是否change,如果change也会去通知所有broker,此处如何判断的change,后续再看)
注册就是将当前broker信息封装成RegisterBrokerRequestHeader对象,请求编码是REGISTER_BROKER,值是103,然后去请求所有的Namesrv,将信息传过去,此处需要所有Namesrv全都请求结束才会接着往下执行
然后接着就是将消费者注册到当前broker
首先判断consumerTable中是否已经存在了,没有就放入.然后基于连接信息判断是否是最新的连接,判断订阅信息是否改变.如果连接是最新的(新加入了消费者)或者订阅信息改变了(新的订阅信息),都代表了消费者有变化了,如果isNotifyConsumerIdsChangedEnable为true的话(默认为true),则会去通知该消费者组的所有消费端去再平衡,后面再调用register方法将订阅信息注册到groupFilterData(这个map不知道干嘛的)
通知客户端再平衡请求编码是NOTIFY_CONSUMER_IDS_CHANGED,值为40,请求体为NotifyConsumerIdsChangedRequestHeader,带有消费者组信息
然后继续看heartBeat方法,后面就是将生产者的连接注册到groupChannelTable中,心跳执行结束,并返回成功给客户端
三、发送消息给broker端
客户端请求码SEND_MESSAGE,值为10
broker端由SendMessageProcessor进行处理
rocketmq源码梳理之服务端的处理