指令式编程

最常用的代码控制模式

1
System.out.println("Hello, World!");
  1. 通过代码发布指令,然后等待指令的执行以及指令执行带来的状态变化
  2. 并且根据目前的状态,来确定下一次要发布的指令,并且用代码下一个指令表示出来

指令式编程模型关注的重点在于控制状态

1
2
3
4
5
6
7
try {
Digest messageDigest = Digest.of("SHA-256");
byte[] digestValue =
messageDigest.digest("Hello, world!".getBytes());
} catch (NoSuchAlgorithmException ex) {
System.out.println("Unsupported algorithm: SHA-256");
}
  1. 首先调用 Digest.of 方法,得到一个 Digest 实例
  2. 然后调用该实例的方法 Digest.digest 获得一个返回值
  3. 第一个方法执行完成后,获得了第一个方法执行后的状态,第二个方法才能接着执行

这种顺序执行的模式,逻辑简单直接

  1. 常用于精确控制
  2. 该模式在通用编程语言设计和一般的应用程序开发中,占据着压倒性的优势

缺陷

  1. 该模式需要维护和同步状态
  2. 如果状态数量大,就要将大的代码块分解成小的代码块 - 代码更容易理解并且更容易维护
  3. 更大的问题来自于 - 状态同步需要的顺序执行
    • Digest.of - 效率很高,执行很快;而 Digest.digest - 效率欠佳,毫秒级甚至是秒级的
    • 在要求低延迟高并发的环境下,等待 Digest.digest 调用的返回结果,并不是一个很好的选择
    • 阻塞在方法的调用上,增加了系统的延迟,降低了系统能够支持的吞吐量 - Node.js

优化方向 - 使用非阻塞异步编程

声明式编程

  1. 非阻塞异步编程,并不是可以通过编程语言或者标准类库就能得到的
  2. 支持非阻塞的异步编程,需要大幅度地更改代码,转换代码编写的思维习惯 - 回调函数
  3. 当试图使用回调函数时,编写代码的思路模型都会产生巨大的变化
    • 指令式编程 - 控制状态 - 告诉计算机该怎么做
    • 声明式编程 - 控制目标 - 告诉计算机要做什么

如果执行成功,则执行 onSuccess 回调函数,否则,继续执行 onFailure 回调函数

1
2
3
4
5
6
7
8
public abstract sealed class Digest {

public static void of(
String algorithm, Consumer<Digest> onSuccess, Consumer<Integer> onFailure) {}

public abstract void digest(
byte[] message, Consumer<byte[]> onSuccess, Consumer<Integer> onFailure);
}
  1. 有了回调函数的设计,代码的实现方式就放开了管制
  2. 无论是回调函数的实现,还是回调函数的调用,都可以自由选择是采用异步的模式,还是同步的模式
  3. 回调函数的天生缺陷 - 即 Callback Hell - 回调堆挤
    • 通常需要布置多个小任务,才能完成一个大任务
    • 这些小任务可能是有因果关系的任务,此时需要小任务的配合,或者按顺序执行

Callback Hell - 两个回调函数的使用,就会堆积起来 - 如果回调函数的嵌套增多,可读性差,维护难度加大

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Digest.of("SHA-256",
md -> {
System.out.println("SHA-256 is not supported");
md.digest("Hello, world!".getBytes(),
values -> {
System.out.println("SHA-256 is available");
},
errorCode -> {
System.out.println("SHA-256 is not available");
});
},
errorCode -> {
System.out.println("Unsupported algorithm: SHA-256");
});
  1. 回调函数带来的形式上的堆积还可以克服,但这种形式上的堆积带来了逻辑上的堆积那几乎不可承受
  2. 逻辑上的堆积,意味着代码的深度耦合
    • 深度耦合,意味着代码维护困难
    • 深度嵌套里的一点点代码修改,都可能通过嵌套层层朝上传递,最后牵动全局
  3. 使用回调函数声明式编程模型有着严重的场景适用问题
    • 通常只使用回调函数解决性能影响最大的模块,而大部分的代码,依然使用传统的,顺序执行的指令式模型
  4. 业界试图改善回调函数使用困境,其中最为出色影响最大的是反应式编程

