Kafka -- CommitFailedException
CommitFailedException
CommitFailedException是Consumer客户端在提交位移时出现的不可恢复的严重异常
如果异常是可恢复的瞬时错误,提交位移的API方法是支持自动错误重试的,如commitSync方法
解释
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the max.poll.interval.ms o ...
Java性能 -- 并发一致性
背景在并发编程中,Java是通过共享内存来实现共享变量操作的,所以在多线程编程中会涉及到数据一致性的问题
1234567public class Example { int x = 0; public void count() { x++; // 1 System.out.println(x) // 2 }}
有两个线程分别执行count方法,x是共享变量
可能出现3种结果:**<1,1>**,<2,1>,<1,2>
Java内存模型
Java采用共享内存模型来实现多线程之间的信息交换和数据同步
程序运行时,局部变量将会存放在虚拟机栈中,而共享变量将会被保存在堆内存中
由于局部变量随线程的创建而创建,线程的销毁而销毁,Java栈数据并非线程共享,所以不需要关心数据的一致性
共享变量存储在堆内存或方法区中,堆内存和方法区的数据是线程共享的
堆内存中的共享变量在被不同线程操作时,会被加载到线程的工作内存中,即_CPU中的高速缓存_
CP ...
Java性能 -- 命令行工具
free12345$ free -m total used free shared buffers cachedMem: 15948 15261 687 304 37 6343-/+ buffers/cache: 8880 7068Swap: 0 0 0
Mem是从操作系统的角度来看的
总共有15948M物理内存,其中15261M被使用了,还有687可用,15948 = 15261 + 687
有若干线程共享了304M物理内存,已经被弃用(值总为0)
buffer / cached :为了提高IO性能,由OS管理
A buffer is something that has yet to be “written” to disk.
A cache is something that has been “read” from the disk and stored ...
Kafka -- 提交位移
消费位移
Consumer的消费位移,记录了Consumer要消费的下一条消息的位移
假设一个分区中有10条消息,位移分别为0到9
某个Consumer消费了5条消息,实际消费了位移0到4的5条消息,此时Consumer的位移为5,指向下一条消息的位移
Consumer需要向Kafka汇报自己的位移数据,这个汇报过程就是提交位移
Consumer能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的
Consumer需要为分配给它的每个分区提交各自的位移数据
提交位移主要是为了表征Consumer的消费进度
当Consumer发生故障重启后,能够从Kafka中读取之前提交的位移值,然后从相应的位移处继续消费
位移提交的语义
如果提交了位移X,那么Kafka会认为位移值小于X的消息都已经被成功消费了
灵活
位移提交非常灵活,可以提交任何位移值,但要承担相应的后果
假设Consumer消费了位移为0~9的10条消息
如果提交的位移为20,位移位于10~19的消息可能会丢失
如果提交的位移为5,位移位于5~9的消息可能会被重复消费
位移提交的语义保障由应用程序保证,Kafka ...
Java性能 -- 协程
线程实现模型
轻量级进程和内核线程一对一相互映射实现的1:1线程模型
用户线程和内核线程实现的N:1线程模型
用户线程和轻量级进程混合实现的N:M线程模型
1:1线程模型
内核线程(Kernel-Level Thread)是由操作系统内核支持的线程,内核通过调度器对线程进行调度,负责完成线程的切换
在Linux中,往往通过fork函数创建一个子进程来代表一个内核中的线程
一个进程调用fork函数后,系统会先给新的子进程分配资源,然后复制主进程,只有少数值与主进程不一样
采用fork的方式,会产生大量的冗余数据,占用大量内存空间,也会消耗大量CPU时间来初始化内存空间和复制数据
如果是一模一样的数据,可以共享主进程的数据,于是轻量级进程(Light Weight Process,LWP)出现了
LWP使用clone系统调用创建线程
clone函数将部分父进程的资源的数据结构进行复制,复制内容可选,且没有被复制的资源可以通过指针共享给子进程
LWP运行单元更小,运行速度更快,LWP和内核线程一一映射,每个LWP都是由一个内核线程支持
N:1线程模型
1:1线程模型的缺陷
在线程创建、切换上都存在用 ...
Kafka -- 避免重平衡
概念
Rebalance是让Consumer Group下所有的Consumer实例就如何消费订阅主题的所有分区达成共识的过程
在Rebalance过程中,所有Consumer实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配
整个Rebalance过程中,所有Consumer实例都不能消费任何消息,因此对Consumer的TPS影响很大
协调者
协调者,即Coordinator,负责为Consumer Group执行Rebalance以及提供位移管理和组成员管理等
Consumer端应用程序在提交位移时,其实是向Coordinator所在的Broker提交位移
Consumer应用启动时,也是向Coordinator所在的Broker发送各种请求
然后由Coordinator负责执行消费组的注册、成员管理记录等元数据管理操作
所有Broker在启动时,都会创建和开启相应的Coordinator组件,所有Broker都有各自的Coordinator组件
内部位移主题__consumer_offsets记录了为Consumer Group服务的Coordinator在哪一台Broker上
为某 ...
Java性能 -- 线程池大小
线程池原理
在Hotspot JVM的线程模型中,Java线程被一对一映射为内核线程
Java使用线程执行程序时,需要创建一个内核线程,当该Java线程被终止时,这个内核线程也会被回收
Java线程的创建和销毁将会消耗一定的计算机资源,从而增加系统的性能开销
大量创建线程也会给系统带来性能问题,线程会抢占内存和CPU资源,可能会发生内存溢出、CPU超负载等问题
线程池:即可以提高线程复用,也可以固定最大线程数,防止无限制地创建线程
当程序提交一个任务需要一个线程时,会去线程池查找是否有空闲的线程
如果有,则直接使用线程池中的线程工作,如果没有,则判断当前已创建的线程数是否超过最大线程数
如果未超过,则创建新线程,如果已经超过,则进行排队等待或者直接抛出异常
线程池框架Executor
Java最开始提供了ThreadPool来实现线程池,为了更好地实现用户级的线程调度,Java提供了一套Executor框架
Executor框架包括了ScheduledThreadPoolExecutor和ThreadPoolExecutor两个核心线程池,核心原理一样
ScheduledThreadPoolEx ...
Java性能 -- 并发容器
并发场景下的Map容器
某电商系统需要统计销量TOP 10的商品,通常用哈希表来存储商品和销量的键值对,然后使用排序获取销量TOP 10的商品
并发场景下不能使用HashMap
JDK 1.7,在并发场景下使用HashMap会出现死循环,导致CPU使用率居高不下,而扩容是导致死循环的主要原因
JDK 1.8,虽然修复了HashMap扩容导致的死循环问题,但在高并发场景下,依然会有数据丢失和不准确的情况
为了保证Map容器的线程安全,Java实现了HashTable、ConcurrentHashMap、ConcurrentSkipListMap
HashTable、ConcurrentHashMap是基于HashMap实现的,适用于小数据量存取的场景
ConcurrentSkipListMap是基于TreeMap的设计原理实现的
ConcurrentSkipListMap是基于跳表实现的,而TreeMap是基于红黑树实现的
ConcurrentSkipListMap最大的特点是存取平均时间复杂度为O(log(n)),适用于大数据量存取的场景
HashTable / Concurren ...
Kafka -- 位移主题
ZooKeeper
老版本Consumer的位移管理依托于Apache ZooKeeper,自动或手动地将位移数据提交到ZK中保存
当Consumer重启后,能自动从ZK中读取位移数据,从而在上次消费截止的地方继续消费
这种设计使得Kafka Broker不需要保存位移数据,减少了Broker端需要持有的状态空间,有利于实现高伸缩性
但ZK并不适用于高频的写操作
位移主题
将Consumer的位移数据作为普通的Kafka消息,提交到__consumer_offsets(保存Consumer的位移信息)
提交过程需要实现高持久性,并需要支持高频的写操作
位移主题是普通的Kafka主题,同时也是一个内部主题,交由Kafka管理即可
位移主题的消息格式由Kafka定义,用户不能修改
因此不能随意向位移主题写消息,一旦写入的消息不能满足格式,那Kafka内部无法成功解析,会造成Broker崩溃
Kafka Consumer有API来提交位移(即向位移主题写消息)
消息格式
常用格式:Key-Value
Key为消息键值,Value为消息体,在Kafka中都是字节数组
Key
<Group ID, ...
Java性能 -- 线程上下文切换
线程数量
在并发程序中,并不是启动更多的线程就能让程序最大限度地并发执行
线程数量设置太小,会导致程序不能充分地利用系统资源
线程数量设置太大,可能带来资源的过度竞争,导致上下文切换,带来的额外的系统开销
上下文切换
在单处理器时期,操作系统就能处理多线程并发任务,处理器给每个线程分配CPU时间片,线程在CPU时间片内执行任务
CPU时间片是CPU分配给每个线程执行的时间段,一般为几十毫秒
时间片决定了一个线程可以连续占用处理器运行的时长
当一个线程的时间片用完,或者因自身原因被迫暂停运行,此时另一个线程会被操作系统选中来占用处理器
上下文切换(Context Switch):一个线程被暂停剥夺使用权,另一个线程被选中开始或者继续运行的过程
切出:一个线程被剥夺处理器的使用权而被暂停运行
切入:一个线程被选中占用处理器开始运行或者继续运行
切出切入的过程中,操作系统需要保存和恢复相应的进度信息,这个进度信息就是_上下文_
上下文的内容
寄存器的存储内容:CPU寄存器负责存储已经、正在和将要执行的任务
程序计数器存储的指令内容:程序计数器负责存储CPU正在执行的指令位置、即将执行的下一条指令的 ...