Kafka -- 监控消费进度
Consumer Lag
Consumer Lag(滞后程度):消费者当前落后于生产者的程度
Lag的单位是消息数,一般是在主题的级别上讨论Lag,但Kafka是在分区的级别上监控Lag,因此需要手动汇总
对于消费者而言,Lag是最重要的监控指标,直接反应了一个消费者的运行情况
一个正常工作的消费者,它的Lag值应该很小,甚至接近于0,滞后程度很小
如果Lag很大,表明消费者无法跟上生产者的速度,Lag会越来越大
极有可能导致消费者消费的数据已经不在操作系统的页缓存中了,这些数据会失去享有Zero Copy技术的资格
这样消费者不得不从磁盘读取这些数据,这将进一步拉大与生产者的差距
马太效应:_Lag原本就很大的消费者会越来越慢,Lag也会也来越大_
监控LagKafka自带命令
kafka-consumer-groups是Kafka提供的最直接的监控消费者消费进度的工具
也能监控独立消费者的Lag,独立消费者是没有使用消费者组机制的消费者程序,也要配置group.id
消费者组要调用KafkaConsumer.subscribe,独立消费者要调用KafkaConsumer.assign直接消费 ...
Java性能 -- GC
GC机制回收区域
JVM的内存区域中,程序计数器、虚拟机栈、本地方法栈是线程私有,随线程的创建而创建,销毁而销毁
栈中的栈帧随着方法的进入和退出进行入栈和出栈操作,每个栈帧分配多少内存基本是在类结构确定下来时就已知
因此,这三个区域的内存分配和回收都是具有确定性的
堆中的回收主要是对象回收,方法区的回收主要是废弃常量和无用类的回收
回收时机
当一个对象不再被引用,就代表该对象可以被回收
引用计数法:实现简单,判断效率高,但存在循环引用的问题
可达性分析算法:HotSpot VM
引用类型
功能特点
强引用(Strong Reference)
被强引用关联的对象,永远不会被垃圾回收器回收
软引用(Soft Reference)
被软引用关联的对象,只有当系统将要发生内存溢出时,才会去回收软引用关联的对象
弱引用(Weak Reference)
只被弱引用关联的对象,只要发生GC事件,就会被回收
虚引用(Phantom Reference)
被虚引用关联的对象,唯一作用是在这个对象被回收时收到一个系统通知
回收特性
自动性
Java提供了一个系统级的线程来跟踪每一块分配出去 ...
Kafka -- Java消费者管理TCP连接
创建TCP连接
消费者端的主要程序入口是KafkaConsumer,但构建KafkaConsumer实例不会创建任何TCP连接
构建KafkaProducer实例时,会在后台默默地启动一个Sender线程,Sender线程负责Socket连接的创建
在Java构造函数中启动线程,会造成this指针逃逸,是一个隐患
消费者的TCP连接是在调用**KafkaConsumer.poll**方法时被创建的,poll方法内部有3个时机可以创建TCP连接
发起FindCoordinator请求时
消费者组有个组件叫作协调者(Coordinator)
驻留在Broker端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理
当消费者程序首次启动调用poll方法时,需要向Kafka集群(集群中的任意Broker)发送FindCoordinator请求
社区优化:消费者程序会向集群中当前负载最小的那台Broker发送请求
单向负载评估(非最优解):消费者连接的所有Broker中,谁的待发送请求最少,谁的负载就越小
连接Coordinator时
Broker处理完FindCoordinator请求后, ...
Java性能 -- JIT
编译
前端编译:即常见的**.java文件被编译成.class文件**的过程
运行时编译:机器无法直接运行Java生成的字节码,在运行时,JIT或者解释器会将字节码转换为机器码
类文件在运行时被进一步编译,可以变成高度优化的机器代码
C/C++编译器的所有优化都是在编译期完成的,运行期的性能监控仅作为基础的优化措施是无法进行的
JIT编译器是JVM中运行时编译最重要的部分之一
编译 / 加载 / 执行
类编译
javac:将.java文件编译成.class文件
javap:反编译.class文件,重点关注常量池和方法表集合
常量池主要记录的是类文件中出现的字面量和符号引用
字面量:字符串常量、基本类型的常量
符号引用:类和接口的全限定名、类引用、方法引用、成员变量引用
方法表集合
方法的字节码、方法访问权限、方法名索引、描述符索引、JVM执行指令、属性集合等
类加载
当一个类被创建实例或者被其他对象引用时,JVM如果没有加载过该类,会通过类加载器将**.class文件加载到内存**中
不同的实现类由不同的类加载器加载
JDK中的本地方法类一般由根加载 ...
Kafka -- 多线程消费者
Kafka Java Consumer设计原理
Kafka Java Consumer从Kafka 0.10.1.0开始,KafkaConsumer变成了双线程设计,即用户主线程和心跳线程
用户主线程:启动Consumer应用程序main方法的那个线程
心跳线程:只负责定期给对应的Broker机器发送心跳请求,以标识消费者应用的存活性
引入心跳线程的另一个目的
将心跳频率和主线程调用KafkaConsumer.poll方法的频率分开,解耦真实的消息处理逻辑和消费组成员存活性管理
虽然有了心跳线程,但实际的消息获取逻辑依然是在用户主线程中完成
因此在消费消息的这个层面,依然可以安全地认为KafkaConsumer是单线程的设计
老版本Consumer是多线程的架构
每个Consumer实例在内部为所有订阅的主题分区创建对应的消息获取线程,即Fetcher线程
老版本Consumer同时也是阻塞式的,Consumer实例启动后,内部会创建很多阻塞式的消息获取迭代器
但在很多场景下,Consumer端有非阻塞需求,如在流处理应用中执行过滤、分组等操作就不能是阻塞式的
基于这个原因,社区为新版本Con ...
Java性能 -- JVM内存模型
JVM内存模型
堆
堆是JVM内存中最大的一块内存空间,被所有线程共享,几乎所有对象和数组都被分配到堆内存中
堆被划分为新生代和老年代,新生代又被划分为Eden区和Survivor区(From Survivor + To Survivor)
永久代
在Java 6中,永久代在非堆内存中
在Java 7中,永久代的静态变量和运行时常量池被合并到堆中
在Java 8中,永久代被元空间取代
程序计数器
程序计数器是一块很小的内存空间,主要用来记录各个线程执行的字节码的地址
Java是多线程语言,当执行的线程数量超过CPU数量时,线程之间会根据时间片轮询争夺CPU资源
当一个线程的时间片用完了,或者其他原因导致该线程的CPU资源被提前抢夺
那么退出的线程需要单独的程序计数器来记录下一条运行的指令
方法区
方法区 != 永久代
HotSpot VM使用了永久代来实现方法区,但在其他VM(Oracle JRockit、IBM J9)不存在永久代一说
方法区只是JVM规范的一部分,在HotSpot VM中,使用了永久代来实现JVM规范的方法区
方法区主要用来存放已被虚拟机加载的类相关信息
类 ...
Kafka -- CommitFailedException
CommitFailedException
CommitFailedException是Consumer客户端在提交位移时出现的不可恢复的严重异常
如果异常是可恢复的瞬时错误,提交位移的API方法是支持自动错误重试的,如commitSync方法
解释
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the max.poll.interval.ms o ...
Java性能 -- 并发一致性
背景在并发编程中,Java是通过共享内存来实现共享变量操作的,所以在多线程编程中会涉及到数据一致性的问题
1234567public class Example { int x = 0; public void count() { x++; // 1 System.out.println(x) // 2 }}
有两个线程分别执行count方法,x是共享变量
可能出现3种结果:**<1,1>**,<2,1>,<1,2>
Java内存模型
Java采用共享内存模型来实现多线程之间的信息交换和数据同步
程序运行时,局部变量将会存放在虚拟机栈中,而共享变量将会被保存在堆内存中
由于局部变量随线程的创建而创建,线程的销毁而销毁,Java栈数据并非线程共享,所以不需要关心数据的一致性
共享变量存储在堆内存或方法区中,堆内存和方法区的数据是线程共享的
堆内存中的共享变量在被不同线程操作时,会被加载到线程的工作内存中,即_CPU中的高速缓存_
CP ...
Java性能 -- 命令行工具
free12345$ free -m total used free shared buffers cachedMem: 15948 15261 687 304 37 6343-/+ buffers/cache: 8880 7068Swap: 0 0 0
Mem是从操作系统的角度来看的
总共有15948M物理内存,其中15261M被使用了,还有687可用,15948 = 15261 + 687
有若干线程共享了304M物理内存,已经被弃用(值总为0)
buffer / cached :为了提高IO性能,由OS管理
A buffer is something that has yet to be “written” to disk.
A cache is something that has been “read” from the disk and stored ...
Kafka -- 提交位移
消费位移
Consumer的消费位移,记录了Consumer要消费的下一条消息的位移
假设一个分区中有10条消息,位移分别为0到9
某个Consumer消费了5条消息,实际消费了位移0到4的5条消息,此时Consumer的位移为5,指向下一条消息的位移
Consumer需要向Kafka汇报自己的位移数据,这个汇报过程就是提交位移
Consumer能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的
Consumer需要为分配给它的每个分区提交各自的位移数据
提交位移主要是为了表征Consumer的消费进度
当Consumer发生故障重启后,能够从Kafka中读取之前提交的位移值,然后从相应的位移处继续消费
位移提交的语义
如果提交了位移X,那么Kafka会认为位移值小于X的消息都已经被成功消费了
灵活
位移提交非常灵活,可以提交任何位移值,但要承担相应的后果
假设Consumer消费了位移为0~9的10条消息
如果提交的位移为20,位移位于10~19的消息可能会丢失
如果提交的位移为5,位移位于5~9的消息可能会被重复消费
位移提交的语义保障由应用程序保证,Kafka ...