编程技术分享平台

网站首页 > 技术教程 正文

分布式链路跟踪之RabbitMQ

xnh888 2024-11-26 10:00:27 技术教程 37 ℃ 0 评论

本文从SkyWalking官方的RabbitMQ插件源码角度,剖析分布式链路跟踪原理,分析官方插件在实际应用中存在的问题,附自定义插件源码

1. RabbitMQ链路跟踪原理

SkyWalking 使用byte-buddy修改RabbitMQ生产者和消费者的字节码,做了两件事:

  1. 生产者端 ,在消息发送前向消息头加入TraceID,
  2. 消费者端,在消息接收后从消息头取出TraceID,并放入Skywalking跟踪上下文

这样就打通了生产-消费的链路。

2. 官方RabbitMQ插件的TraceID断链问题

Skywalking官方提供的RabbitMQ插件只针对RabbitMQ官方原生Client实现扩展,但我们在项目中一般不直接使用原生Client,而使用Spring RabitMQ 基于原生Client再封装的客户端。那么是如何导致TraceID断链问题的呢,下面结合源码分析:

2.1 官方插件源码的拦截点是原生Consumer的handleDelivery方法,源码如下:

2.2 而Spring RabbitMQ消费者的默认实现是BlockingQueueConsumer, handleDelivery核心逻辑是把消息放到内部的BlockingQueue队列,不做真正的消费处理,因此拦截此处无法关联到消费者逻辑,源码如下

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                           byte[] body) {
  ...
  try {
    if (BlockingQueueConsumer.this.abortStarted > 0) {
      if (!BlockingQueueConsumer.this.queue.offer(
        new Delivery(consumerTag, envelope, properties, body, this.queueName),
        BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {

        Channel channelToClose = super.getChannel();
        RabbitUtils.setPhysicalCloseRequired(channelToClose, true);
        // Defensive - should never happen
        BlockingQueueConsumer.this.queue.clear();
        if (!this.canceled) {
          RabbitUtils.cancel(channelToClose, consumerTag);
        }
        try {
          channelToClose.close();
        }
        catch (@SuppressWarnings("unused") TimeoutException e) {
          // no-op
        }
      }
    }
    else {
      BlockingQueueConsumer.this.queue
        .put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
    }
  }
  catch (@SuppressWarnings("unused") InterruptedException e) {
    Thread.currentThread().interrupt();
  }
  catch (Exception e) {
    BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);
  }
}

2.3 真正的消费处理在SimpleMessageListenerContainer,SimpleMessageListenerContainer继承Runnable接口,在其run方法中while循环调用mainLoop方法,整体调用链路为:


SimpleMessageListenerContainer.run()
  -> SimpleMessageListenerContainer.mainLoop() 
  	-> SimpleMessageListenerContainer.receiveAndExecute() 
  		-> SimpleMessageListenerContainer.doReceiveAndExecute() 
  			-> AbstractMessageListenerContainer.executeListener()

最终在executeListener中执行消费逻辑

protected void executeListener(Channel channel, Object data) {
  ...略
  try {
    // 执行消费逻辑
    doExecuteListener(channel, data);
    if (sample != null) {
      this.micrometerHolder.success(sample, data instanceof Message
                                    ? ((Message) data).getMessageProperties().getConsumerQueue()
                                    : queuesAsListString());
    }
  }
  catch (RuntimeException ex) {
    ....
  }
}


实现自定义插件

从上面可以分析出,AbstractMessageListenerContainer.executeListener()是最佳的拦截点

实现源码放在gitee : https://gitee.com/eureka-gitee/apm-sniffer-pro/tree/v7.0.0.0/

效果展示

SkyWalking调用链路

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表