Java并发 -- Disruptor
有界队列
JUC中的有界队列ArrayBlockingQueue和LinkedBlockingQueue,都是基于ReentrantLock
在高并发场景下,锁的效率并不高,Disruptor是一款性能更高的有界内存队列
Disruptor高性能的原因
内存分配更合理,使用RingBuffer,数组元素在初始化时一次性全部创建
提升缓存命中率,对象循环利用,避免频繁GC
能够避免伪共享,提升缓存利用率
采用无锁算法,避免频繁加锁、解锁的性能消耗
支持批量消费,消费者可以以无锁的方式消费多个消息
简单使用123456789101112131415161718192021222324252627public class DisruptorExample { public static void main(String[] args) throws InterruptedException { // RingBuffer大小,必须是2的N次方 int bufferSize = 1024; // 构建Disruptor D ...
Java并发 -- Netty线程模型
BIO
BIO即阻塞式IO,使用BIO模型,一般会为每个Socket分配一个独立的线程
为了避免频繁创建和销毁线程,可以采用线程池,但Socket和线程之间的对应关系不会发生变化
BIO适用于Socket连接不是很多的场景,但现在上百万的连接是很常见的,而创建上百万个线程是不现实的
因此BIO线程模型无法解决百万连接的问题
在互联网场景中,连接虽然很多,但每个连接上的请求并不频繁,因此线程大部分时间都在等待IO就绪
理想的线程模型
用一个线程来处理多个连接,可以提高线程的利用率,降低所需要的线程
使用BIO相关的API是无法实现的,BIO相关的Socket读写操作都是阻塞式的
一旦调用了阻塞式的API,在IO就绪前,调用线程会一直阻塞,也就无法处理其他的Socket连接
利用NIO相关的API能够实现一个线程处理多个连接,通过Reactor模式实现
Reactor模式
Handle指的是IO句柄,在Java网络编程里,本质上是一个网络连接
Event Handler是事件处理器,handle_event()处理IO事件,每个Event Handler处理一个IO Handle ...
Java并发 -- Guava RateLimiter
RateLimiter12345678910111213141516171819// 限流器流速:2请求/秒RateLimiter limiter = RateLimiter.create(2.0);ExecutorService pool = Executors.newFixedThreadPool(1);final long[] prev = {System.nanoTime()};for (int i = 0; i < 20; i++) { // 限流器限流 limiter.acquire(); pool.execute(() -> { long cur = System.nanoTime(); System.out.println((cur - prev[0]) / 1000_000); prev[0] = cur; });}// 输出// 499// 499// 497// 502// 496
令牌桶算法
Guava RateLimiter采用的是 ...
Java并发 -- 生产者-消费者模式
生产者-消费者模式
生产者-消费者模式的核心是一个_任务队列_
生产者线程生产任务,并将任务添加到任务队列中,消费者线程从任务队列中获取任务并执行
从架构设计的角度来看,生产者-消费者模式有一个很重要的优点:_解耦_
生产者-消费者模式另一个重要的优点是支持异步,并且能够平衡生产者和消费者的速度差异(任务队列)
支持批量执行
往数据库INSERT 1000条数据,有两种方案
第一种方案:用1000个线程并发执行,每个线程INSERT一条数据
第二种方案(更优):用1个线程,执行一个批量的SQL,一次性把1000条数据INSERT进去
将原来直接INSERT数据到数据库的线程作为生产者线程,而生产者线程只需将数据添加到任务队列
然后消费者线程负责将任务从任务队列中批量取出并批量执行
1234567891011121314151617181920212223242526272829303132333435363738// 任务队列private BlockingQueue<Task> queue = new LinkedBlockingQueue<>(2000 ...
Java并发 -- 两阶段终止模式
两阶段终止模式第一阶段线程T1向线程T2发送终止指令,第二阶段是线程T2响应终止指令
Java线程生命周期
Java线程进入终止状态的前提是线程进入RUNNABLE状态,而实际上线程可能处于休眠状态
因为如果要终止处于休眠状态的线程,要先通过interrupt把线程的状态从休眠状态转换到RUNNABLE状态
RUNNABLE状态转换到终止状态,优雅的方式是让Java线程自己执行完run方法
设置一个标志位,然后线程会在合适的时机检查这个标志位
如果发现符合终止条件,就会自动退出run方法
第二阶段:响应终止指令
终止监控操作
监控系统需要动态采集一些数据,监控系统发送采集指令给被监控系统的的监控代理
监控代理接收到指令后,从监控目标收集数据,然后回传给监控系统
处于性能的考虑,动态采集一般都会有终止操作
12345678910111213141516171819202122232425262728293031323334353637383940public class Proxy { private boolean started = false; // 采集线程 ...
Java并发 -- Worker Thread模式
Worker Thread模式
Worker Thread模式可以类比现实世界里车间的工作模式,Worker Thread对应车间里的工人(人数确定)
用阻塞队列做任务池,然后创建固定数量的线程消费阻塞队列中的任务 – 这就是Java中的线程池方案
echo服务123456789101112131415161718192021222324252627private ExecutorService pool = Executors.newFixedThreadPool(500);public void handle() throws IOException { // 处理请求 try (ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080))) { while (true) { // 接收请求 SocketChannel sc = ssc.accept(); // 将 ...
Java并发 -- Thread-Per-Message模式
概述Thread-Per-Message模式:为每个任务分配一个独立的线程
Thread123456789101112131415161718192021// 处理请求try (ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080))) { while (true) { // 接收请求 SocketChannel sc = ssc.accept(); // 每个请求都创建一个线程 new Thread(() -> { try { // 读Socket ByteBuffer rb = ByteBuffer.allocateDirect(1024); sc.read(rb); TimeUnit.SECONDS.sleep(1); ...
Java并发 -- Balking模式
关于Guarded Suspension模式可以用“多线程版本的if”来理解Guarded Suspension模式,必须等到条件为真,但很多场景需要快速放弃
自动保存123456789101112131415161718192021222324252627282930public class AutoSaveEditor { // 文件是否被修改 // 非线程安全,对共享变量change的读写没有使用同步 private boolean changed = false; // 定时任务线程池 private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); @PostConstruct public void startAutoSave() { service.scheduleWithFixedDelay(() -> autoSave(), 5, 5, TimeUnit.SECONDS); } ...
Java并发 -- Guarded Suspension模式
概述
Guarded Suspension模式是等待唤醒机制的规范实现
Guarded Suspension模式也被称为Guarded Wait 模式、Spin Lock 模式
Web版的文件浏览器
用户可以在浏览器里查看服务器上的目录和文件
该项目依赖运维部门提供的文件浏览服务,而文件浏览服务仅支持MQ接入
用户通过浏览器发送请求,会被转换成消息发送给MQ,等MQ返回结果后,再将结果返回至浏览器
1234567891011121314151617181920212223242526public class FileBrowser { // 发送消息 private void send(Message message) { } // MQ消息返回后调用该方法 public void onMessage(Message message) { } public Response handleWebReq() { Message message = new Message(1L, &quo ...
Java并发 -- ThreadLocal模式
并发问题
多个线程同时读写同一个共享变量会存在并发问题
Immutability模式和Copy-on-Write模式,突破的是写
ThreadLocal模式,突破的是共享变量
ThreadLocal的使用线程ID12345678910public class ThreadLocalId { private static final AtomicLong nextId = new AtomicLong(0); private static final ThreadLocal<Long> TL = ThreadLocal.withInitial( () -> nextId.getAndIncrement()); // 为每个线程分配一个唯一的ID private static long get() { return TL.get(); }}
SimpleDateFormat12345678public class SafeDateFormat { private st ...