01 March 2023

汇总

简单的多线程编程方式,添加一个 结构化并发的api。 结构化并发将运行在多个线程的多个任务视为一个工作单元,可以统一进行错误处理,任务取消,提升可靠性和加强可观测性性。 这是一个孵化中的api。

目标

提升多线程编程的可靠性、可观测性和可维护性 提供一种并发编程模式,可以消除由取消(cancellation)和停止(shutdown)带来的大多数风险,例如:线程泄漏和延迟取消

非目标

取代现有java并发体系,java.util.concurrent 包,比如 Future 以及 ExecutorService 定义最终的结构化并发体系;第三方包一样可以提供结构化并发包 定义一个新的通道来在线程之前共享数据 使用新的线程取消指令来取代现有的thread取消指令.将来可能会这么做

动机

使用 ExecutorService 的非结构化并发

一个例子,可能会导致线程泄漏,以及延迟取消。都是浪费资源的行为 在jdk5中推出的 java.util.concurrent.ExecutorService api,可以帮助开发者并发执行子任务。 这里有一个例子,handle() 方法,它体现了一个在服务端运行的任务。 接收一个请求,并提交两个子任务到 ExecutorService,一个子任务执行 findUser() 方法,另外一个子任务执行 fetchOrder() 方法 ExecutorService 为每个子任务返回一个 Future 对象,并且每个子任务运行在自己的子线程中 handle() 方法会阻塞等待两个子任务的结果,因为调用了 Future.get() 方法,而这就是一个阻塞获取结果的方法

Response handle() throws ExecutionException, InterruptedException {
Future<String>  user  = esvc.submit(() -> findUser());
Future<Integer> order = esvc.submit(() -> fetchOrder());
String theUser  = user.get();   // Join findUser
int    theOrder = order.get();  // Join fetchOrder
return new Response(theUser, theOrder);
}

因为子任务是并发运行的,每一个子任务都可能单独成功或者失败(这里的失败指抛出异常)。 通常,任意一个子任务的失败,会导致整败,这就导致了如果发生了故障,这里的生命周期会十分难理解: 如果 findUser() 方法抛出异常,那么handle()方法调用相应Future的get()方法时也会抛出异常。但是此时fetchOrder()方法依然在自己的线程中执行。这种情况被称为 线程泄漏.最坏的情况下,fetchOrder() 甚至可能会影响其他线程. 如果执行handle() 方法的线程 interrupted,相应的中断不会反应到对应的字线程中,findUser()和fetchOrder()的线程都会泄漏.,它们会继续运行,不理会handle()方法甚至已经失败了 如果 findUser()耗时较长,而 fetchOrder() 已经失败了,handle() 方法将一直等待 findUser() 返回结果,因为先调用的findUser(的 get方法阻塞等待。只有 findUser() 完成以后,才会调用到 fetchOrder().get9) 并发现异常。这将造成不必要的资源浪费和延迟上升。

在上述每一个case中,问题的根源都是,任务之间的关系,只存在于开发者的脑海中,没有使用代码逻辑来体现出来。 这不仅为错误创造了更多的空间,还使诊断和排除此类错误更加困难。 可观测性工具,比如 thread dumps,只会展示 handle()、findUser()和fetchOrder() 在不同的线程栈上,不会显示他们的关系 任务结构应该能从代码结构上反应(Task structure should reflect code structure) 比如,在单线程版本的 handle() 中,子任务的关系是显而易见的

Response handle() throws IOException {
String theUser  = findUser();
int    theOrder = fetchOrder();
return new Response(theUser, theOrder);
}

只有findUser()完成(不管是成功还是失败)以后,fetchOrder()才会开始.如果findUser()失败,我们不用开始fetchOrder().handle()会失败 子任务只能返回结果到父节点这一事实很重要:父任务可以把子任务的失败当成取消其他剩余子任务的触发器,然后让自身也失败。

在单线程代码中,任务-子任务的层次关系在运行时在调用栈中是明确的。我们因此可以获取到 父-子任务的关系以及错误信息; 当我们观测单线程时,层次关系是这样的:findUser(),然后是fetchOrder(),都归属于 handle();

多线程并发如果能够就像单线程一样,在运行时完整得体现父-子任务关系,那么它是可以更简单,更可靠,更容易观测的。

(java 已经有一个用来改善并发任务结构性的api,java.util.concurrent.ForkJoinPool , 基于并发流的执行引擎。这个api被设计用于计算密集的场景,而非io密集场景)

结构化并发

结构化并发是一种并发编程方法,可以在多线程编程中提供类似单线程编程的可读性、可维护性和可观测性。

如果一个任务分割成并发的子任务,然后它们全部在一个地方返回,即这个任务的代码块

“结构化并发”一词由 Martin Sústrik 提出,由Nathaniel J. Smith推广。从其他语言来的想法,比如Erlang 的hierarchical supervisors 结构化并发的错误处理。

