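For more RocketMQ content, see the column: https://blog.csdn.net/saintmm/category_11280399.html
Simply put, a message trace is a log: it records the access and operation history of a message across production, storage, and consumption.
In real projects, the sending side and the consuming side sometimes blame each other over whether a message was actually sent or consumed.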
1> Modify the Broker configuration and set traceTopicEnable=true.
This creates a topic named RMQ_SYS_TRACE_TOPIC on the Broker with a single queue. By default, all msgTrace data is stored in this topic.
2> Enable message tracing in the Producer:
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
3> Enable message tracing in the Consumer:
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace)
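If message tracing is enabled, set the Key property on each message where possible when sending, so that messages can be looked up efficiently in RocketMQ-Console.
1> Add the following setting to the Broker configuration file (broker.conf), then restart the Broker: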
traceTopicEnable = true
2> Producer:
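import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class TraceProducer {
    public static void main(String[] args) throws Exception {
        // The second argument (true) enables message tracing
        DefaultMQProducer producer = new DefaultMQProducer("saint-test", true);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setMaxMessageSize(1024 * 1024 * 10);
        producer.start();
        // Arguments: topic, tags (none), keys, body
        Message msg = new Message("test-topic-trace", null, "key-trace", "trace-2".getBytes(StandardCharsets.UTF_8));
        SendResult send = producer.send(msg);
        System.out.println("sendResult: " + send);
        // Shut down the producer
        producer.shutdown();
        System.out.println("Producer has shut down");
    }
}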
3> Consumer:
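import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
// Package location in the 4.x client; it may differ in other versions
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class TraceConsumer {
    public static void main(String[] args) throws Exception {
        // The second argument (true) enables message tracing
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("study-consumer", true);
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("test-topic-trace", "*");
        consumer.setConsumeTimeout(20L);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {
            for (MessageExt msg : msgs) {
                // Decode with an explicit charset to match the producer side
                System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        System.out.println("Consumer started...");
    }
}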
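A message trace contains: the message ID (MessageID), message Tag, message Key (MessageKey), message store time, IP of the client that handled the message, IP of the storing server, send/consume cost, trace status, and trace type.
The message trace as shown in RocketMQ-Console:
Under the MessageTrace category there are two ways to view a trace: by original message Topic + MessageKey, or by original message Topic + MessageID.
For that reason, if message tracing is enabled, it is recommended to set the Key property on each message when sending, so that messages can be looked up efficiently in RocketMQ-Console.
1> Query by Message Key:
2> Query by Message ID:
1> The main body of a message trace is stored in the TraceContext class: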
public class TraceContext implements Comparable<TraceContext> {
    private TraceType traceType;
    private long timeStamp = System.currentTimeMillis();
    private String regionId = "";
    private String regionName = "";
    private String groupName = "";
    private int costTime = 0;
    private boolean isSuccess = true;
    private String requestId = MessageClientIDSetter.createUniqID();
    private int contextCode = 0;
    private List<TraceBean> traceBeans;
    // ... (the rest of the class is not shown in this excerpt)
}
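traceType: the trace type. Possible values are Pub (message sent), SubBefore (message pulled to the client, before the business consume logic runs), and SubAfter (after consumption).
timeStamp: the current timestamp.
regionId: the region ID of the Broker, taken from BrokerConfig#regionId.
groupName: the group name. When traceType is Pub it is the producer group name; when traceType is SubBefore or SubAfter it is the consumer group name.
requestId: used when traceType is SubBefore or SubAfter; the request ID on the consumer side.
contextCode: the consume status code. Possible values are SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, and FAILED.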
2> Per-message information is kept in the TraceBean class:
public class TraceBean {
    private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
    private String topic = "";
    private String msgId = "";
    private String offsetMsgId = "";
    private String tags = "";
    private String keys = "";
    private String storeHost = LOCAL_ADDRESS;
    private String clientHost = LOCAL_ADDRESS;
    private long storeTime;
    private int retryTimes;
    private int bodyLength;
    private MessageType msgType;
}
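- topic: the message topic
- msgId: the unique message ID
- offsetMsgId: the message offset ID, which encodes the Broker IP and the offset
- tags: the message tags
- keys: the message index key, used to look the message up quickly
- storeHost: when the trace type is Pub, the IP of the Broker that stores the message; when the trace type is SubBefore or SubAfter, the consumer IP
- bodyLength: the length of the message body
- msgType: the message type. Possible values are Normal_Msg (normal message), Trans_Msg_Half (half/prepared message), Trans_msg_Commit (committed message), and Delay_Msg (delayed message)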
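RocketMQ treats trace data as ordinary messages and stores them on the Broker.
RocketMQ offers two ways to define the topic that stores trace data: the system default topic RMQ_SYS_TRACE_TOPIC (created when traceTopicEnable=true), or a custom topic passed as customizedTraceTopic in the client constructor.
In addition: to keep trace data from mixing with normal business data, the official recommendation is to dedicate a separate Broker to message tracing. Trace data is then sent only to that Broker and does not add load to the business Brokers.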
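The core operations of an MQ are sending and storing messages, and the message is the data carrier. A message trace mainly records when the message was sent and to which Broker, how long sending took, when and by which consumer it was consumed, and how long consumption took.
RocketMQ collects information about each call before and after a message is sent. It does so through the send-message hook (SendMessageHook) and, to avoid adding noticeable latency to sending, records the trace asynchronously.
As the example showed, enableMsgTrace must be set to true when instantiating DefaultMQProducer to enable tracing. With enableMsgTrace set to true, the DefaultMQProducer constructor looks like this: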
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
    boolean enableMsgTrace, final String customizedTraceTopic) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    //if client open the message trace feature
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
            dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
            traceDispatcher = dispatcher;
            this.getDefaultMQProducerImpl().registerSendMessageHook(
                new SendMessageTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}
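Here, the constructor creates an AsyncTraceDispatcher and registers a SendMessageTraceHookImpl as a send-message hook on DefaultMQProducerImpl.
In DefaultMQProducerImpl#sendKernelImpl, the producer first checks whether any send-message hooks (SendMessageHook) are registered; if so, it runs them before the send and again after the send returns.
Both executeSendMessageHookBefore() and executeSendMessageHookAfter() in DefaultMQProducerImpl simply call the corresponding sendMessageBefore() or sendMessageAfter() method on every registered SendMessageHook.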
public void executeSendMessageHookBefore(final SendMessageContext context) {
    if (!this.sendMessageHookList.isEmpty()) {
        for (SendMessageHook hook : this.sendMessageHookList) {
            try {
                hook.sendMessageBefore(context);
            } catch (Throwable e) {
                log.warn("failed to executeSendMessageHookBefore", e);
            }
        }
    }
}

public void executeSendMessageHookAfter(final SendMessageContext context) {
    if (!this.sendMessageHookList.isEmpty()) {
        for (SendMessageHook hook : this.sendMessageHookList) {
            try {
                hook.sendMessageAfter(context);
            } catch (Throwable e) {
                log.warn("failed to executeSendMessageHookAfter", e);
            }
        }
    }
}
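The before/after dispatch can be illustrated with a small self-contained sketch. It does not use the RocketMQ API: HookSketch, SendHook, and sendWithHooks are hypothetical names chosen for illustration, and the "send" is a placeholder string. What it tries to show is that every registered hook runs around the send, and that a failing hook is caught so it cannot break the send itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the hook pattern; none of these names come from RocketMQ.
final class HookSketch {
    interface SendHook {
        void before(String msg);
        void after(String msg);
    }

    private final List<SendHook> hooks = new ArrayList<>();
    final List<String> events = new ArrayList<>();

    void register(SendHook hook) {
        hooks.add(hook);
    }

    // Runs every hook before and after the "send"; a hook failure is caught and only noted.
    String sendWithHooks(String msg) {
        for (SendHook h : hooks) {
            try { h.before(msg); } catch (Throwable t) { events.add("before-failed"); }
        }
        String result = "SEND_OK:" + msg; // placeholder for the real network call
        for (SendHook h : hooks) {
            try { h.after(msg); } catch (Throwable t) { events.add("after-failed"); }
        }
        return result;
    }

    // Builds a sender with one well-behaved hook and one hook whose before() throws.
    static HookSketch demo() {
        HookSketch s = new HookSketch();
        s.register(new SendHook() {
            public void before(String m) { s.events.add("before:" + m); }
            public void after(String m) { s.events.add("after:" + m); }
        });
        s.register(new SendHook() {
            public void before(String m) { throw new IllegalStateException("broken hook"); }
            public void after(String m) { s.events.add("after2:" + m); }
        });
        return s;
    }

    public static void main(String[] args) {
        HookSketch s = demo();
        System.out.println(s.sendWithHooks("hello"));
        System.out.println(s.events);
    }
}
```

Catching Throwable per hook, as the RocketMQ excerpt does, trades visibility for safety: tracing problems show up only in logs, never as send failures.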
@Override
public void sendMessageBefore(SendMessageContext context) {
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
        return;
    }
    //build the context content of TuxeTraceContext
    TraceContext tuxeContext = new TraceContext();
    tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
    context.setMqTraceContext(tuxeContext);
    tuxeContext.setTraceType(TraceType.Pub);
    tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
    //build the data bean object of message trace
    TraceBean traceBean = new TraceBean();
    traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
    traceBean.setTags(context.getMessage().getTags());
    traceBean.setKeys(context.getMessage().getKeys());
    traceBean.setStoreHost(context.getBrokerAddr());
    traceBean.setBodyLength(context.getMessage().getBody().length);
    traceBean.setMsgType(context.getMsgType());
    traceBean.setClientHost(((AsyncTraceDispatcher) localDispatcher).getHostProducer().getmQClientFactory().getClientId());
    tuxeContext.getTraceBeans().add(traceBean);
}
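The logic is simple: before the message is sent, the method collects basic information such as the topic, tag, key, IP address of the storing Broker, and message body length, and stores this trace data in the call context for now.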
@Override
public void sendMessageAfter(SendMessageContext context) {
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
        || context.getMqTraceContext() == null) {
        return;
    }
    if (context.getSendResult() == null) {
        return;
    }
    if (context.getSendResult().getRegionId() == null
        || !context.getSendResult().isTraceOn()) {
        // if switch is false,skip it
        return;
    }
    TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
    TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
    int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
    tuxeContext.setCostTime(costTime);
    if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
        tuxeContext.setSuccess(true);
    } else {
        tuxeContext.setSuccess(false);
    }
    tuxeContext.setRegionId(context.getSendResult().getRegionId());
    traceBean.setMsgId(context.getSendResult().getMsgId());
    traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
    traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
    localDispatcher.append(tuxeContext);
}
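sendMessageAfter() is called after the send to the Broker returns.
It first performs some checks:
- If the send context is null or the send result is null, it returns immediately without recording trace data.
It then completes the trace data:
It takes the TraceBean from MqTraceContext. Although TraceContext holds TraceBean in a List, on the send path this list always contains exactly one entry, even for batch sends.
It sets costTime (send cost), success (whether sending succeeded), regionId (the region of the target Broker), msgId (the globally unique message ID), offsetMsgId (the physical offset of the message; for a batch, the physical offset of the last message), and storeTime.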
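Note: storeTime is not the real time at which the message was stored. It is an estimate: the send start timestamp plus half of the client-side send cost.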
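The estimate can be worked through with concrete numbers. The sketch below only mirrors the arithmetic of sendMessageAfter(); StoreTimeSketch and its method names are hypothetical, and the timestamps are made up. Note that both divisions are integer divisions, so an odd cost is rounded down.

```java
// Hypothetical helper; mirrors the arithmetic only, not RocketMQ's classes.
final class StoreTimeSketch {
    // Average cost per message: elapsed time divided by the number of trace beans.
    static int costTime(long sendStartMillis, long nowMillis, int beanCount) {
        return (int) ((nowMillis - sendStartMillis) / beanCount);
    }

    // Estimated store time: send start plus half of the cost (integer division).
    static long estimatedStoreTime(long sendStartMillis, int costTime) {
        return sendStartMillis + costTime / 2;
    }

    public static void main(String[] args) {
        long start = 1_000_000L; // made-up send start timestamp in milliseconds
        long now = start + 9;    // the response is observed 9 ms later
        int cost = costTime(start, now, 1);
        System.out.println("costTime=" + cost + " storeTime=" + estimatedStoreTime(start, cost));
    }
}
```

The midpoint assumes the request and response legs take about the same time, which is why the value should be read as an approximation rather than the Broker's own store timestamp.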
traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
Key point: the message trace is sent asynchronously.
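The trace is added to the blocking queue traceContextQueue in AsyncTraceDispatcher, from which it is later sent asynchronously.
When a Producer is instantiated with tracing enabled, an AsyncTraceDispatcher is created; it is started when the Producer starts.
- Its ultimate purpose is to take pending trace entries from the blocking queue traceContextQueue and send them to the corresponding Broker.
A closer look at AsyncTraceDispatcher#start():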
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}
This starts a background thread whose logic lives in AsyncRunnable.
class AsyncRunnable implements Runnable {
    private boolean stopped;

    @Override
    public void run() {
        while (!stopped) {
            List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
            for (int i = 0; i < batchSize; i++) {
                TraceContext context = null;
                try {
                    // get trace data element from the blocking queue traceContextQueue
                    context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                }
                if (context != null) {
                    contexts.add(context);
                } else {
                    break;
                }
            }
            if (contexts.size() > 0) {
                AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
                traceExecutor.submit(request);
            } else if (AsyncTraceDispatcher.this.stopped) {
                this.stopped = true;
            }
        }
    }
}
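The batching loop in AsyncRunnable can be reduced to a small stand-alone sketch using only java.util.concurrent. BatchPollSketch and pollBatch are hypothetical names, and the queue holds plain strings instead of TraceContext objects. The idea shown is the same: collect at most batchSize items, and stop early as soon as a timed poll comes back empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the batching loop; not RocketMQ code.
final class BatchPollSketch {
    // Collects up to batchSize items; stops early once a timed poll returns nothing.
    static <T> List<T> pollBatch(BlockingQueue<T> queue, int batchSize, long timeoutMillis) {
        List<T> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            T item;
            try {
                item = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop collecting
                break;
            }
            if (item == null) {
                break; // queue stayed empty for the whole timeout
            }
            batch.add(item);
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
        for (int i = 0; i < 5; i++) {
            queue.offer("trace-" + i);
        }
        System.out.println(pollBatch(queue, 3, 5)); // capped at batchSize
        System.out.println(pollBatch(queue, 3, 5)); // whatever is left
    }
}
```

Unlike the excerpt, this sketch restores the interrupt flag instead of swallowing InterruptedException; that is a choice made here for illustration, not a description of RocketMQ's behavior.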
class AsyncAppenderRequest implements Runnable {
    List<TraceContext> contextList;

    public AsyncAppenderRequest(final List<TraceContext> contextList) {
        if (contextList != null) {
            this.contextList = contextList;
        } else {
            this.contextList = new ArrayList<TraceContext>(1);
        }
    }

    @Override
    public void run() {
        sendTraceData(contextList);
    }

    public void sendTraceData(List<TraceContext> contextList) {
        Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
        for (TraceContext context : contextList) {
            if (context.getTraceBeans().isEmpty()) {
                continue;
            }
            // Topic value corresponding to original message entity content
            String topic = context.getTraceBeans().get(0).getTopic();
            String regionId = context.getRegionId();
            // Use original message entity's topic as key
            String key = topic;
            if (!StringUtils.isBlank(regionId)) {
                key = key + TraceConstants.CONTENT_SPLITOR + regionId;
            }
            List<TraceTransferBean> transBeanList = transBeanMap.get(key);
            if (transBeanList == null) {
                transBeanList = new ArrayList<TraceTransferBean>();
                transBeanMap.put(key, transBeanList);
            }
            TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
            transBeanList.add(traceData);
        }
        for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
            String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
            String dataTopic = entry.getKey();
            String regionId = null;
            if (key.length > 1) {
                dataTopic = key[0];
                regionId = key[1];
            }
            flushData(entry.getValue(), dataTopic, regionId);
        }
    }
    // ... (flushData and the rest of the class are not shown in this excerpt)
}
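The grouping step in sendTraceData() builds a composite key from the original topic and, when present, the regionId, then splits it again before flushing. A minimal sketch of that round trip follows. GroupKeySketch and its methods are hypothetical, and the separator value (char) 1 is an assumption standing in for TraceConstants.CONTENT_SPLITOR; records are modeled as plain {topic, regionId, payload} string arrays.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the grouping logic; not RocketMQ code.
final class GroupKeySketch {
    // Assumption: a control character that does not occur in topic names or region ids.
    static final char SPLITTER = (char) 1;

    // Key is the topic alone, or topic + separator + regionId when a region is set.
    static String buildKey(String topic, String regionId) {
        if (regionId == null || regionId.trim().isEmpty()) {
            return topic;
        }
        return topic + SPLITTER + regionId;
    }

    // Returns {topic, regionId}; regionId is null when the key carries no region part.
    static String[] parseKey(String key) {
        String[] parts = key.split(String.valueOf(SPLITTER));
        return parts.length > 1 ? new String[] {parts[0], parts[1]} : new String[] {key, null};
    }

    // Groups records given as {topic, regionId, payload} by their composite key.
    static Map<String, List<String>> group(List<String[]> records) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String[] r : records) {
            grouped.computeIfAbsent(buildKey(r[0], r[1]), k -> new ArrayList<>()).add(r[2]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {"orders", "r1", "a"});
        records.add(new String[] {"orders", "", "b"});
        records.add(new String[] {"orders", "r1", "c"});
        for (Map.Entry<String, List<String>> e : group(records).entrySet()) {
            String[] k = parseKey(e.getKey());
            System.out.println("topic=" + k[0] + " region=" + k[1] + " -> " + e.getValue());
        }
    }
}
```

Grouping first means each topic and region pair can be flushed as one batch rather than one send per trace entry.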
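Likewise, RocketMQ collects information about each call before and after a message is consumed. It does so through the consume-message hook (ConsumeMessageHook) and, to avoid adding noticeable latency to consumption, records the trace asynchronously.
Consumption uses the same mechanism as sending, so it is not repeated in detail here; a few code screenshots make it clear at a glance.
Instantiating the consumer
Starting the consumer
Taking concurrent consumption as an example: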
public void executeHookBefore(final ConsumeMessageContext context) {
    if (!this.consumeMessageHookList.isEmpty()) {
        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
            try {
                hook.consumeMessageBefore(context);
            } catch (Throwable e) {
            }
        }
    }
}

public void executeHookAfter(final ConsumeMessageContext context) {
    if (!this.consumeMessageHookList.isEmpty()) {
        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
            try {
                hook.consumeMessageAfter(context);
            } catch (Throwable e) {
            }
        }
    }
}
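A message trace records key information across the whole message lifecycle, from sending to storage to consumption: who sent the message, how long sending took, where it was stored, who consumed it, and how long consumption took.
The implementation in RocketMQ is also simple: hooks run before() and after() logic around message sending and consumption, and that logic records the trace information.
Special note: storeTime is not the real time at which the message was stored. It is an estimate: the send start timestamp plus half of the client-side send cost.
Key classes involved in the message trace feature: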