细节一,消息发送前,需要获得topic的发布配置信息
// Before sending, look up the publish configuration for the topic carried by the message.
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { //先从本地内存数据结构中获取,系统启动后,新的topic肯定是空 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); //如果topic为空或者topic不可用 if (null == topicPublishInfo || !topicPublishInfo.ok()) { //首先将topic的数据结果放置到当前的内存数据结构中 this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); //根据topic名称更新topic的路由信息,底层是基于netty的通信,请求nameserv服务 //如果有数据变化,下层服务会调用当前服务的更新操作 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); //获得罪行的topic的发布配置 topicPublishInfo = this.topicPublishInfoTable.get(topic); } //如果topic的发布配置正常可用则返回 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { //再次请求更新topic的操作,该次的更新具备默认的操作配置 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); //再次获得topic的配置信息 topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; }}
这里有一个核心设计:第一次直接根据topic名称请求namesrv,如果namesrv上已经有该topic的路由信息,则直接返回;如果是一个新发布的topic,此时namesrv上还不存在它的配置信息,于是会再次请求namesrv,只是这次是基于默认的topic(createTopicKey)来请求的。拿到默认topic的路由后,对返回的每个queue配置,取 defaultTopicQueueNums 与该配置的 readQueueNums 二者中的较小值,同时设置为读队列数和写队列数,再把这份路由作为新topic的路由来使用。
下面主要看更新topic路由信息的操作,其余的代码主要是加锁超时和异常情况的处理。
//根据topic去namesrv拉取信息,同时更新到当前的内存中public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { try { //获得执行锁 if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { TopicRouteData topicRouteData; //如果采用默认操作,并且默认的配置不为空 if (isDefault && defaultMQProducer != null) { //通过netty的client拉取共享主题配置的路由 topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3); //获取的结果不为空 if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { //实时从namesrv中拉取topic的路由信息 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); } if (topicRouteData != null) { //获得历史的路由信息 TopicRouteData old = this.topicRouteTable.get(topic); //判断历史和当前的路由是否有变化 boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData); } if (changed) { //copy一份topic的路由信息 TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); //将路由信息根据配置放置到当前的内部缓存中 for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); } // Update Pub info //将最新的结果更新到当前服务内存中所有的发布者配置中 { TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); publishInfo.setHaveTopicRouterInfo(true); Iterator> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { //该操作就是更新topic的发布配置信息 impl.updateTopicPublishInfo(topic, publishInfo); } } } // Update sub info { Set subscribeInfo = 
topicRouteData2TopicSubscribeInfo(topic, topicRouteData); Iterator > it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } } log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData); //将topic的路由信息放置到当前结构的内存数据结构中 this.topicRouteTable.put(topic, cloneTopicRouteData); return true; } } else { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic); } } catch (Exception e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } } finally { this.lockNamesrv.unlock(); } } else { log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS); } } catch (InterruptedException e) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } return false;}
细节二,选择queue来进行消息的发送:通过 this.selectOneMessageQueue(topicPublishInfo, lastBrokerName) 根据topic的发布信息选择queue。
/**
 * Picks the queue to send to by delegating to the configured {@code mqFaultStrategy}.
 *
 * @param tpInfo         publish info of the target topic
 * @param lastBrokerName broker name forwarded unchanged to the strategy; may be null
 * @return the queue chosen by the strategy
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    final MessageQueue selected = this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    return selected;
}
此处的设计是把队列的选择委托给一个策略管理器 MQFaultStrategy,由它按照一定的容错策略来选择队列。
/**
 * Picks the queue to send to by delegating to the configured {@code mqFaultStrategy}.
 *
 * @param tpInfo         publish info of the target topic
 * @param lastBrokerName broker name forwarded unchanged to the strategy; may be null
 * @return the queue chosen by the strategy
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    final MessageQueue selected = this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    return selected;
}
public class MQFaultStrategy { private final static InternalLogger log = ClientLogger.getLog(); //延迟队列的容错,存储的内容是延迟的队列信息,主要是broker private final LatencyFaultTolerancelatencyFaultTolerance = new LatencyFaultToleranceImpl(); //是否开启 private boolean sendLatencyFaultEnable = false; //延迟级别 private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; //不可用的延迟 private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; public long[] getNotAvailableDuration() { return notAvailableDuration; } public void setNotAvailableDuration(final long[] notAvailableDuration) { this.notAvailableDuration = notAvailableDuration; } public long[] getLatencyMax() { return latencyMax; } public void setLatencyMax(final long[] latencyMax) { this.latencyMax = latencyMax; } public boolean isSendLatencyFaultEnable() { return sendLatencyFaultEnable; } public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { this.sendLatencyFaultEnable = sendLatencyFaultEnable; } //选择消息队列 public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //是否开启 if (this.sendLatencyFaultEnable) { try { //获得索引 int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; //获得mq MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //是否是可用的broker if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } //获得一个不是最优的broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); //获得需要些的queue int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { //选择一个queue final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % 
writeQueueNums); } return mq; } else { //删除延迟broker latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } //直接调用topic的选择队列方法 return tpInfo.selectOneMessageQueue(lastBrokerName); } public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; }}
// Keeps one fault record per name and answers whether a name is currently considered available.
// NOTE(review): generic type arguments appear to have been lost when this excerpt was extracted
// (raw ConcurrentHashMap / List / Enumeration / Comparable), and the excerpt ends before the
// classes are closed - restore both from the original source.
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance{
    // Fault records, looked up by name.
    private final ConcurrentHashMap faultItemTable = new ConcurrentHashMap (16);
    // Rotating index used by pickOneAtLeast() to vary which record is picked.
    private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();

    // Creates or updates the record for `name`: stores the latency and sets the start timestamp
    // to now + notAvailableDuration.
    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            // Another thread may have inserted a record in the meantime; if so, update that one.
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

    // A name with no record is treated as available; otherwise the record decides.
    @Override
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        return true;
    }

    // Drops the record for `name`.
    @Override
    public void remove(final String name) {
        this.faultItemTable.remove(name);
    }

    // Copies all records into a list, shuffles and then sorts it, and returns the name of one of
    // the records in the first half of the sorted order (the first record when there are fewer
    // than two). Returns null when there are no records.
    @Override
    public String pickOneAtLeast() {
        final Enumeration elements = this.faultItemTable.elements();
        List tmpList = new LinkedList ();
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }
        if (!tmpList.isEmpty()) {
            Collections.shuffle(tmpList);
            Collections.sort(tmpList);
            final int half = tmpList.size() / 2;
            if (half <= 0) {
                return tmpList.get(0).getName();
            } else {
                // NOTE(review): a negative counter value would give a negative index here -
                // confirm ThreadLocalIndex.getAndIncrement() never returns a negative number.
                final int i = this.whichItemWorst.getAndIncrement() % half;
                return tmpList.get(i).getName();
            }
        }
        return null;
    }

    // One fault record. Ordering: available records first, then lower latency, then earlier
    // start timestamp.
    class FaultItem implements Comparable{
        private final String name;
        private volatile long currentLatency;
        // Point in time from which the record counts as available again (see isAvailable()).
        private volatile long startTimestamp;

        public FaultItem(final String name) {
            this.name = name;
        }

        @Override
        public int compareTo(final FaultItem other) {
            // NOTE(review): isAvailable() reads the clock on every call, so the result can change
            // while a sort is in progress - verify this cannot break the sort.
            if (this.isAvailable() != other.isAvailable()) {
                if (this.isAvailable()) return -1;
                if (other.isAvailable()) return 1;
            }
            if (this.currentLatency < other.currentLatency) return -1;
            else if (this.currentLatency > other.currentLatency) {
                return 1;
            }
            if (this.startTimestamp < other.startTimestamp) return -1;
            else if (this.startTimestamp > other.startTimestamp) {
                return 1;
            }
            return 0;
        }

        // Available once the current time has reached startTimestamp.
        public boolean isAvailable() {
            return (System.currentTimeMillis() - startTimestamp) >= 0;
        }
最终,这套机制体现在消息发送之后的处理上:发送完成后,根据本次发送的耗时更新对应broker的容错条目。
//时间的控制beginTimestampPrev = System.currentTimeMillis();long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) { callTimeout = true; break;}//调用发送的实际操作sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();//更新操作条目,主要是是否是容错延迟操作this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);