Kafka -- 避免重平衡
概念 Rebalance是让Consumer Group下所有的Consumer实例就如何消费订阅主题的所有分区达成共识的过程 在Rebalance过程中,所有Consumer实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配 整个Rebalance过程中,所有Consumer实例都不能消费任何消息,因此对Consumer的TPS影响很大 协调者 协调者,即Coordinator,负责为Consumer Group执行Rebalance以及提供位移管理和组成员管理等 Consumer端应用程序在提交位移时,其实是向Coordinator所在的Broker提交位移 Consumer应用启动时,也是向Coordinator所在的Broker发送各种请求 然后由Coordinator负责执行消费组的注册、成员管理记录等元数据管理操作 所有Broker在启动时,都会创建和开启相应的Coordinator组件,所有Broker都有各自的Coordinator组件 内部位移主题__consumer_offsets记录了为Consumer Group服务的Coordinator在哪一台Broker上...
Java性能 -- 线程池大小
线程池原理 在Hotspot JVM的线程模型中,Java线程被一对一映射为内核线程 Java使用线程执行程序时,需要创建一个内核线程,当该Java线程被终止时,这个内核线程也会被回收 Java线程的创建和销毁将会消耗一定的计算机资源,从而增加系统的性能开销 大量创建线程也会给系统带来性能问题,线程会抢占内存和CPU资源,可能会发生内存溢出、CPU超负载等问题 线程池:即可以提高线程复用,也可以固定最大线程数,防止无限制地创建线程 当程序提交一个任务需要一个线程时,会去线程池查找是否有空闲的线程 如果有,则直接使用线程池中的线程工作,如果没有,则判断当前已创建的线程数是否超过最大线程数 如果未超过,则创建新线程,如果已经超过,则进行排队等待或者直接抛出异常 线程池框架Executor Java最开始提供了ThreadPool来实现线程池,为了更好地实现用户级的线程调度,Java提供了一套Executor框架 Executor框架包括了ScheduledThreadPoolExecutor和ThreadPoolExecutor两个核心线程池,核心原理一样 ScheduledThreadPoo...
Java性能 -- 并发容器
并发场景下的Map容器 某电商系统需要统计销量TOP 10的商品,通常用哈希表来存储商品和销量的键值对,然后使用排序获取销量TOP 10的商品 并发场景下不能使用HashMap JDK 1.7,在并发场景下使用HashMap会出现死循环,导致CPU使用率居高不下,而扩容是导致死循环的主要原因 JDK 1.8,虽然修复了HashMap扩容导致的死循环问题,但在高并发场景下,依然会有数据丢失和不准确的情况 为了保证Map容器的线程安全,Java实现了HashTable、ConcurrentHashMap、ConcurrentSkipListMap HashTable、ConcurrentHashMap是基于HashMap实现的,适用于小数据量存取的场景 ConcurrentSkipListMap是基于TreeMap的设计原理实现的 ConcurrentSkipListMap是基于跳表实现的,而TreeMap是基于红黑树实现的 ConcurrentSkipListMap最大的特点是存取平均时间复杂度为O(log(n)),适用于大数据量存取的场景 HashTable / Concur...
Kafka -- 位移主题
ZooKeeper 老版本Consumer的位移管理依托于Apache ZooKeeper,自动或手动地将位移数据提交到ZK中保存 当Consumer重启后,能自动从ZK中读取位移数据,从而在上次消费截止的地方继续消费 这种设计使得Kafka Broker不需要保存位移数据,减少了Broker端需要持有的状态空间,有利于实现高伸缩性 但ZK并不适用于高频的写操作 位移主题 将Consumer的位移数据作为普通的Kafka消息,提交到__consumer_offsets(保存Consumer的位移信息) 提交过程需要实现高持久性,并需要支持高频的写操作 位移主题是普通的Kafka主题,同时也是一个内部主题,交由Kafka管理即可 位移主题的消息格式由Kafka定义,用户不能修改 因此不能随意向位移主题写消息,一旦写入的消息不能满足格式,那Kafka内部无法成功解析,会造成Broker崩溃 Kafka Consumer有API来提交位移(即向位移主题写消息) 消息格式 常用格式:Key-Value Key为消息键值,Value为消息体,在Kafka中都是字节数组 Key <Group ...
Java性能 -- 线程上下文切换
线程数量 在并发程序中,并不是启动更多的线程就能让程序最大限度地并发执行 线程数量设置太小,会导致程序不能充分地利用系统资源 线程数量设置太大,可能带来资源的过度竞争,导致上下文切换,带来的额外的系统开销 上下文切换 在单处理器时期,操作系统就能处理多线程并发任务,处理器给每个线程分配CPU时间片,线程在CPU时间片内执行任务 CPU时间片是CPU分配给每个线程执行的时间段,一般为几十毫秒 时间片决定了一个线程可以连续占用处理器运行的时长 当一个线程的时间片用完,或者因自身原因被迫暂停运行,此时另一个线程会被操作系统选中来占用处理器 上下文切换(Context Switch):一个线程被暂停剥夺使用权,另一个线程被选中开始或者继续运行的过程 切出:一个线程被剥夺处理器的使用权而被暂停运行 切入:一个线程被选中占用处理器开始运行或者继续运行 切出切入的过程中,操作系统需要保存和恢复相应的进度信息,这个进度信息就是_上下文_ 上下文的内容 寄存器的存储内容:CPU寄存器负责存储已经、正在和将要执行的任务 程序计数器存储的指令内容:程序计数器负责存储CPU正在执行的指令位置、即将执行的下一条...
Kafka -- 消费者组
消费者组 消费者组(Consumer Group)是Kafka提供的可扩展且具有容错性的消费者机制 一个消费者组内可以有多个消费者或消费者实例(进程/线程),它们共享一个Group ID(字符串) 组内的所有消费者协调在一起来消费订阅主题的所有分区 每个分区只能由同一个消费者组内的一个Consumer实例来消费,Consumer实例对分区有所有权 消息引擎模型 两种模型:点对点模型(消息队列)、发布订阅模型 点对点模型(传统的消息队列模型) 缺陷/特性:消息一旦被消费、就会从队列中被删除,而且只能被下游的一个Consumer消费 伸缩性很差,下游的多个Consumer需要抢占共享消息队列中的消息 发布订阅模型 缺陷:伸缩性不高,每个订阅者都必须订阅主题的所有分区(全量订阅) Consumer Group 当Consumer Group订阅了多个主题之后 组内的每个Consumer实例不要求一定要订阅主题的所有分区,只会消费部分分区的消息 Consumer Group之间彼此独立,互不影响,它们能够订阅相同主题而互不干涉 Kafka使用Consumer Group...
Java性能 -- CAS乐观锁
synchronized / Lock / CAS synchronized和Lock实现的同步锁机制,都属于悲观锁,而CAS属于_乐观锁_ 悲观锁在高并发的场景下,激烈的锁竞争会造成线程阻塞,而大量阻塞线程会导致系统的上下文切换,增加系统的性能开销 乐观锁 乐观锁:在操作共享资源时,总是抱着乐观的态度进行,认为自己能够完成操作 但实际上,当多个线程同时操作一个共享资源时,只有一个线程会成功,失败的线程不会被挂起,仅仅只是返回 乐观锁相比于悲观锁来说,不会带来死锁、饥饿等活性故障问题,线程间的相互影响也远远比悲观锁要小 乐观锁没有因竞争而造成的系统上下文切换,所以在性能上更胜一筹 实现原理 CAS是实现乐观锁的核心算法,包含3个参数:V(需要更新的变量),E(预期值)、N(最新值) 只有V等于E时,V才会被设置为N 如果V不等于E了,说明其它线程已经更新了V,此时该线程不做操作,返回V的真实值 CAS实现原子操作AtomicInteger是基于CAS实现的一个线程安全的整型类,Unsafe调用CPU底层指令实现原子操作 12345678// java.util.conc...
Kafka -- 幂等性生产者 + 事务生产者
消息交付可靠性保障 消息交付可靠性保障:Kafka对Producer和Consumer要处理的消息所提供的承诺 常见的承诺 最多一次(at most once):消息可能会丢失,但绝不会被重复发送 至少一次(at least once):消息不会丢失,但有可能被重复发送 精确一次(exactly once):消息不会丢失,也不会被重复发送 Kafka默认提供的交付可靠性保障:_至少一次_ 只有Broker成功提交消息且Producer接到Broker的应答才会认为该消息成功发送 如果Broker成功提交消息,但Broker的应答没有成功送回Producer端,Producer只能选择重试 最多一次 Kafka也可以提供最多一次交付可靠性保证,只需要让Producer禁止重试即可,但大部分场景下并不希望出现消息丢失 精确一次 消息不会丢失,也不会被重复处理,即使Producer端重复发送了相同的消息,Broker端也能自动去重 两种机制:幂等性、事务 幂等性 幂等原是数学中的概念:某些操作或者函数能够被执行多次,但每次得到的结果都是不变的 幂等操作:乘1,取整函数;非幂等操作:加1 ...
Java性能 -- Lock优化
Lock / synchronizedLock锁的基本操作是通过乐观锁实现的,由于Lock锁也会在阻塞时被挂起,依然属于悲观锁 synchronized Lock 实现方式 JVM层实现 Java底层代码实现 锁的获取 JVM隐式获取 lock() / tryLock() / tryLock(timeout, unit) / lockInterruptibly() 锁的释放 JVM隐式释放 unlock() 锁的类型 非公平锁、可重入 非公平锁/公平锁、可重入 锁的状态 不可中断 可中断 锁的性能 高并发下会升级为重量级锁 更稳定 实现原理 Lock锁是基于Java实现的锁,Lock是一个接口 常见的实现类:ReentrantLock、ReentrantReadWriteLock,都是依赖AbstractQueuedSynchronizer(AQS)实现 AQS中包含了一个基于链表实现的等待队列(即CLH队列),用于存储所有阻塞的线程 AQS中有一个state变量,该变量对ReentrantLock来说表示加...
Kafka -- 生产者管理TCP连接
建立TCP连接创建KafkaProducer实例123456789101112Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("key.serializer", StringSerializer.class.getName());properties.put("value.serializer", StringSerializer.class.getName());// try-with-resources// 创建KafkaProducer实例时,会在后台创建并启动Sender线程,Sender线程开始运行时首先会创建与Broker的TCP连接try (Producer<String, String> producer = new KafkaProducer<>(properties)) { ProducerRec...













