网站首页 > 技术教程 正文
Kafka-client 版本 2.2.2
这里用一个demo来解释这个问题的原因和排查思路
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class MessageQueueProducer {
KafkaProducer kafkaProducer;
public MessageQueueProducer(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
public synchronized void send(final String topic, final String message) {
ProducerRecord<String, String> kafkaMessage = new ProducerRecord<String, String>(topic, message);
kafkaProducer.send(kafkaMessage, (recordMetadata, e) -> {
if (e == null) {
//success
} else {
//send "retry-topic" to retry
send("retry-topic", message);
}
}
);
}
public synchronized void close() {
kafkaProducer.flush();
kafkaProducer.close();
}
}
原因分析
- 假设我们对KafkaProducer进行了一个简单的封装(如上)
- 假设有两个线程 threadA 和 threadB
- threadA 调用 MessageQueueProducer.close方法,close方法中的 flush本意是想,在Producer被close之前把buffer中数据一次性发送到Broker来保障数据的完整。
- 所有方法都有synchronized修饰,所以threadA拿到了MessageQueueProducer对象锁
- flush方法要等待所有的数据发送完成并收到Broker的确认
就在此刻,问题来了
假设threadB 调用了 MessageQueueProducer.send时遇到了异常,需要在Kafka的回调函数中向重试队列发送消息,进行异步重试
参看Kafka的文档,回掉函数是由Producer的ioThread进行调用的,所以此刻,ioThread开始请求 MessageQueueProducer 这个对象的对象锁,但是这个对象的锁已经被threadA占有,且threadA在等待ioThread执行所有回调,来保证消息发送完成。
所以MessageQueueProducer对象就一直被锁住了,send也send不了,close也close不了
flush方法的源码解析
RecordAccumulator.java :690
/**
* Mark all partitions as ready to send and block until the send is complete
*/
public void awaitFlushCompletion() throws InterruptedException {
try {
for (ProducerBatch batch : this.incomplete.copyAll())
//所有未完成的消息,挨个等待完成确认
batch.produceFuture.await();
} finally {
this.flushesInProgress.decrementAndGet();
}
}
produceFuture :ProduceRequestResult
/**
* Mark this request as complete and unblock any threads waiting on its completion.
*/
public void done() {
if (baseOffset == null)
throw new IllegalStateException("The method `set` must be invoked before this method.");
this.latch.countDown();
}
/**
* Await the completion of this request
*/
public void await() throws InterruptedException {
latch.await();
}
可以发现CountDownLatch 只有在done方法中会执行countDown,看一下done方法是不是被ioThread调用的
Sender.completeBatch --> ProducerBatch.done --> ProducerBatch.completeFutureAndFireCallbacks -->
produceFuture.done
复制代码
问题排查的思路
现象
很明显,就发现调用MessageQueueProducer.send方法的线程都HANG住了,没法继续执行。
上面例子中就是threadA 和 threadB都处于 BLOCKED状态。
告警
告警对于程序的健壮太重要了,上面的情况如果没有告警,可能难于发现,告警指标可以根据业务需要来配置,比如定时produce消息的线程十分钟还不发一条消息就告警。
排查
1.到机器上启动Arthas attch当前应用
2.执行thread -all,查看所有线程的状态
3.可以看到threadA 和 threadB都是阻塞状态
4.执行thread -b
5.Arthas找到 阻塞其他线程的罪魁祸首,也就是threadA。打印threadA的堆栈
6.jstack -l pid 拿到所有线程的堆栈
7.对比被threadA 阻塞的线程数量和 Arthas输出的信息是否一致。
原文链接:https://juejin.cn/post/6994349871960424462
猜你喜欢
- 2024-11-05 恐高症患者 大众CC改装HellaFlush风格
- 2024-11-05 Redis缓存:redis 数据库管理(redis数据库缓存机制)
- 2024-11-05 空气悬架上身 奥迪S7改HellaFlush风格
- 2024-11-05 HellaFlush风格 大众高尔夫改装方案
- 2024-11-05 年轻的老虫子 老款大众甲壳虫hellaflush潮改装
- 2024-11-05 据说这种“外八”改装风格 许多车主都喜欢
- 2024-11-05 李洋分享奥迪Q5改装AIRBFT气动避震品牌优势
- 2024-11-05 ctx.writeAndFlush(protocol).sync()是什么功能
- 2024-11-05 AIRBFT气动避震工厂李洋讲述大众迈腾旅行版为什么都改装低趴
- 2024-11-05 图解MySQL(5)-Buffer Pool的flush链表
你 发表评论:
欢迎- 最近发表
-
- Win11学院:如何在Windows 11上使用WSL安装Ubuntu
- linux移植(Linux移植freemodbus)
- 独家解读:Win10预览版9879为何无法识别硬盘
- 基于Linux系统的本地Yum源搭建与配置(ISO方式、RPM方式)
- Docker镜像瘦身(docker 减小镜像大小)
- 在linux上安装ollama(linux安装locale)
- 渗透测试系统Kali推出Docker镜像(kali linux渗透测试技术详解pdf)
- Linux环境中部署Harbor私有镜像仓库
- linux之间传文件命令之Rsync傻瓜式教程
- 解决ollama在linux中安装或升级时,通过国内镜像缩短安装时长
- 标签列表
-
- 下划线是什么 (87)
- 精美网站 (58)
- qq登录界面 (90)
- nginx 命令 (82)
- nginx .http (73)
- nginx lua (70)
- nginx 重定向 (68)
- Nginx超时 (65)
- nginx 监控 (57)
- odbc (59)
- rar密码破解工具 (62)
- annotation (71)
- 红黑树 (57)
- 智力题 (62)
- php空间申请 (61)
- 按键精灵 注册码 (69)
- 软件测试报告 (59)
- ntcreatefile (64)
- 闪动文字 (56)
- guid (66)
- abap (63)
- mpeg 2 (65)
- column (63)
- dreamweaver教程 (57)
- excel行列转换 (56)
本文暂时没有评论,来添加一个吧(●'◡'●)