反应式编程

数据流 + 变化传递

  1. 反应式编程的基本逻辑,仍然是告诉计算机要做什么
  2. 但关注点转移到了数据的变化变化的传递上 - 转移到了对数据变化反应
  3. 反应式编程的核心 - 数据流变化传递
  4. 从数据的流向角度来看,数据有两种基本的形式 - 数据的输入数据的输出
    • 并衍生出三种过程 - 最初的来源数据的传递最终的结局

数据的输出

在 Java 的反应式编程模型的设计里,数据的输出使用只有一个参数Flow.Publisher 来表示

1
2
3
4
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
  1. 在 Flow.Publisher 的接口设计里,泛型 T 表示的是数据类型
  2. 数据输出的对象,使用 Flow.Subscriber 来表示
  3. 数据的发布者通过授权订阅者,来实现数据从发布者到订阅者的传递
  4. 一个数据的发布者,可以有多个数据的订阅者
  5. 订阅的接口,安排在了 Flow.Publisher 接口
    • 订阅者的订阅行为,是由数据的发布者发起的,而不是订阅者发起的
  6. 数据最初的来源,就是一种形式的数据输出
    • 它只有数据输出这个传递方向,而不能接收数据的输入

数据最初来源的例子

1
SubmissionPublisher<byte[]> publisher = new SubmissionPublisher<>();

数据的输入

在 Java 的反应式编程模型的设计里,数据的输入用只有一个参数Flow.Subscriber 来表示 - 即订阅者

1
2
3
4
5
6
7
8
9
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);

public void onNext(T item);

public void onError(Throwable throwable);

public void onComplete();
}

在 Flow.Subscriber 的接口设计里,泛型 T 表示的是数据类型
其中定义了 4 种任务,分别规定了在 4 种情形下的反应

Task Reaction
接收到订阅邀请 onSubscribe
接收到数据 onNext
遇到错误 onError
数据传输完毕 onComplete
  1. 数据最终的结局,就是一种形式的数据输入
  2. 它只有数据输入这个传递方向,而不能产生数据的输出

数据最终结果的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.Flow;
import java.util.function.Consumer;

public class Destination<T> implements Flow.Subscriber<T> {

private Flow.Subscription subscription;
private final Consumer<T> consumer;

public Destination(Consumer<T> consumer) {
this.consumer = consumer;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(T item) {
subscription.request(1);
consumer.accept(item);
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}

@Override
public void onComplete() {
System.out.println("Done");
}
}

数据的控制

  1. Flow.Subscriber 和 Flow.Publisher 没有直接联系,取而代之的是一个中间代理 Flow.Subscription
  2. Flow.Subscription 管理控制着 Flow.Publisher 和 Flow.Subscriber 之间的连接以及数据的传递

在 Java 的反应式编程模型里,数据的传递控制数据数据的变化分离了出来
这样的分离,对于降低功能之间的耦合意义重大

1
2
3
4
public static interface Subscription {
public void request(long n);
public void cancel();
}
  1. request - 表示订阅者希望接收的数据数量
  2. cancel - 表示订阅者希望取消订阅

image-20250806182432660

数据的传递

