相关推荐recommended
RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?
作者:mmseoamin日期:2023-12-18

文章目录

  • 一、前言
  • 二、消息轨迹
    • 1、消息轨迹的引入目的
    • 2、如何使用消息轨迹
      • 1)使用案例
      • 2)消息轨迹内容
      • 3) RocketMQ-Console中查看消息轨迹
      • 3、消息轨迹实现原理
        • 1)消息轨迹数据结构
        • 2)轨迹消息存储
        • 4、如何采集消息轨迹数据
          • 1)消息发送
            • 1> 实例化Producer
            • 2> Producer发送消息
              • <1> sendMessageBefore()
              • <2> sendMessageAfter()
              • <3> 消息轨迹异步发送
              • 2)消息消费
              • 三、总结

                一、前言

                更多RocketMQ内容,见专栏:https://blog.csdn.net/saintmm/category_11280399.html

                二、消息轨迹

                消息轨迹简单来说就是日志,其把消息的生产、存储、消费等所有的访问和操作日志。

                1、消息轨迹的引入目的

                在项目中存在发送方与消费方相互“扯皮”的情况:

                • 发送方说消息已经发送成功,而消费方说没有消费到。
                • 这时我们就希望能记录一条消息的流转轨迹,即:消息是由哪个IP发送的?什么时候发送的?是被哪个消费者消费的?

                  2、如何使用消息轨迹

                  1> 修改Broker服务端配置,设置 traceTopicEnable=true;

                  • 表示在Broker上创建名为RMQ_SYS_TRACE_TOPIC的topic,队

                    列个数为1。所有的msgTrace信息默认都存储在这个topic中。

                    2> Producer中开启消息轨迹;

                    •   public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
                      
                      • boolean类型的入参enableMsgTrace设置为true,表示启用消息轨迹追踪,默认为false。
                      •   public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
                        
                        • String类型的入参customizedTraceTopic,表示用于记录消息轨迹的topic,不设置默认为RMQ_SYS_TRACE_TOPIC。

                          3> Consuemr中开启消息轨迹;

                          •   public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace)
                            
                            • boolean类型的入参enableMsgTrace设置为true,表示启用消息轨迹追踪,默认为false。

                              如果启用了消息轨迹,在消息发送时尽量为消息指定Key属性,以便在RocketMQ-Console中对消息进行高性能的查询。

                              1)使用案例

                              1> broker的配置文件(broker.conf)中增加如下配置,然后重启Broker:

                              traceTopicEnable = true
                              

                              2> Producer:

                              public class TraceProducer {
                                  public static void main(String[] args) throws Exception {
                                      // 第二个参数TRUE,表示开启MsgTrace
                                      DefaultMQProducer producer = new DefaultMQProducer("saint-test", true);
                                      producer.setNamesrvAddr("127.0.0.1:9876");
                                      producer.setMaxMessageSize(1024 * 1024 * 10);
                                      producer.start();
                                      Message msg = new Message("test-topic-trace",null, "key-trace", "trace-2".getBytes(StandardCharsets.UTF_8));
                                      SendResult send = producer.send(msg);
                                      System.out.println("sendResult: " + send);
                                      // 关闭生产者
                                      producer.shutdown();
                                      System.out.println("已经停机");
                                  }
                              }
                              

                              3> Consumer:

                              public class TraceConsumer {
                                  public static void main(String[] args) throws Exception {
                                      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("study-consumer", true);
                                      consumer.setNamesrvAddr("127.0.0.1:9876");
                                      consumer.subscribe("test-topic-trace", "*");
                                      consumer.setConsumeTimeout(20L);
                                      consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
                                      consumer.setMessageModel(MessageModel.BROADCASTING);
                                      consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {
                                          for (MessageExt msg : msgs) {
                                              System.out.println(new String(msg.getBody()));
                                          }
                                          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                      });
                                      consumer.start();
                                      System.out.println("Consumer start。。。。。。");
                                  }
                              }
                              

                              2)消息轨迹内容

                              消息轨迹内容包括:消息ID(MessageID)、消息Tag、消息Key(MessageKey)、消息的存储时间、处理消息的客户端IP、存储服务器IP、发送/消费耗时、消息轨迹状态、跟踪类型。

                              在RocketMQ-Console中的消息轨迹内容如下:

                              RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?,在这里插入图片描述,第1张

                              3) RocketMQ-Console中查看消息轨迹

                              在MessageTrace大分类下有两种方式可以查看消息轨迹,一种是根据 原消息Topic + MessageKey、另一种是根据 原消息Topic + MessageID;

                              RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?,在这里插入图片描述,第2张

                              所以建议如果启用了消息轨迹,在消息发送时尽量为消息指定Key属性,以便在RocketMQ-Console中对消息进行高性能的查询。

                              1> 根据Message Key查询:

                              RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?,在这里插入图片描述,第3张

                              2> 根据Message ID查询:

                              RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?,在这里插入图片描述,第4张

                              3、消息轨迹实现原理

                              1)消息轨迹数据结构

                              1> 消息轨迹主体内容采用TraceContext类存储;

                              public class TraceContext implements Comparable {
                                  private TraceType traceType;
                                  private long timeStamp = System.currentTimeMillis();
                                  private String regionId = "";
                                  private String regionName = "";
                                  private String groupName = "";
                                  private int costTime = 0;
                                  private boolean isSuccess = true;
                                  private String requestId = MessageClientIDSetter.createUniqID();
                                  private int contextCode = 0;
                                  private List traceBeans;
                              
                              1. traceType:跟踪类型,可选值为Pub(消息发送)、SubBefore(消息拉取到客户端,在执行业务定义的消费逻辑之前)、SubAfter(消费后)。

                              2. timeStamp:当前时间戳。

                              3. regionId:Broker所在的区域ID,取自BrokerConfig#regionId()。

                              4. groupName:组名称,traceType为Pub时表示生产者组的名称,traceType为subBefore或subAfter时表示消费组名称。

                              5. requestId:在traceType为subBefore、subAfter时使用,消

                                费端的请求ID。

                              6. contextCode:消费状态码,可选值为SUCCESS、TIME_OUT、EXCEPTION、RETURNNULL、FAILED。

                              2> 针对消息信息采用TraceBean类维护;

                              public class TraceBean {
                                  private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
                                  private String topic = "";
                                  private String msgId = "";
                                  private String offsetMsgId = "";
                                  private String tags = "";
                                  private String keys = "";
                                  private String storeHost = LOCAL_ADDRESS;
                                  private String clientHost = LOCAL_ADDRESS;
                                  private long storeTime;
                                  private int retryTimes;
                                  private int bodyLength;
                                  private MessageType msgType;
                              }
                              
                              1. topic:消息主题
                              2. msgId:消息唯一ID
                              3. offsetMsgId:消息偏移量ID,该ID中包含了Broker的IP以及偏移量
                              4. tags:消息标志
                              5. keys:消息索引key,根据该key可快速检索消息
                              6. storeHost:跟踪类型为Pub时存储该消息的Broker服务器IP,跟踪类型为subBefore、subAfter时存储消费者IP
                              7. bodyLength:消息体的长度
                              8. msgType:消息的类型,可选值为Normal_Msg(普通消息)、Trans_Msg_Half(预提交消息)、Trans_msg_Commit(提交消息)、Delay_Msg(延迟消息)

                              2)轨迹消息存储

                              RocketMQ选择将消息轨迹数据当作一条消息,存储在Broker服务器中。

                              RocketMQ提供了两种方式来定义消息轨迹存储的topic:

                              1. 系统默认topic:如果Broker的traceTopicEnable配置项设为true,表示在该Broker上创建名为RMQ_SYS_TRACE_TOPIC的topic,队列个数为1,默认该值为false。
                              2. 自定义topic:在创建消息生产者或消息消费者时,可以通过参数自定义用于记录消息轨迹的topic名称;
                                • 注意:RokcetMQ控制台(rocketmq-console)中只支持配置一个消息轨迹topic,建议使用系统默认的topic。

                              另外:为了避免消息轨迹的数据与正常的业务数据混在一起,官方建议单独使用一个Broker用于开启消息轨迹跟踪。消息轨迹数据只会发送到这一台Broker服务器上,不影响原业务Broker的负载压力。

                              4、如何采集消息轨迹数据

                              MQ的核心操作是消息发送和消息存储,数据载体为消息。消息轨迹主要是记录消息何时发送到哪台Broker?发送耗时是多少?在什么时候被哪个消费者消费?消费耗时是多少?…

                              1)消息发送

                              在消息发送前后RocketMQ会将本次调用的信息进行采集。RocketMQ通过提供消息发送钩子函数(SendMessageHook)实现,并且为了不明显增加消息发送的时间延迟,使用异步的方式记录消息轨迹。

                              1> 实例化Producer

                              在案例中我们知道在实例化DefaultMQProducer时,需要将入参enableMsgTrace设置为true,才能开启消息轨迹。当enableMsgTrace为true时,看DefaultMQProducer的构造函数:

                              public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
                                                       boolean enableMsgTrace, final String customizedTraceTopic) {
                                  this.namespace = namespace;
                                  this.producerGroup = producerGroup;
                                  defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
                                  //if client open the message trace feature
                                  if (enableMsgTrace) {
                                      try {
                                          AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
                                          dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
                                          traceDispatcher = dispatcher;
                                          this.getDefaultMQProducerImpl().registerSendMessageHook(
                                              new SendMessageTraceHookImpl(traceDispatcher));
                                      } catch (Throwable e) {
                                          log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
                                      }
                                  }
                              }
                              

                              其中:

                              • SendMessageTraceHookImpl:消息发送的钩子函数,用于跟踪消息轨迹;
                                • 对应的消息消费钩子函数为:ConsumeMessageTraceHookImpl
                                • AsyncTraceDispatcher:消息轨迹异步转发器。用于在消息生产时,异步将消息轨迹保存到特定topic(默认为RMQ_SYS_TRACE_TOPIC)
                                  2> Producer发送消息

                                  在DefaultMqProducerImple#sendKernelImpl方法中,会首先判断是否有发送消息钩子函数(SendMessageHook);如果有:

                                  • 在发送消息之前调用钩子函数SendMessageHook#sendMessageBefore()方法,将消息轨迹数据先存储在调用上下文中。
                                  • 在发送消息之后调用钩子函数SendMessageHook#sendMessageAfter()方法,使用AsyncTraceDispatcher异步将消息轨迹数据发送到消息服务器(Broker)上。

                                    RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?,请添加图片描述,第5张

                                    无论是DefaultMQProducerImpl中的executeSendMessageHookBefore()方法还是executeSendMessageHookAfter()方法,内部都是调用所有SendMessageHook的相应before()、after()方法。

                                    public void executeSendMessageHookBefore(final SendMessageContext context) {
                                        if (!this.sendMessageHookList.isEmpty()) {
                                            for (SendMessageHook hook : this.sendMessageHookList) {
                                                try {
                                                    hook.sendMessageBefore(context);
                                                } catch (Throwable e) {
                                                    log.warn("failed to executeSendMessageHookBefore", e);
                                                }
                                            }
                                        }
                                    }
                                    public void executeSendMessageHookAfter(final SendMessageContext context) {
                                        if (!this.sendMessageHookList.isEmpty()) {
                                            for (SendMessageHook hook : this.sendMessageHookList) {
                                                try {
                                                    hook.sendMessageAfter(context);
                                                } catch (Throwable e) {
                                                    log.warn("failed to executeSendMessageHookAfter", e);
                                                }
                                            }
                                        }
                                    }
                                    
                                    <1> sendMessageBefore()
                                    @Override
                                    public void sendMessageBefore(SendMessageContext context) {
                                        //if it is message trace data,then it doesn't recorded
                                        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
                                            return;
                                        }
                                        //build the context content of TuxeTraceContext
                                        TraceContext tuxeContext = new TraceContext();
                                        tuxeContext.setTraceBeans(new ArrayList(1));
                                        context.setMqTraceContext(tuxeContext);
                                        tuxeContext.setTraceType(TraceType.Pub);
                                        tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
                                        //build the data bean object of message trace
                                        TraceBean traceBean = new TraceBean();
                                        traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
                                        traceBean.setTags(context.getMessage().getTags());
                                        traceBean.setKeys(context.getMessage().getKeys());
                                        traceBean.setStoreHost(context.getBrokerAddr());
                                        traceBean.setBodyLength(context.getMessage().getBody().length);
                                        traceBean.setMsgType(context.getMsgType());
                                        traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
                                        tuxeContext.getTraceBeans().add(traceBean);
                                    }
                                    

                                    方法逻辑很简单:只是在消息发送之前先收集消息的topic、tag、key、存储Broker的IP地址、消息体的长度等基础信息,并将消息轨迹数据先存储在调用上下文中。

                                    <2> sendMessageAfter()
                                    @Override
                                    public void sendMessageAfter(SendMessageContext context) {
                                        //if it is message trace data,then it doesn't recorded
                                        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
                                            || context.getMqTraceContext() == null) {
                                            return;
                                        }
                                        if (context.getSendResult() == null) {
                                            return;
                                        }
                                        if (context.getSendResult().getRegionId() == null
                                            || !context.getSendResult().isTraceOn()) {
                                            // if switch is false,skip it
                                            return;
                                        }
                                        TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
                                        TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
                                        int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
                                        tuxeContext.setCostTime(costTime);
                                        if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
                                            tuxeContext.setSuccess(true);
                                        } else {
                                            tuxeContext.setSuccess(false);
                                        }
                                        tuxeContext.setRegionId(context.getSendResult().getRegionId());
                                        traceBean.setMsgId(context.getSendResult().getMsgId());
                                        traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
                                        traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
                                        localDispatcher.append(tuxeContext);
                                    }
                                    

                                    消息发送到Broker返回之后会调用到sendMessageAfter()方法。

                                    1. 进行一些校验:

                                      • 如果消息发送上下为空 或 发送消息结果为空,则直接返回,不记录消息轨迹数据。
                                      • 进一步维护消息轨迹数据;

                                        • 从MqTraceContext中获取跟踪的TraceBean,虽然TraceContext中将TraceBean设计成List结构,但在消息发送时,这里的数据永远只有一条,即使是批量发送。

                                        • 设置costTime(消息发送耗时)、success(是否发送成功)、regionId(发送到Broker所在的分区)、msgId(消息ID,全局唯一)、offsetMsgId(消息物理偏移量,如果是批量消息,则是最后一条消息的物理偏移量)、storeTime。

                                        • 注意:storeTime并不是真实的消息存储时间,而是一个估算值,取自:客户端发送消息耗时的一半。

                                          traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
                                          
                                        • 关键点:消息轨迹的异步发送;

                                          • 将消息轨迹添加到AsyncTraceDispatcher中的阻塞队列traceContextQueue中,以供后续异步发送消息轨迹使用;

                                            RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?,请添加图片描述,第6张

                                          • 在实例化Producer时,如果开启了消息轨迹,会实例化AsyncTraceDispatcher;并且在启动Producer时也会启动AsyncTraceDispatcher;

                                            • 最终目的是从阻塞队列traceContextQueue中找到待异步发送的轨迹消息,然后发送到相应Broker中。

                                              RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?,请添加图片描述,第7张

                                    <3> 消息轨迹异步发送

                                    进一步看AsyncTraceDispatcher#start()方法:

                                    public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
                                        if (isStarted.compareAndSet(false, true)) {
                                            traceProducer.setNamesrvAddr(nameSrvAddr);
                                            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
                                            traceProducer.start();
                                        }
                                        this.accessChannel = accessChannel;
                                        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
                                        this.worker.setDaemon(true);
                                        this.worker.start();
                                        this.registerShutDownHook();
                                    }
                                    

                                    其中启动一个后台线程,线程的具体逻辑体现在AsyncRunnable中。

                                    class AsyncRunnable implements Runnable {
                                            private boolean stopped;
                                            @Override
                                            public void run() {
                                                while (!stopped) {
                                                    List contexts = new ArrayList(batchSize);
                                                    for (int i = 0; i < batchSize; i++) {
                                                        TraceContext context = null;
                                                        try {
                                                            //get trace data element from blocking Queue — traceContextQueue
                                                            context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
                                                        } catch (InterruptedException e) {
                                                        }
                                                        if (context != null) {
                                                            contexts.add(context);
                                                        } else {
                                                            break;
                                                        }
                                                    }
                                                    if (contexts.size() > 0) {
                                                        AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
                                                        traceExecutor.submit(request);
                                                    } else if (AsyncTraceDispatcher.this.stopped) {
                                                        this.stopped = true;
                                                    }
                                                }
                                            }
                                        }
                                    
                                    • 从待发送队列traceContextQueue中不断获取消息轨迹的数据,并将其异步发送到消息服务器。
                                    • 为了提高消息的发送效率引入批量机制,即:一次从队列中获取一批消息
                                      • 然后封装成AsyncAppenderRequest任务并提交到线程池中异步执行;
                                      • 真正的发送消息轨迹数据的逻辑被封装在AsyncTraceDispatcher的内部类AsyncAppenderRequest#run()方法中。
                                        class AsyncAppenderRequest implements Runnable {
                                            List contextList;
                                            public AsyncAppenderRequest(final List contextList) {
                                                if (contextList != null) {
                                                    this.contextList = contextList;
                                                } else {
                                                    this.contextList = new ArrayList(1);
                                                }
                                            }
                                            @Override
                                            public void run() {
                                                sendTraceData(contextList);
                                            }
                                            public void sendTraceData(List contextList) {
                                                Map> transBeanMap = new HashMap>();
                                                for (TraceContext context : contextList) {
                                                    if (context.getTraceBeans().isEmpty()) {
                                                        continue;
                                                    }
                                                    // Topic value corresponding to original message entity content
                                                    String topic = context.getTraceBeans().get(0).getTopic();
                                                    String regionId = context.getRegionId();
                                                    // Use  original message entity's topic as key
                                                    String key = topic;
                                                    if (!StringUtils.isBlank(regionId)) {
                                                        key = key + TraceConstants.CONTENT_SPLITOR + regionId;
                                                    }
                                                    List transBeanList = transBeanMap.get(key);
                                                    if (transBeanList == null) {
                                                        transBeanList = new ArrayList();
                                                        transBeanMap.put(key, transBeanList);
                                                    }
                                                    TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
                                                    transBeanList.add(traceData);
                                                }
                                                for (Map.Entry> entry : transBeanMap.entrySet()) {
                                                    String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
                                                    String dataTopic = entry.getKey();
                                                    String regionId = null;
                                                    if (key.length > 1) {
                                                        dataTopic = key[0];
                                                        regionId = key[1];
                                                    }
                                                    flushData(entry.getValue(), dataTopic, regionId);
                                                }
                                            }
                                        
                                        • 按照消息轨迹的存储协议对消息轨迹内容进行组装、编码;
                                        • 按照topic分批调用flushData()方法将消息发送到Broker中,完成消息轨迹数据的存储。

                                          2)消息消费

                                          同样,在消息消费前后RocketMQ会将本次调用的信息进行采集。RocketMQ通过提供消息消费钩子函数(ConsumeMessageHook)实现,并且为了不明显增加消息消费的时间延迟,使用异步的方式记录消息轨迹。

                                          消息消费和消息发送是一样的机制,这里就不冗余介绍了,贴几个代码截图也就一目了然了。

                                          • 实例化消费者

                                            RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?,请添加图片描述,第8张

                                          • 启动消费者

                                            RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?,请添加图片描述,第9张

                                          • 以并行消费消息为例:

                                            RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?,请添加图片描述,第10张

                                            public void executeHookBefore(final ConsumeMessageContext context) {
                                                    if (!this.consumeMessageHookList.isEmpty()) {
                                                        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                                                            try {
                                                                hook.consumeMessageBefore(context);
                                                            } catch (Throwable e) {
                                                            }
                                                        }
                                                    }
                                                }
                                                public void executeHookAfter(final ConsumeMessageContext context) {
                                                    if (!this.consumeMessageHookList.isEmpty()) {
                                                        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                                                            try {
                                                                hook.consumeMessageAfter(context);
                                                            } catch (Throwable e) {
                                                            }
                                                        }
                                                    }
                                                }
                                            

                                            三、总结

                                            消息轨迹其实就是记录消息从发送 到 存储 再到 消费,整个消息生命周期中的一些关键信息,比如:谁发送的、发送耗时多久、消息保存在哪了、谁消费了、消费耗时多久。

                                            在RocketMQ中的实现方式也很简单,在消息发送/消费前后基于钩子函数,做before()、after()逻辑,进而记录消息轨迹信息。

                                            特别注意:storeTime并不是真实的消息存储时间,而是一个估算值,取自:客户端发送消息耗时的一半。

                                            消息轨迹功能涉及到的关键类:

                                            • AsyncTraceDispatcher:负责异步发送消息轨迹到Broker。
                                            • ConsumeMessageHook:消费消息钩子函数
                                            • SendMessageHook:生产消息钩子函数
                                            • TraceContext、TraceBean:两者一起用于表述消息轨迹