Java并发 -- 两阶段终止模式
两阶段终止模式第一阶段线程T1向线程T2发送终止指令,第二阶段是线程T2响应终止指令
Java线程生命周期
Java线程进入终止状态的前提是线程进入RUNNABLE状态,而实际上线程可能处于休眠状态
因为如果要终止处于休眠状态的线程,要先通过interrupt把线程的状态从休眠状态转换到RUNNABLE状态
RUNNABLE状态转换到终止状态,优雅的方式是让Java线程自己执行完run方法
设置一个标志位,然后线程会在合适的时机检查这个标志位
如果发现符合终止条件,就会自动退出run方法
第二阶段:响应终止指令
终止监控操作
监控系统需要动态采集一些数据,监控系统发送采集指令给被监控系统的的监控代理
监控代理接收到指令后,从监控目标收集数据,然后回传给监控系统
处于性能的考虑,动态采集一般都会有终止操作
12345678910111213141516171819202122232425262728293031323334353637383940public class Proxy { private boolean started = false; // 采集线程 ...
Java并发 -- Worker Thread模式
Worker Thread模式
Worker Thread模式可以类比现实世界里车间的工作模式,Worker Thread对应车间里的工人(人数确定)
用阻塞队列做任务池,然后创建固定数量的线程消费阻塞队列中的任务 – 这就是Java中的线程池方案
echo服务123456789101112131415161718192021222324252627private ExecutorService pool = Executors.newFixedThreadPool(500);public void handle() throws IOException { // 处理请求 try (ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080))) { while (true) { // 接收请求 SocketChannel sc = ssc.accept(); // 将 ...
Java并发 -- Thread-Per-Message模式
概述Thread-Per-Message模式:为每个任务分配一个独立的线程
Thread123456789101112131415161718192021// 处理请求try (ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080))) { while (true) { // 接收请求 SocketChannel sc = ssc.accept(); // 每个请求都创建一个线程 new Thread(() -> { try { // 读Socket ByteBuffer rb = ByteBuffer.allocateDirect(1024); sc.read(rb); TimeUnit.SECONDS.sleep(1); ...
Java并发 -- Balking模式
关于Guarded Suspension模式可以用“多线程版本的if”来理解Guarded Suspension模式,必须等到条件为真,但很多场景需要快速放弃
自动保存123456789101112131415161718192021222324252627282930public class AutoSaveEditor { // 文件是否被修改 // 非线程安全,对共享变量change的读写没有使用同步 private boolean changed = false; // 定时任务线程池 private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); @PostConstruct public void startAutoSave() { service.scheduleWithFixedDelay(() -> autoSave(), 5, 5, TimeUnit.SECONDS); } ...
Java并发 -- Guarded Suspension模式
概述
Guarded Suspension模式是等待唤醒机制的规范实现
Guarded Suspension模式也被称为Guarded Wait 模式、Spin Lock 模式
Web版的文件浏览器
用户可以在浏览器里查看服务器上的目录和文件
该项目依赖运维部门提供的文件浏览服务,而文件浏览服务仅支持MQ接入
用户通过浏览器发送请求,会被转换成消息发送给MQ,等MQ返回结果后,再将结果返回至浏览器
1234567891011121314151617181920212223242526public class FileBrowser { // 发送消息 private void send(Message message) { } // MQ消息返回后调用该方法 public void onMessage(Message message) { } public Response handleWebReq() { Message message = new Message(1L, &quo ...
Java并发 -- ThreadLocal模式
并发问题
多个线程同时读写同一个共享变量会存在并发问题
Immutability模式和Copy-on-Write模式,突破的是写
ThreadLocal模式,突破的是共享变量
ThreadLocal的使用线程ID12345678910public class ThreadLocalId { private static final AtomicLong nextId = new AtomicLong(0); private static final ThreadLocal<Long> TL = ThreadLocal.withInitial( () -> nextId.getAndIncrement()); // 为每个线程分配一个唯一的ID private static long get() { return TL.get(); }}
SimpleDateFormat12345678public class SafeDateFormat { private st ...
Java并发 -- Copy-on-Write模式
fork
类Unix操作系统调用fork(),会创建父进程的一个完整副本,很耗时
Linux调用fork(),创建子进程时并不会复制整个进程的地址空间,而是让父子进程共享同一个地址空间
只有在父进程或者子进程需要写入时才会复制地址空间,从而使父子进程拥有各自独立的地址空间
本质上来说,父子进程的地址空间和数据都是要隔离的,使用Copy-on-Write更多体现的是一种延时策略
Copy-on-Write还支持按需复制,因此在操作系统领域能够提升性能
Java提供的Copy-on-Write容器,会复制整个容器,所以在提升读操作性能的同时,是以内存复制为代价的
CopyOnWriteArrayList / CopyOnWriteArraySet
RPC框架
服务提供方是多实例分布式部署的,服务的客户端在调用RPC时,会选定一个服务实例来调用
这个过程的本质是负载均衡,而做负载均衡的前提是客户端要有全部的路由信息
一个核心任务就是维护服务的路由关系,当服务提供方上线或者下线的时候,需要更新客户端的路由表信息
RPC调用需要通过负载均衡器来计算目标服务的IP和端口号,负载均衡器通过路由表 ...
Java并发 -- Immutability模式
Immutability模式Immutability模式:对象一旦被创建之后,状态就不再发生变化
不可变类
将一个类所有的属性都设置成final,并且只允许存在只读方法
将类也设置成final的,因为子类可以重写父类的方法,有可能改变不可变性
包装类
String、Integer、Long和Double等基础类型的包装类都具备不可变性
这些对象的线程安全都是靠不可变性来保证的
严格遵守:_类和属性都是final的,所有方法均是只读的_
如果具备不可变性的类,需要提供修改的功能,那会创建一个新的不可变对象
这会浪费内存空间,可以通过享元模式来优化
1234567891011121314151617181920212223242526272829private final char value[];public String replace(char oldChar, char newChar) { if (oldChar != newChar) { int len = value.length; int i = -1; char[] ...
Java并发 -- Fork + Join
任务视角
线程池+Future:简单并行任务
CompletableFuture:聚合任务
CompletionService:批量并行任务
Fork/Join:_分治_
分治任务模型
分治任务模型分为两个阶段:任务分解 + 结果合并
任务分解:将任务迭代地分解为子任务,直至子任务可以直接计算出结果
任务和分解后的子任务具有相似性(算法相同,只是计算的数据规模不同,往往采用递归算法)
结果合并:逐层合并子任务的执行结果,直至获得最终结果
Fork/Join概述
Fork/Join是并行计算的框架,主要用来支持分治任务模型,Fork对应任务分解,Join对应结果合并
Fork/Join框架包含两部分:分治任务ForkJoinTask + 分治任务线程池ForkJoinPool
类似于Runnable + ThreadPoolExecutor
ForkJoinTask最核心的方法是fork和join
fork:异步地执行一个子任务
join:阻塞当前线程,等待子任务的执行结果
ForkJoinTask有两个子类:RecursiveAction + R ...
Java并发 -- CompletionService
场景1234567// 从3个电商询价,保存到数据库,串行执行,性能很慢int p1 = getPriceByS1();save(p1);int p2 = getPriceByS2();save(p2);int p3 = getPriceByS3();save(p3);
ThreadPoolExecutor + Future1234567891011ExecutorService pool = Executors.newFixedThreadPool(3);Future<Integer> f1 = pool.submit(() -> getPriceByS1());Future<Integer> f2 = pool.submit(() -> getPriceByS2());Future<Integer> f3 = pool.submit(() -> getPriceByS3());int p1 = f1.get(); // 阻塞,如果f2.get()很快,但f1.get()很慢,依旧需要等待pool.execute(() -> save(p1) ...