  1. 除了最初的来源最终的结局,数据表现还有一个过程,就是数据的传递
  2. 数据的传递这个过程,既包括接收输入数据,也包括发送输出数据
  3. 在数据传递这个环节,数据的内容可能会发生变化,数据的数量也可能会发生变化
    • 过滤掉一部分的数据,或者修改输入的数据,甚至替换掉输入的数据

在 Java 的反应式编程模型的设计里,该过程由 Flow.Processor 表示
Flow.Processor 是一个扩展了 Flow.PublisherFlow.Subscriber 的接口
Flow.Processor 有两个数据类型,泛型 T 表述输入数据的类型,泛型 R 表述输出数据的类型

1
2
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

使用泛型来表示输入数据输出数据的类型
然后使用一个 Function 函数,表示怎么处理接收到的数据,并且输出处理的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Transform<T, R> extends SubmissionPublisher<R>
implements Flow.Processor<T, R> {
private Function<T, R> transform;
private Flow.Subscription subscription;

public Transform(Function<T, R> transform) {
super();
this.transform = transform;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(T item) {
submit(transform.apply(item));
subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
closeExceptionally(throwable);
}

@Override
public void onComplete() {
close();
}
}

过程的串联

数据的表述方式分为输入输出两种基本的形式,还衍生出三种过程,能够很方便地串联起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static void transform(byte[] message,
Function<byte[], byte[]> transformFunction) {
SubmissionPublisher<byte[]> publisher =
new SubmissionPublisher<>();

// Create the transform processor
Transform<byte[], byte[]> messageDigest =
new Transform<>(transformFunction);

// Create subscriber for the processor
Destination<byte[]> subscriber = new Destination<>(
values -> System.out.println(
"Got it: " + Utilities.toHexString(values)));

// Chain processor and subscriber
publisher.subscribe(messageDigest);
messageDigest.subscribe(subscriber);
publisher.submit(message);

// Close the submission publisher.
publisher.close();
}
  1. 串联的形式,解耦了不同环节的串联,而且每个环节的代码可以换个场景复用
  2. 支持过程的串联,是反应式编程模型强大的最大动力之一
  3. 像 Scala,甚至把过程串联提升到编程语言的层面来支持
    • 极大地提高了编码效率和代码的美观程度

解决问题

  1. 解决顺序执行的模式带来的延迟效果
  2. 解决回调函数带来的堆挤问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Returned<Digest> rt = Digest.of("SHA-256");
switch (rt) {
case Returned.ReturnValue rv -> {
// Get the returned value
if (rv.returnValue() instanceof Digest d) {
// Call the transform method for the message digest.
transform("Hello, World!".getBytes(), d::digest);

// Wait for completion
Thread.sleep(20000);
} else { // unlikely
System.out.println("Implementation error: SHA-256");
}
}
case Returned.ErrorCode ec ->
System.out.println("Unsupported algorithm: SHA-256");
}
  1. 没有类似于回调函数一样的堆挤现象 - 依赖于过程串联
  2. Java 的反应式编程模型里的过程串联数据控制的设计,以及数据输入数据输出分离 - 降低代码耦合
  3. Digest.digest 方法可以直接使用,为了能够使用反应式编程模型,无需修改 Digest 代码
    • 只需把 Digest 原来的设计和实现,恰当地放到反应式编程模型里来,就能实现异步非阻塞
  4. 回调函数一样,反应式编程既能支持同步阻塞的模式,也能够支持异步非阻塞的模式
    • 接口实现是异步非阻塞模式的,那么接口调用,也是异步非阻塞的
    • 反应式编程模型的主要使用场景,主要还是异步非阻塞模式

缺陷与对策

  1. 最要命的缺陷 - 错误很难排查,这是异步编程的通病
  2. 反应式编程模型的解耦设计,加剧了错误排查的难度,会严重影响开发效率,降低代码的可维护性

协程 - Fiber

1
2
3
4
5
6
7
try {
Digest messageDigest = Digest.of("SHA-256");
byte[] digestValue =
messageDigest.digest("Hello, world!".getBytes());
} catch (NoSuchAlgorithmException ex) {
System.out.println("Unsupported algorithm: SHA-256");
}
  1. 在 Java 的指令式编程模型里,上述代码要在一个线程里执行
    • 在每个方法返回之前,线程都会处于等待状态
    • 线程的等待,是造成资源浪费的最大因素
  2. 协程的处理方式,是消除了线程的等待
    • 如果调用阻塞,就会把CPU 资源切换出去,执行其它操作
    • 这样会节省大量的计算资源,使得系统在阻塞的模式下,支持大规模的并发
  3. 如果指令式编程模型能够通过协程的方式支持大规模的并发 - 可以颠覆现有高并发架构的新技术