在结构化并发中,子任务代表任务运行,任务等待子任务的结果和监控它们的错误。使用结构化并发的代码在一个线程中。 多线程的结构化并发的力量来自于两个想法:

  1. 一个具有良好定义进入和退出的执行流代码块
  2. 严格的生命周期嵌套,以反映代码中的语法嵌套

因为代码块的进入和退出点被良好地定义了,并发子任务的生命周期将限于其父任务的代码块。 Because the entry and exit points of a block of code are well defined, the lifetime of a concurrent subtask is confined to the syntactic block of its parent task. Because the lifetimes of sibling subtasks are nested within that of their parent task, they can be reasoned about and managed as a unit. Because the lifetime of the parent task is, in turn, nested within that of its parent, the runtime can reify the hierarchy of tasks into a tree. That tree is the concurrent counterpart of the call stack of a single thread, and observability tools can use it to present subtasks as subordinate to their parent tasks.

结构化并发和虚拟线程非常搭配。大量的虚拟线程共享一些操作系统线程,允许使用非常大量级的虚拟线程。 除了数量充足以外,虚拟线程的成本足够低,能够表达任何并发行为,甚至包括io行为。 这意味着一个服务端程序可以用结构化并发来同时处理数百万的传入请求,当一个请求会因为子任务产出的扇出,可以创建新的虚拟线程。Behind the scenes, the task-subtask relationship is reified into a tree by arranging for each virtual thread to carry a reference to its unique parent, similar to how a frame in the call stack refers to its unique caller.

最后总结,虚拟线程提供了大量的线程,结构化并发保证了在如此大量级的线程下,并发是正确的、有力协调的,能够让可观测性工具将线程以开发者理解的方式展示给他们。JDK中的这个结构化并发 API 将提升服务端应用的可维护性、可靠性和可观测性。

描述

结构化并发api的主要类是 StructuredTaskScope. 该类允许开发者将一个并发任务结构化为一个并发子线程组,并将他们作为一个单位来管理 子任务们运行在各自的线程上,使用 forking 和 joining 来形成一个单元,取消也作为一个单元。 子任务的成功结果或者异常都由父线程聚合和处个 handle 方法失理 结构化并发限制了子任务的生命周期, forking , joining ,取消,错误处理和合并结果。

这里是前面的handle()方法,但是使用结构化并发api编写(ShutdownOnFailure api的解释在下面)。

Response handle() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String>  user  = scope.fork(() -> findUser());
Future<Integer> order = scope.fork(() -> fetchOrder());

        scope.join();           // Join both forks
        scope.throwIfFailed();  // ... and propagate errors

        // Here, both forks have succeeded, so compose their results
        return new Response(user.resultNow(), order.resultNow());
    }
}

使用结构化并发,能带来以下这些有价值的点:

  • 能够实现短路的错误处理 - 如果 findUser() 或者 fecthOrder() 任何一个失败,另外一个任务如果还没有完成的话将会被取消(取消策略由 ShutdownOnFailure 来实现,其他策略也可以自定义实现)
  • 取消传播 - 如果运行 handle() 方法的线程被取消,那么在调用join之前或者正在调用,两个frok的任务都将自动取消
  • 清晰 - 上述代码具有清晰的结构:设置两个子任务,等待它们计算完成或者取消,根据子任务的结果来决定是成功(两个子任务都已经完成)还是失败(不需要额外的等待,没有更多的东西需要清理) 。
  • 可观测性 - thread dump , 如前文描述的,清晰得展示 任务依赖,能将 findUser() 、 fetchOrder() 等展示在一个scope里面。

类似 ExecuorService.submit(…), StructuredTaskScope.fork(…)接收一个Callable参数,返回一个Future参数.不同于ExecutorService的是, StructuredTaskScope.fork()返回的Future没有必要调用get()获取结果或者调用cancel()来取消任务。 两个新的Future 方法:resultNow() 和 exceptionNow() 被设计来在子任务完成直接获取结果,比如在调用 scope.join() 之后;

使用结构化并发

在代码中使用结构化并发的通常流程如下:

  • 创建一个 scope。创建scope的线程是scope的所有者(owner).
  • 在 scop 中 fork 子任务
  • scope 中的任意一个fork或者scope的所有者线程都困难调用 shuwdown() 方法来请求取消现有的子任务
  • scope 的所有者会join scope
  • joining完成后,处理forks的任意错误以及处理它们的结果
  • 关闭scope,通常使用 try-with-resources .它将会自动关闭scope和等待所有forks完成。

如果scope的所有者本身就处于另一个scope中,那么它将成为新scope的父scope。形成一个嵌套结构,对于任务取消等都支持传播。 每一个fork都运行在它自己的新创建的线程中,默认使用虚拟线程。

