概述

  1. Guarded Suspension模式是等待唤醒机制的规范实现
  2. Guarded Suspension模式也被称为Guarded Wait 模式、Spin Lock 模式

Web版的文件浏览器

  1. 用户可以在浏览器里查看服务器上的目录和文件
  2. 该项目依赖运维部门提供的文件浏览服务,而文件浏览服务仅支持MQ接入
  3. 用户通过浏览器发送请求,会被转换成消息发送给MQ,等MQ返回结果后,再将结果返回至浏览器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class FileBrowser {
// 发送消息
private void send(Message message) {
}

// MQ消息返回后调用该方法
public void onMessage(Message message) {
}

public Response handleWebReq() {
Message message = new Message(1L, "123");
// 发送消息
send(message);
// 如何等待MQ返回消息?
return new Response();
}
}

@AllArgsConstructor
class Message {
private Long id;
private String content;
}

class Response {
}

Guarded Suspension模式

  1. Guarded Suspension直译为_保护性暂停_
  2. 通过onChange方法可以产生一个事件,而这个事件往往能改变前提条件p的计算结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class GuardedObject<T> {
private static final int TIMEOUT = 1;

// 受保护对象
private T obj;
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();

// 获取受保护对象
public T get(Predicate<T> p) {
lock.lock();
try {
// MESA管程推荐写法
while (!p.test(obj)) {
done.await(TIMEOUT, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
return obj;
}

// 事件通知方法
public void onChange(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}

扩展Guarded Suspension模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class GuardedObject<T> {
private static final int TIMEOUT = 1;
// 保存所有的GuardedObject
private static final Map<Object, GuardedObject> goMap = new ConcurrentHashMap<>();

// 受保护对象
private T obj;
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();

public static <K> GuardedObject create(K key) {
GuardedObject go = new GuardedObject();
goMap.put(key, go);
return go;
}

public static <K, T> void fireEvent(K key, T obj) {
GuardedObject go = goMap.remove(key);
if (go != null) {
go.onChange(obj);
}
}

// 获取受保护对象
public T get(Predicate<T> p) {
lock.lock();
try {
// MESA管程推荐写法
while (!p.test(obj)) {
done.await(TIMEOUT, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
return obj;
}

// 事件通知方法
public void onChange(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class FileBrowser {
// 发送消息
private void send(Message message) {
}

// MQ消息返回后调用该方法
public void onMessage(Message message) {
// 唤醒等待的线程
GuardedObject.fireEvent(message.getId(), message);
}

public Response handleWebReq() {
Long id = 1L;
Message message = new Message(id, "123");
GuardedObject go = GuardedObject.create(id);
// 发送消息
send(message);
// 等待MQ消息
go.get(Objects::nonNull);
return new Response();
}
}

@Data
@AllArgsConstructor
class Message {
private Long id;
private String content;
}

class Response {
}

参考资料

Java并发编程实战