rocketmq源码梳理之producer生产者

rocketmq producer生产者的建立过程以及消息发送流程

示例代码

    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    producer.start();

    for (int i = 0; i < 128; i++)
        try {
            {
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    producer.shutdown();
    

代码的执行流程

生产者对象的初始化

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");  

进入DefaultMqProducer类,发现类的成员变量在初始化的时候设置了不少默认值,整理如下:

变量名 变量类型 变量含义 默认值
retryResponseCodes Set 接受到set里包含的响应码后会进行重试 ResponseCode.TOPIC_NOT_EXIST, ResponseCode.SERVICE_NOT_AVAILABLE, ResponseCode.SYSTEM_ERROR, ResponseCode.NO_PERMISSION, ResponseCode.NO_BUYER_ID, ResponseCode.NOT_IN_CURRENT_UNIT
createTopicKey String 自动会创建的topic名称 TBW102
defaultTopicQueueNums int topic默认的队列数 4
sendMsgTimeout long 发送消息的超时时间,默认单位毫秒 3000
compressMsgBodyOverHowmuch long 超过该值配置大小,消息会进行压缩,单位byte 4096
retryTimesWhenSendFailed int 发送失败后重试的次数 2
retryTimesWhenSendAsyncFailed int 异步发送失败后重试的次数 2
retryAnotherBrokerWhenNotStoreOK boolean 发送失败后是否切换到另一个节点发送 false
maxMessageSize long 消息最大大小,单位byte 1024 * 1024 * 4

该类继承了ClientConfig类,该类初始化时也初始化了部分默认配置.并且在最终的构造方法中初始化了DefaultMQProducerImpl类,该类具体实现了生产者的一些逻辑

生产者的启动

执行的DefaultMQProducerImpl类的start方法

upload successful
先调用checkConfig方法检查必要配置是否配置
再判断当前的消费者组名是否是CLIENT_INNER_PRODUCER,如果不是则更改当前生产者的instanceName,改为用pid+当前纳秒为名
根据instanceName获取MqClientInstance对象,如果获取不到,则新建
将当前生产者对象放入MqClientInstance的producerTable中,如果放入失败,则代表已经有重名的生产者组初始化过了,抛出错误,否则注册成功
然后启动MqClientInstance对象,初始化完毕
MqClientInstance对象是与服务端交互的主要对象,初始化流程与push消费者一致

生产者发送消息

upload successful
初始化生产者消息,指定了topic,tag,keys以及消息体
其中topic指的是消息的主题,tags指定的消息用于筛选的字段,keys会根据相应规则将消息分配到不同的队列上
然后调用producer.send(msg)将消息发送到服务端

upload successful
此处获取了发送消息的超时时间,如果未设定,默认是3秒

upload successful
send发送为同步发送,即发送时会阻塞等待发送结果

upload successful
最终调用DefaultMQProducerImpl的sendDefaultImpl方法执行具体发送逻辑
检查生产者状态,如果不在运行状态直接抛错
检查消息是否合规
获取当前时间作为消息的开始发送时间
获取当前topic的发布信息,封装成了TopicPublishInfo对象,该对象可以获取topic的队列信息以及队列处于哪个broker下
获取重试次数,如果是同步发送,则重试次数为1+retryTimesWhenSendFailed,如果不是同步发送,则为1次
根据规则获取一个队列.默认规则是TopicPublishInfo里有个累加器,每次按顺序从messageQueueList中获取一个MessageQueue用于发送,如果之前发送失败,重试重新获取的,则按照累加器的值继续往下获取,直到获取到一个不是之前队列的进行发送

upload successful
判断当前时间与上一次开始发送时间的差值是否超过了超时时间,超过了则直接跳出循环
然后调用sendKernelImpl方法发送消息
将当前时间作为发送完成时间
更新延迟时间到FaultItem对象(暂不知道干啥用)
根据通信方式,当前是同步,判断响应结果是否发送成功,如果不是且配置发送失败重试的话,则继续循环,如果发送成功,则直接返回发送结果.

接下来详细看下sendKernelImpl的方法

upload successful
根据brokerName获取broker的地址,如果地址获取不到,则从服务端再根据topic拉取一次topic的发布信息,然后再获取一次brokerAddr
如果brokerAddr不为null,判断是否使用虚拟通道发送消息,如果是,则将broker的端口号-2,如果不是,则对brokerAddr不做改变(暂不知道有啥用途)
如果当前消息不是MessageBatch,则为消息生成一个唯一id
判断消息是否超过配置的compressMsgBodyOverHowmuch大小,如果超过,则对消息体进行压缩,并修改对应标志位
判断是否是事务消息,如果是,修改对应标志位
调用checkForbiddenHook回调
调用sendMessageHook的sendMessageBefor回调
封装SendMessageRequestHeader对象发送给服务端

upload successful
生成请求报文对象

upload successful

在发送前再判断是否已经超过超时时间

upload successful

作者

windwest

发布于

2022-01-17

更新于

2022-01-17

许可协议

评论