Kafka -- 多线程消费者
Kafka Java Consumer设计原理 Kafka Java Consumer从Kafka 0.10.1.0开始,KafkaConsumer变成了双线程设计,即用户主线程和心跳线程 用户主线程:启动Consumer应用程序main方法的那个线程 心跳线程:只负责定期给对应的Broker机器发送心跳请求,以标识消费者应用的存活性 引入心跳线程的另一个目的 将心跳频率和主线程调用KafkaConsumer.poll方法的频率分开,解耦真实的消息处理逻辑和消费组成员存活性管理 虽然有了心跳线程,但实际的消息获取逻辑依然是在用户主线程中完成 因此在消费消息的这个层面,依然可以安全地认为KafkaConsumer是单线程的设计 老版本Consumer是多线程的架构 每个Consumer实例在内部为所有订阅的主题分区创建对应的消息获取线程,即Fetcher线程 老版本Consumer同时也是阻塞式的,Consumer实例启动后,内部会创建很多阻塞式的消息获取迭代器 但在很多场景下,Consumer端有非阻塞需求,如在流处理应用中执行过滤、分组等操作就不能是阻塞式的 基于这个原因,社区为新版本...
Java性能 -- JVM内存模型
JVM内存模型 堆 堆是JVM内存中最大的一块内存空间,被所有线程共享,几乎所有对象和数组都被分配到堆内存中 堆被划分为新生代和老年代,新生代又被划分为Eden区和Survivor区(From Survivor + To Survivor) 永久代 在Java 6中,永久代在非堆内存中 在Java 7中,永久代的静态变量和运行时常量池被合并到堆中 在Java 8中,永久代被元空间取代 程序计数器 程序计数器是一块很小的内存空间,主要用来记录各个线程执行的字节码的地址 Java是多线程语言,当执行的线程数量超过CPU数量时,线程之间会根据时间片轮询争夺CPU资源 当一个线程的时间片用完了,或者其他原因导致该线程的CPU资源被提前抢夺 那么退出的线程需要单独的程序计数器来记录下一条运行的指令 方法区 方法区 != 永久代 HotSpot VM使用了永久代来实现方法区,但在其他VM(Oracle JRockit、IBM J9)不存在永久代一说 方法区只是JVM规范的一部分,在HotSpot VM中,使用了永久代来实现JVM规范的方法区 方法区主要用来存放已被虚拟机加载的类相关信...
Kafka -- CommitFailedException
CommitFailedException CommitFailedException是Consumer客户端在提交位移时出现的不可恢复的严重异常 如果异常是可恢复的瞬时错误,提交位移的API方法是支持自动错误重试的,如commitSync方法 解释 Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the max.poll.interval.m...
Java性能 -- 并发一致性
背景在并发编程中,Java是通过共享内存来实现共享变量操作的,所以在多线程编程中会涉及到数据一致性的问题 1234567public class Example { int x = 0; public void count() { x++; // 1 System.out.println(x) // 2 }} 有两个线程分别执行count方法,x是共享变量 可能出现3种结果:**<1,1>**,<2,1>,<1,2> Java内存模型 Java采用共享内存模型来实现多线程之间的信息交换和数据同步 程序运行时,局部变量将会存放在虚拟机栈中,而共享变量将会被保存在堆内存中 由于局部变量随线程的创建而创建,线程的销毁而销毁,Java栈数据并非线程共享,所以不需要关心数据的一致性 共享变量存储在堆内存或方法区中,堆内存和方法区的数据是线程共享的 堆内存中的共享变量在被不同线程操作时,会被加载到线程的工作内存中,即_CPU中的高速缓存_...
Java性能 -- 命令行工具
free12345$ free -m total used free shared buffers cachedMem: 15948 15261 687 304 37 6343-/+ buffers/cache: 8880 7068Swap: 0 0 0 Mem是从操作系统的角度来看的 总共有15948M物理内存,其中15261M被使用了,还有687可用,15948 = 15261 + 687 有若干线程共享了304M物理内存,已经被弃用(值总为0) buffer / cached :为了提高IO性能,由OS管理 A buffer is something that has yet to be “written” to disk. A cache is something that has been “read” from the disk and sto...
Kafka -- 提交位移
消费位移 Consumer的消费位移,记录了Consumer要消费的下一条消息的位移 假设一个分区中有10条消息,位移分别为0到9 某个Consumer消费了5条消息,实际消费了位移0到4的5条消息,此时Consumer的位移为5,指向下一条消息的位移 Consumer需要向Kafka汇报自己的位移数据,这个汇报过程就是提交位移 Consumer能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的 Consumer需要为分配给它的每个分区提交各自的位移数据 提交位移主要是为了表征Consumer的消费进度 当Consumer发生故障重启后,能够从Kafka中读取之前提交的位移值,然后从相应的位移处继续消费 位移提交的语义 如果提交了位移X,那么Kafka会认为位移值小于X的消息都已经被成功消费了 灵活 位移提交非常灵活,可以提交任何位移值,但要承担相应的后果 假设Consumer消费了位移为0~9的10条消息 如果提交的位移为20,位移位于10~19的消息可能会丢失 如果提交的位移为5,位移位于5~9的消息可能会被重复消费 位移提交的语义保障由应用程序保证,Ka...
Java性能 -- 协程
线程实现模型 轻量级进程和内核线程一对一相互映射实现的1:1线程模型 用户线程和内核线程实现的N:1线程模型 用户线程和轻量级进程混合实现的N:M线程模型 1:1线程模型 内核线程(Kernel-Level Thread)是由操作系统内核支持的线程,内核通过调度器对线程进行调度,负责完成线程的切换 在Linux中,往往通过fork函数创建一个子进程来代表一个内核中的线程 一个进程调用fork函数后,系统会先给新的子进程分配资源,然后复制主进程,只有少数值与主进程不一样 采用fork的方式,会产生大量的冗余数据,占用大量内存空间,也会消耗大量CPU时间来初始化内存空间和复制数据 如果是一模一样的数据,可以共享主进程的数据,于是轻量级进程(Light Weight Process,LWP)出现了 LWP使用clone系统调用创建线程 clone函数将部分父进程的资源的数据结构进行复制,复制内容可选,且没有被复制的资源可以通过指针共享给子进程 LWP运行单元更小,运行速度更快,LWP和内核线程一一映射,每个LWP都是由一个内核线程支持 N:1线程模型 1:1线程模型的缺陷 在线程创建、切换上都...
Kafka -- 避免重平衡
概念 Rebalance是让Consumer Group下所有的Consumer实例就如何消费订阅主题的所有分区达成共识的过程 在Rebalance过程中,所有Consumer实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配 整个Rebalance过程中,所有Consumer实例都不能消费任何消息,因此对Consumer的TPS影响很大 协调者 协调者,即Coordinator,负责为Consumer Group执行Rebalance以及提供位移管理和组成员管理等 Consumer端应用程序在提交位移时,其实是向Coordinator所在的Broker提交位移 Consumer应用启动时,也是向Coordinator所在的Broker发送各种请求 然后由Coordinator负责执行消费组的注册、成员管理记录等元数据管理操作 所有Broker在启动时,都会创建和开启相应的Coordinator组件,所有Broker都有各自的Coordinator组件 内部位移主题__consumer_offsets记录了为Consumer Group服务的Coordinator在哪一台Broker上...
Java性能 -- 线程池大小
线程池原理 在Hotspot JVM的线程模型中,Java线程被一对一映射为内核线程 Java使用线程执行程序时,需要创建一个内核线程,当该Java线程被终止时,这个内核线程也会被回收 Java线程的创建和销毁将会消耗一定的计算机资源,从而增加系统的性能开销 大量创建线程也会给系统带来性能问题,线程会抢占内存和CPU资源,可能会发生内存溢出、CPU超负载等问题 线程池:即可以提高线程复用,也可以固定最大线程数,防止无限制地创建线程 当程序提交一个任务需要一个线程时,会去线程池查找是否有空闲的线程 如果有,则直接使用线程池中的线程工作,如果没有,则判断当前已创建的线程数是否超过最大线程数 如果未超过,则创建新线程,如果已经超过,则进行排队等待或者直接抛出异常 线程池框架Executor Java最开始提供了ThreadPool来实现线程池,为了更好地实现用户级的线程调度,Java提供了一套Executor框架 Executor框架包括了ScheduledThreadPoolExecutor和ThreadPoolExecutor两个核心线程池,核心原理一样 ScheduledThreadPoo...
Java性能 -- 并发容器
并发场景下的Map容器 某电商系统需要统计销量TOP 10的商品,通常用哈希表来存储商品和销量的键值对,然后使用排序获取销量TOP 10的商品 并发场景下不能使用HashMap JDK 1.7,在并发场景下使用HashMap会出现死循环,导致CPU使用率居高不下,而扩容是导致死循环的主要原因 JDK 1.8,虽然修复了HashMap扩容导致的死循环问题,但在高并发场景下,依然会有数据丢失和不准确的情况 为了保证Map容器的线程安全,Java实现了HashTable、ConcurrentHashMap、ConcurrentSkipListMap HashTable、ConcurrentHashMap是基于HashMap实现的,适用于小数据量存取的场景 ConcurrentSkipListMap是基于TreeMap的设计原理实现的 ConcurrentSkipListMap是基于跳表实现的,而TreeMap是基于红黑树实现的 ConcurrentSkipListMap最大的特点是存取平均时间复杂度为O(log(n)),适用于大数据量存取的场景 HashTable / Concur...














