rocketmq源码梳理之producer生产者
rocketmq producer生产者的建立过程以及消息发送流程
示例代码
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
代码的执行流程
生产者对象的初始化
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
进入DefaultMqProducer类,发现类的成员变量在初始化的时候设置了不少默认值,整理如下:
变量名 | 变量类型 | 变量含义 | 默认值 |
---|---|---|---|
retryResponseCodes | Set | 接受到set里包含的响应码后会进行重试 | ResponseCode.TOPIC_NOT_EXIST, ResponseCode.SERVICE_NOT_AVAILABLE, ResponseCode.SYSTEM_ERROR, ResponseCode.NO_PERMISSION, ResponseCode.NO_BUYER_ID, ResponseCode.NOT_IN_CURRENT_UNIT |
createTopicKey | String | 自动会创建的topic名称 | TBW102 |
defaultTopicQueueNums | int | topic默认的队列数 | 4 |
sendMsgTimeout | long | 发送消息的超时时间,默认单位毫秒 | 3000 |
compressMsgBodyOverHowmuch | long | 超过该值配置大小,消息会进行压缩,单位byte | 4096 |
retryTimesWhenSendFailed | int | 发送失败后重试的次数 | 2 |
retryTimesWhenSendAsyncFailed | int | 异步发送失败后重试的次数 | 2 |
retryAnotherBrokerWhenNotStoreOK | boolean | 发送失败后是否切换到另一个节点发送 | false |
maxMessageSize | long | 消息最大大小,单位byte | 1024 * 1024 * 4 |
该类继承了ClientConfig类,该类初始化时也初始化了部分默认配置.并且在最终的构造方法中初始化了DefaultMQProducerImpl类,该类具体实现了生产者的一些逻辑
生产者的启动
执行的DefaultMQProducerImpl类的start方法
先调用checkConfig方法检查必要配置是否配置
再判断当前的消费者组名是否是CLIENT_INNER_PRODUCER,如果不是则更改当前生产者的instanceName,改为用pid+当前纳秒为名
根据instanceName获取MqClientInstance对象,如果获取不到,则新建
将当前生产者对象放入MqClientInstance的producerTable中,如果放入失败,则代表已经有重名的生产者组初始化过了,抛出错误,否则注册成功
然后启动MqClientInstance对象,初始化完毕
MqClientInstance对象是与服务端交互的主要对象,初始化流程与push消费者一致
生产者发送消息
初始化生产者消息,指定了topic,tag,keys以及消息体
其中topic指的是消息的主题,tags指定的消息用于筛选的字段,keys会根据相应规则将消息分配到不同的队列上
然后调用producer.send(msg)将消息发送到服务端
此处获取了发送消息的超时时间,如果未设定,默认是3秒
send发送为同步发送,即发送时会阻塞等待发送结果
最终调用DefaultMQProducerImpl的sendDefaultImpl方法执行具体发送逻辑
检查生产者状态,如果不在运行状态直接抛错
检查消息是否合规
获取当前时间作为消息的开始发送时间
获取当前topic的发布信息,封装成了TopicPublishInfo对象,该对象可以获取topic的队列信息以及队列处于哪个broker下
获取重试次数,如果是同步发送,则重试次数为1+retryTimesWhenSendFailed,如果不是同步发送,则为1次
根据规则获取一个队列.默认规则是TopicPublishInfo里有个累加器,每次按顺序从messageQueueList中获取一个MessageQueue用于发送,如果之前发送失败,重试重新获取的,则按照累加器的值继续往下获取,直到获取到一个不是之前队列的进行发送
判断当前时间与上一次开始发送时间的差值是否超过了超时时间,超过了则直接跳出循环
然后调用sendKernelImpl方法发送消息
将当前时间作为发送完成时间
更新延迟时间到FaultItem对象(暂不知道干啥用)
根据通信方式,当前是同步,判断响应结果是否发送成功,如果不是且配置发送失败重试的话,则继续循环,如果发送成功,则直接返回发送结果.
接下来详细看下sendKernelImpl的方法
根据brokerName获取broker的地址,如果地址获取不到,则从服务端再根据topic拉取一次topic的发布信息,然后再获取一次brokerAddr
如果brokerAddr不为null,判断是否使用虚拟通道发送消息,如果是,则将broker的端口号-2,如果不是,则对brokerAddr不做改变(暂不知道有啥用途)
如果当前消息不是MessageBatch,则为消息生成一个唯一id
判断消息是否超过配置的compressMsgBodyOverHowmuch大小,如果超过,则对消息体进行压缩,并修改对应标志位
判断是否是事务消息,如果是,修改对应标志位
调用checkForbiddenHook回调
调用sendMessageHook的sendMessageBefor回调
封装SendMessageRequestHeader对象发送给服务端
生成请求报文对象
在发送前再判断是否已经超过超时时间
rocketmq源码梳理之producer生产者