博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rocketmq之源码分析producer的常规操作细节(九)
阅读量:5842 次
发布时间:2019-06-18

本文共 13503 字,大约阅读时间需要 45 分钟。

hot3.png

细节一,消息发送前,需要获得topic的发布配置信息

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());//获得消息发布的topic配置信息
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {    //先从本地内存数据结构中获取,系统启动后,新的topic肯定是空    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);    //如果topic为空或者topic不可用    if (null == topicPublishInfo || !topicPublishInfo.ok()) {        //首先将topic的数据结果放置到当前的内存数据结构中        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());        //根据topic名称更新topic的路由信息,底层是基于netty的通信,请求nameserv服务        //如果有数据变化,下层服务会调用当前服务的更新操作        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);        //获得罪行的topic的发布配置        topicPublishInfo = this.topicPublishInfoTable.get(topic);    }    //如果topic的发布配置正常可用则返回    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {        return topicPublishInfo;    } else {        //再次请求更新topic的操作,该次的更新具备默认的操作配置        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);        //再次获得topic的配置信息        topicPublishInfo = this.topicPublishInfoTable.get(topic);        return topicPublishInfo;    }}

有一个核心设计,第一次根据topic的直接请求namesrv的操作,此时如果有值的话则会直接返回,如果是系统发布的一个新的topic,此时namesrv上肯定是不存在该配置信息,然后又再次请求namesrv进行通信,只是这次的请求是基于默认的topic来请求的,同时将返回的queue的配置获得最小值,设置到topic对应的queue中。

我们主要看更新topic的操作,其他的操作都是基于异常机制的处理

//根据topic去namesrv拉取信息,同时更新到当前的内存中public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,    DefaultMQProducer defaultMQProducer) {    try {        //获得执行锁        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {            try {                TopicRouteData topicRouteData;                //如果采用默认操作,并且默认的配置不为空                if (isDefault && defaultMQProducer != null) {                    //通过netty的client拉取共享主题配置的路由                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),                        1000 * 3);                    //获取的结果不为空                    if (topicRouteData != null) {                        for (QueueData data : topicRouteData.getQueueDatas()) {                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());                            data.setReadQueueNums(queueNums);                            data.setWriteQueueNums(queueNums);                        }                    }                } else {                    //实时从namesrv中拉取topic的路由信息                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);                }                if (topicRouteData != null) {                    //获得历史的路由信息                    TopicRouteData old = this.topicRouteTable.get(topic);                    //判断历史和当前的路由是否有变化                    boolean changed = topicRouteDataIsChange(old, topicRouteData);                    if (!changed) {                        changed = this.isNeedUpdateTopicRouteInfo(topic);                    } else {                        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);                    }                    if (changed) {                        //copy一份topic的路由信息                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();                        //将路由信息根据配置放置到当前的内部缓存中                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());                        }                        // Update Pub info                        //将最新的结果更新到当前服务内存中所有的发布者配置中                        {                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);                            publishInfo.setHaveTopicRouterInfo(true);                            Iterator
> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry
entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { //该操作就是更新topic的发布配置信息 impl.updateTopicPublishInfo(topic, publishInfo); } } } // Update sub info { Set
subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); Iterator
> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry
entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } } log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData); //将topic的路由信息放置到当前结构的内存数据结构中 this.topicRouteTable.put(topic, cloneTopicRouteData); return true; } } else { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic); } } catch (Exception e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } } finally { this.lockNamesrv.unlock(); } } else { log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS); } } catch (InterruptedException e) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } return false;}

细节二,选择queue来进行消息的发送this.selectOneMessageQueue(topicPublishInfo, lastBrokerName)根据topic选择queue

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);}

