网站首页 > 技术教程 正文
本文从SkyWalking官方的RabbitMQ插件源码角度,剖析分布式链路跟踪原理,分析官方插件在实际应用中存在的问题,附自定义插件源码
1. RabbitMQ链路跟踪原理
SkyWalking 使用byte-buddy修改RabbitMQ生产者和消费者的字节码,做了两件事:
- 生产者端 ,在消息发送前向消息头加入TraceID,
- 消费者端,在消息接收后从消息头取出TraceID,并放入Skywalking跟踪上下文
这样就打通了生产-消费的链路。
2. 官方RabbitMQ插件的TraceID断链问题
Skywalking官方提供的RabbitMQ插件只针对RabbitMQ官方原生Client实现扩展,但我们在项目中一般不直接使用原生Client,而使用Spring RabitMQ 基于原生Client再封装的客户端。那么是如何导致TraceID断链问题的呢,下面结合源码分析:
2.1 官方插件源码的拦截点是原生Consumer的handleDelivery方法,源码如下:
2.2 而Spring RabbitMQ消费者的默认实现是BlockingQueueConsumer, handleDelivery核心逻辑是把消息放到内部的BlockingQueue队列,不做真正的消费处理,因此拦截此处无法关联到消费者逻辑,源码如下
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) {
...
try {
if (BlockingQueueConsumer.this.abortStarted > 0) {
if (!BlockingQueueConsumer.this.queue.offer(
new Delivery(consumerTag, envelope, properties, body, this.queueName),
BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
Channel channelToClose = super.getChannel();
RabbitUtils.setPhysicalCloseRequired(channelToClose, true);
// Defensive - should never happen
BlockingQueueConsumer.this.queue.clear();
if (!this.canceled) {
RabbitUtils.cancel(channelToClose, consumerTag);
}
try {
channelToClose.close();
}
catch (@SuppressWarnings("unused") TimeoutException e) {
// no-op
}
}
}
else {
BlockingQueueConsumer.this.queue
.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
}
}
catch (@SuppressWarnings("unused") InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (Exception e) {
BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);
}
}
2.3 真正的消费处理在SimpleMessageListenerContainer,SimpleMessageListenerContainer继承Runnable接口,在其run方法中while循环调用mainLoop方法,整体调用链路为:
SimpleMessageListenerContainer.run()
-> SimpleMessageListenerContainer.mainLoop()
-> SimpleMessageListenerContainer.receiveAndExecute()
-> SimpleMessageListenerContainer.doReceiveAndExecute()
-> AbstractMessageListenerContainer.executeListener()
最终在executeListener中执行消费逻辑
protected void executeListener(Channel channel, Object data) {
...略
try {
// 执行消费逻辑
doExecuteListener(channel, data);
if (sample != null) {
this.micrometerHolder.success(sample, data instanceof Message
? ((Message) data).getMessageProperties().getConsumerQueue()
: queuesAsListString());
}
}
catch (RuntimeException ex) {
....
}
}
实现自定义插件
从上面可以分析出,AbstractMessageListenerContainer.executeListener()是最佳的拦截点
实现源码放在gitee : https://gitee.com/eureka-gitee/apm-sniffer-pro/tree/v7.0.0.0/
效果展示
SkyWalking调用链路
- 上一篇: 史上最全 Windows 安全工具锦集
- 下一篇: PC端好用软件大汇总
猜你喜欢
- 2024-11-26 清理C盘—磁盘空间可视化工具
- 2024-11-26 Adobe Project Fast Fill 生成式人工智能视频处理来了
- 2024-11-26 舍不得分享的高效工具清单
- 2024-11-26 「工具整合」PE、调试反汇编、应急、流量分析和WebShell查杀工具
- 2024-11-26 个人总结好用软件,提升效率,优雅工作
- 2024-11-26 【职场干货】十个用的顺手的办公神器,让你的工作效率增加一倍!
- 2024-11-26 值得收藏!史上最全Windows安全工具锦集
- 2024-11-26 干货|GitHUB安全搬运工 一
- 2024-11-26 总有一款软件会出乎你的意料
- 2024-11-26 推荐50个超实用的 Chrome 扩展,建议收藏
你 发表评论:
欢迎- 最近发表
-
- Win10 TH2正式版官方ESD映像转换ISO镜像方法详解
- 使用iso镜像升级到Windows 10的步骤
- macOS Ventura 13.2 (22D49) Boot ISO 原版可引导镜像
- 安利一个用ISO镜像文件制作引导U盘的的小工具RUFUS
- CentOS 7使用ISO镜像配置本地yum源
- 用于x86平台的安卓9.0 ISO镜像发布下载:通吃I/A/N、完全免费
- AlmaLinux 9.6发布:升级工具、初步支持IBM Power虚拟化技术
- Rufus写入工具简洁介绍与教程(写入模式)
- 新硬件也能安装使用了,Edge版Linux Mint 21.3镜像发布
- 开源工程师:Ubuntu应该抛弃32位ISO镜像
- 标签列表
-
- 下划线是什么 (87)
- 精美网站 (58)
- qq登录界面 (90)
- nginx 命令 (82)
- nginx .http (73)
- nginx lua (70)
- nginx 重定向 (68)
- Nginx超时 (65)
- nginx 监控 (57)
- odbc (59)
- rar密码破解工具 (62)
- annotation (71)
- 红黑树 (57)
- 智力题 (62)
- php空间申请 (61)
- 按键精灵 注册码 (69)
- 软件测试报告 (59)
- ntcreatefile (64)
- 闪动文字 (56)
- guid (66)
- abap (63)
- mpeg 2 (65)
- column (63)
- dreamweaver教程 (57)
- excel行列转换 (56)
本文暂时没有评论,来添加一个吧(●'◡'●)