当 join() 返回结果时,所有forks都完成(成功或者失败)或者被取消。它们的结果可以通过 resultNow() 或者 exceptionNow() 被立即获取,没有任何额外的阻塞(这两个方法会抛出 IllegalStateException , 如果future还没有完成的话)。

在scope中,调用 join() 或者 joinUntil() 是强制的。如果在fork的子任务完成之前,还没有触发对 join() 的调用,那么scope将会等待所有任务终结,并抛出异常。

结构化并发强制保证并发操作的结构和顺序.所以并没有修改原有的ExecutorService 或者 Executor 接口,因为它们大多数用于非结构化并发场景。但是将使用ExecutorService的代码迁移到结构化并发是简单的。 结构化并发是一个孵化中的api,默认情况下未被启用 上面的示例使用了结构化并发,为了运行它们,你必须添加 jdk.incubator.concurrent 模块以及启用preview 特性来启用虚拟线程 使用javac 编译程序时:javac –release XX –enable-preview –add-modules jdk.incubator.concurrent Main.java .使用java运行程序时:java –enable-preview –add-modules jdk.incubator.concurrent Main 使用 source code Launcher 时:java –source XX –enable-preview –add-modules jdk.incubator.concurrent Main.java 使用 JShell 时:jshell –enable-preview –add-modules jdk.incubator.concurrent

shutdown 策略

处理并发子任务时,通常使用快速短路模式来避免不必要的工作。 有时候,它有效,例如:当任意一个子任务失败时,取消所有任务(invokeAll就是这样),或者其中任意一个成功(invokeAny) StructredTaskScope 的两个子类:ShutdownOnFailure(有任何一个失败就结束) 和 ShutdownOnSuccess(由任何一个成功就结束) 支持这两种策略。

这里是一个使用 StructredTaskScope.ShutdownOnFailure 的例子,它运行一系列并发任务,并能够在任意一个失败时,整体以失败结束。

<T> List<T> runAll(List<Callable<T>> tasks) throws Throwable {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
List<Future<T>> futures = tasks.stream().map(scope::fork).toList();
scope.join();
scope.throwIfFailed(e -> e);  // Propagate exception as-is if any fork fails
// Here, all tasks have succeeded, so compose their results
return futures.stream().map(Future::resultNow).toList();
}
}

这里是一个使用 StructredTaskScope.ShutdownOnSuccess 的例子,它运行一系列并发任务,并在任意一个任务完成时候,就返回结果。并且使用了joinUntil来支持超时设置.

<T> T race(List<Callable<T>> tasks, Instant deadline) throws ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
for (var task : tasks) {
scope.fork(task);
}
scope.joinUntil(deadline);
return scope.result();  // Throws if none of the forks completed successfully
}
}

当任意一个子任务完成时,scope将会尽快的关闭其他仍然在运行的子任务。该方法会在所有子任务都失败或者超过了设定的deadline水岸。 这个模式非常有用,比如:服务端应用请求多个api,但只需要一个结果。

虽然这两个策略是现成提供的,开发者完全可以继承StructuredTaskScope 并重写 handleComplete(Future)方法来实现其他的真的要策略。

扇入(fan-in)场景

上文提到的例子都是并发向外部进行io操作的扇出(fan-out)场景(关于扇出、扇入更详细的解释可以看 这里). StructuredTaskScope对于接受并发请求的扇入场景一样有作用 在这样的场景,我们通常数量不明的forks来接受请求; 下面是一个socker server使用StructuredTaskScope来fork处理请求链接的例子

void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
try (var scope = new StructuredTaskScope<Void>()) {
try {
while (true) {
var socket = serverSocket.accept();
scope.fork(() -> handle(socket));
}
} finally {
// If there's been an error or we're interrupted, we stop accepting
scope.shutdown();  // Close all active connections
scope.join();
}
}
}

因为所有的负责连接处理的线程都是在scpoe内创建的,所以 thread dump 可以展示出来它们都是scope 所有者的子节点。

可观测性(译者注:非常重要的特性)

我们基于 JEP 425 扩展了新的json 格式的 threaddump,它可以展示出 StructuredTaskScope 的线程组: $ jcmd Thread.dump_to_file -format=json

每一个scope的json对象都包含一个frok在当前scope的线程的数组,线程堆栈被汇聚到一起。 sopce的所有者线程将会阻塞等待子任务完成;通过结构化并发,thread dump可以轻松得将自线程展示在调用栈中; scope的json对象同样能展示对父对象的引用,所以程序的结构能够得以在dump中展现。

com.sun.management.HotSpotDiagnosticsMXBean api 同样可以生成thread dump,直接调用或者通过MBEAnServer 以及本地或者远程的JMX 工具都支持。

其他选择

什么也不做;继续使用底层的并发类,让开发人员更加小心的考虑并发中的异常情况和生命周期问题。 加强ExecutorService

依赖

JEP 425 虚拟线程