此处有个设计就是管理器,MQFaultStrategy,具备一定策略的操作

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);}
public class MQFaultStrategy {    private final static InternalLogger log = ClientLogger.getLog();    //延迟队列的容错,存储的内容是延迟的队列信息,主要是broker    private final LatencyFaultTolerance
latencyFaultTolerance = new LatencyFaultToleranceImpl(); //是否开启 private boolean sendLatencyFaultEnable = false; //延迟级别 private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; //不可用的延迟 private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; public long[] getNotAvailableDuration() { return notAvailableDuration; } public void setNotAvailableDuration(final long[] notAvailableDuration) { this.notAvailableDuration = notAvailableDuration; } public long[] getLatencyMax() { return latencyMax; } public void setLatencyMax(final long[] latencyMax) { this.latencyMax = latencyMax; } public boolean isSendLatencyFaultEnable() { return sendLatencyFaultEnable; } public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { this.sendLatencyFaultEnable = sendLatencyFaultEnable; } //选择消息队列 public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //是否开启 if (this.sendLatencyFaultEnable) { try { //获得索引 int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; //获得mq MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //是否是可用的broker if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } //获得一个不是最优的broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); //获得需要些的queue int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { //选择一个queue final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { //删除延迟broker latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } //直接调用topic的选择队列方法 return tpInfo.selectOneMessageQueue(lastBrokerName); } public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; }}
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance
{ //延迟容错的集合 private final ConcurrentHashMap
faultItemTable = new ConcurrentHashMap
(16); private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(); @Override public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this.faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } @Override public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { return faultItem.isAvailable(); } return true; } @Override public void remove(final String name) { this.faultItemTable.remove(name); } @Override public String pickOneAtLeast() { final Enumeration
elements = this.faultItemTable.elements(); List
tmpList = new LinkedList
(); while (elements.hasMoreElements()) { final FaultItem faultItem = elements.nextElement(); tmpList.add(faultItem); } if (!tmpList.isEmpty()) { Collections.shuffle(tmpList); Collections.sort(tmpList); final int half = tmpList.size() / 2; if (half <= 0) { return tmpList.get(0).getName(); } else { final int i = this.whichItemWorst.getAndIncrement() % half; return tmpList.get(i).getName(); } } return null; }
class FaultItem implements Comparable
{ private final String name; private volatile long currentLatency; private volatile long startTimestamp; public FaultItem(final String name) { this.name = name; } @Override public int compareTo(final FaultItem other) { if (this.isAvailable() != other.isAvailable()) { if (this.isAvailable()) return -1; if (other.isAvailable()) return 1; } if (this.currentLatency < other.currentLatency) return -1; else if (this.currentLatency > other.currentLatency) { return 1; } if (this.startTimestamp < other.startTimestamp) return -1; else if (this.startTimestamp > other.startTimestamp) { return 1; } return 0; } public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; }

最终的操作是提现在消息发送后的操作,发送成功后,更新对应的条目

//时间的控制beginTimestampPrev = System.currentTimeMillis();long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {    callTimeout = true;    break;}//调用发送的实际操作sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();//更新操作条目,主要是是否是容错延迟操作this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

转载于:https://my.oschina.net/wangshuaixin/blog/3056589

你可能感兴趣的文章
rm慎用,使用rm的好习惯
查看>>
CVBS视频信号解析
查看>>
spymemcached源码中Reactor模式分析
查看>>
屏蔽chrome自动填充表单,亲测必行!
查看>>
Linux中的C语言妙用
查看>>
Linux基础资料 linux命令:efax
查看>>
Docker 安装
查看>>
oral_quiz->#N个骰子的点数和#
查看>>
15、文本查看命令--cat、more、head、tail
查看>>
Oracle模糊查询的实现
查看>>
openstack oslo.config简短学习笔记
查看>>
<SH>配置SessionFactory的两种方式
查看>>
Ruby对象模型
查看>>
jfinal最快整合ueditor
查看>>
访问url中存在中文,apache 重写出现403问题处理方案
查看>>
从Redis的数据丢失说起
查看>>
个人总结之Eclipse关联SVN
查看>>
Kafka集群搭建详细步骤
查看>>
Postgres SQL 用法摘记
查看>>
Mac os 10.9 Python MySQLdb
查看>>