23 September 2023

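Virtual threads arrived as a preview feature in JDK 19. After testing, developer feedback and polishing across JDK 19 and JDK 20, they were finalized in JDK 21 (released 2023-09-19). JDK 19 introduced two of the most exciting features in recent Java history: virtual threads (JEP 425, finalized in JDK 21 as JEP 444) and structured concurrency (JEP 428). With virtual threads, Java becomes a language in which concurrent programs can be written at genuinely low cost.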

This article aims to be a complete, in-depth guide to virtual threads.

Preparation

We use SLF4J for logging so that we can observe how virtual threads behave.

static final Logger logger = LoggerFactory.getLogger(App.class);

For convenience we wrap logging in a log method. Its main addition is that every message also prints the current thread.

static void log(String message) {
  // pass the message as an argument so that any "{}" inside it is not treated as a placeholder
  logger.info("{} | {}", Thread.currentThread(), message);
}
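The examples that follow also call sleep(Duration) and, later, alwaysTrue(). The article never shows either method. Below is a minimal sketch of what they might look like. It assumes sleep simply wraps Thread.sleep so that lambdas do not have to handle InterruptedException. The class name VirtualThreadHelpers is made up for this sketch; in the article both methods would live in App.

```java
import java.time.Duration;

public class VirtualThreadHelpers {

    // Hypothetical helper: sleeps without forcing callers (e.g. Runnable lambdas)
    // to handle the checked InterruptedException.
    static void sleep(Duration duration) {
        try {
            Thread.sleep(duration); // Thread.sleep(Duration) exists since JDK 19
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new RuntimeException(e);
        }
    }

    // Hypothetical helper: always returns true, but it is not a constant expression,
    // so code after `while (alwaysTrue()) { ... }` is not rejected as unreachable.
    static boolean alwaysTrue() {
        return true;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        sleep(Duration.ofMillis(50));
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("slept about " + elapsedMs + " ms, alwaysTrue=" + alwaysTrue());
    }
}
```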

Why virtual threads?

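Java is a multithreaded language. The JVM exposes operating system threads through the java.lang.Thread abstraction. Before Project Loom (the project that produced virtual threads), every java.lang.Thread wrapped exactly one OS thread, a 1:1 mapping. These traditional threads are called platform threads.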

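The problem with OS threads is that they are expensive. To begin with, creating a thread has a cost. Every time a platform thread is created, the operating system reserves on the order of a megabyte of memory for its stack; the default Java thread stack size is 1 MB on common 64-bit platforms. This stack holds the thread context, the Java call stack and native frames.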

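Threads therefore cost both time and space. In fact, stack size severely limits how many threads can be created. In Java, creating threads in a loop is enough to exhaust the operating system's resources and trigger an OutOfMemoryError: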

// Starts platform threads in a loop until the OS refuses to create more
private static void outOfMemoryErrorExample() {
  for (int i = 0; i < 1_000_000; i++) {
    new Thread(() -> {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }).start();
  }
}

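The result depends on the operating system and hardware. On the author's MacBook Pro M1 with 8 GB of RAM, the error appears almost immediately: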

[0.949s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 4k, detached.
[0.949s][warning][os,thread] Failed to start the native thread for java.lang.Thread "Thread-4073"
Exception in thread "main" java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached

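Java strives for simplicity, and the simplest way to write concurrent code is to write it as if it were synchronous. Today the prevailing way to do this is to dedicate one thread to each task (thread-per-request).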

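In this model, each thread can keep its own state in thread-local variables. That reduces the state shared between threads and keeps concurrent programming simple. However, thread-per-request makes it easy to hit the limit on the number of threads. Java code usually pairs it with thread pools, but pools raise further questions: queuing delay, pool sizing, rejection policies and so on. A single misconfigured parameter can lead to OutOfMemoryErrors or slow responses.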
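The sketch below makes those pool settings concrete with the standard ThreadPoolExecutor. The sizes (2 threads, a queue of 2) and the class name BoundedPoolDemo are arbitrary choices for illustration. Once both the threads and the queue are busy, the rejection policy decides what happens to the next request.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {

    // Submits `tasks` blocking tasks to a pool of 2 threads with a queue of 2
    // and returns how many submissions were rejected.
    static int countRejections(int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try (ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2,                                    // pool size: how many threads?
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2),             // bounded queue: waiting tasks add latency
                new ThreadPoolExecutor.AbortPolicy())) { // rejection policy: throw when saturated
            try {
                for (int i = 0; i < tasks; i++) {
                    try {
                        pool.execute(() -> {
                            try {
                                release.await(); // simulate a request blocked on I/O
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        rejected++;
                    }
                }
            } finally {
                release.countDown(); // unblock the tasks so close() can finish
            }
        } // close() = shutdown() + wait for termination (JDK 19+)
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected " + countRejections(6) + " of 6 tasks");
    }
}
```

With 6 blocking tasks, 2 run, 2 wait in the queue, and AbortPolicy rejects the remaining 2.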

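Another way to avoid creating many threads is non-blocking I/O. This style leads to deeply nested callbacks, the so-called callback hell. Callback hell hurts readability and maintainability, and it makes the code harder to write in the first place.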

// a, b, c and d stand for asynchronous functions that take a callback (illustrative only)
static void callbackHell() {
  a(aInput, resultFromA ->
    b(resultFromA, resultFromB ->
      c(resultFromB, resultFromC ->
        d(resultFromC, resultFromD ->
          System.out.printf("A, B, C, D: %s, %s, %s, %s%n", resultFromA, resultFromB, resultFromC, resultFromD)))));
}
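The fragment above does not run because a, b, c and d are undefined. The sketch below fills them in with a hypothetical stepAsync function, which adds 1 on another thread and then invokes the callback. It contrasts that nesting with the straight-line code that a virtual thread makes possible. stepAsync, stepSync and CallbackVsDirect are invented for this illustration, and the code requires JDK 21.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class CallbackVsDirect {

    // Hypothetical asynchronous step: computes input + 1 on another thread, then invokes the callback.
    static void stepAsync(int input, Consumer<Integer> callback) {
        CompletableFuture.runAsync(() -> callback.accept(input + 1));
    }

    // Hypothetical synchronous step with the same logic; in real code this would block on I/O.
    static int stepSync(int input) {
        return input + 1;
    }

    // Callback style: every step nests inside the previous step's callback.
    static String callbackStyle(int aInput) {
        CompletableFuture<String> done = new CompletableFuture<>();
        stepAsync(aInput, resultFromA ->
            stepAsync(resultFromA, resultFromB ->
                stepAsync(resultFromB, resultFromC ->
                    stepAsync(resultFromC, resultFromD ->
                        done.complete(String.format("A, B, C, D: %s, %s, %s, %s",
                                resultFromA, resultFromB, resultFromC, resultFromD))))));
        return done.join(); // wait for the innermost callback
    }

    // Direct style: the same steps as straight-line code, run on a virtual thread.
    static String directStyle(int aInput) {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            return CompletableFuture.supplyAsync(() -> {
                int resultFromA = stepSync(aInput);
                int resultFromB = stepSync(resultFromA);
                int resultFromC = stepSync(resultFromB);
                int resultFromD = stepSync(resultFromC);
                return String.format("A, B, C, D: %s, %s, %s, %s",
                        resultFromA, resultFromB, resultFromC, resultFromD);
            }, executor).join();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackStyle(0));
        System.out.println(directStyle(0));
    }
}
```

Both methods produce the same string. Only the direct version reads top to bottom.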

To escape callback hell, people proposed solutions such as reactive programming and async/await keywords.

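Reactive programming processes data as streams through a DSL in order to achieve concurrency. But the DSL is hard to understand, and it brings a steep learning curve and a heavy cognitive load, as this RxJava snippet shows: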

public Observable<String> getSmsCode(String mobile) {
    return getService().getSmsCode(mobile, "GRAVIDA")
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap(new Func1<Response<String>, Observable<String>>() {
                @Override
                public Observable<String> call(Response<String> stringResponse) {
                    return flatResponse(stringResponse);
                }
            });
}

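Kotlin, another JVM language, takes an approach similar to async/await. Kotlin coroutines rely on suspending functions, and suspension in turn relies on non-blocking I/O underneath. Not every feature or library offers non-blocking I/O. Suspending functions also split the program in two: ordinary synchronous functions on one side and suspending functions on the other. This makes maintenance considerably harder and again costs us simplicity.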

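For all these reasons, Java needed a better way to write concurrent programs, and virtual threads are currently the most suitable answer. Brian Goetz, Java's language architect, has even predicted that Project Loom will kill reactive programming. Let's look at virtual threads in detail.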

Creating virtual threads

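A virtual thread is a new kind of thread that tackles the resource problem of the platform threads introduced above. A virtual thread is still a java.lang.Thread. The difference is that it stores its stack frames on the heap instead of in a large, dedicated OS thread stack.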

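A virtual thread has a very small footprint, on the order of a few hundred bytes to start with. Its stack grows and shrinks as needed, so no thread has to reserve a large block of memory up front.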

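Creating a virtual thread is simple. java.lang.Thread provides a new factory method, ofVirtual(), which returns a builder for virtual threads. The code below uses it to create and start a named virtual thread: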

private static Thread virtualThread(String name, Runnable runnable){
    return Thread.ofVirtual().name(name).start(runnable);
}
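Thread.ofVirtual() is not the only entry point. Here is a short sketch of the other creation styles in the JDK 21 API. The class name and thread names are made up.

```java
public class CreateVirtualThreads {

    // Returns {t1.isVirtual(), t2.isVirtual(), t3.isVirtual()} for three ways of creating threads.
    static boolean[] demo() throws InterruptedException {
        Runnable task = () -> System.out.println("running in " + Thread.currentThread());

        // 1. One-liner: create and start a virtual thread.
        Thread t1 = Thread.startVirtualThread(task);

        // 2. Builder: configure a virtual thread, create it unstarted, start it later.
        Thread t2 = Thread.ofVirtual().name("unstarted-vt").unstarted(task);
        t2.start();

        // 3. The matching builder for platform threads, for comparison.
        Thread t3 = Thread.ofPlatform().name("platform-t").start(task);

        t1.join();
        t2.join();
        t3.join();
        return new boolean[] { t1.isVirtual(), t2.isVirtual(), t3.isVirtual() };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

isVirtual() lets code check which kind of thread it is running on.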

We'll use a morning routine as a running example. First, taking a bath:

static Thread bathTime() {
  return virtualThread(
    "bath-time",
    () -> {
      log("I'm going to take a bath");
      sleep(Duration.ofMillis(500L));
      log("I'm done with the bath");
    });
}

Then boiling some water for tea:

static Thread boilingWater() {
  return virtualThread(
    "boilWater",
    () -> {
      log("I'm going to boil some water");
      sleep(Duration.ofSeconds(1L));
      log("I'm done with the water");
    });
}

Putting the tasks together:

static void concurrentMorningRoutine() throws InterruptedException {
  var bathTime = bathTime();
  var boilingWater = boilingWater();
  bathTime.join();
  boilingWater.join();
}

Joining the threads keeps the main method from finishing before the asynchronous tasks complete. Here is the output:

08:34:46.217 [bath-time] INFO in.rcard.virtual.threads.App - VirtualThread[#21,bath-time]/runnable@ForkJoinPool-1-worker-1 | I'm going to take a bath
08:34:46.218 [boilWater] INFO in.rcard.virtual.threads.App - VirtualThread[#23,boilWater]/runnable@ForkJoinPool-1-worker-2 | I'm going to boil some water
08:34:46.732 [bath-time] INFO in.rcard.virtual.threads.App - VirtualThread[#21,bath-time]/runnable@ForkJoinPool-1-worker-2 | I'm done with the bath
08:34:47.231 [boilWater] INFO in.rcard.virtual.threads.App - VirtualThread[#23,boilWater]/runnable@ForkJoinPool-1-worker-2 | I'm done with the water

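The output matches our expectations: the two threads run concurrently and their logs interleave. Note also that virtual thread #21 started on worker-1 and finished on worker-2. After a blocking call, a virtual thread can resume on a different carrier.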

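Here we created threads directly with Thread. In practice we usually prefer the ExecutorService implementations from Executors. Note that newVirtualThreadPerTaskExecutor() does not pool threads: it starts a new virtual thread for every submitted task.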

static void concurrentMorningRoutineUsingExecutors() throws InterruptedException, ExecutionException {
  try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    var bathTime =
      executor.submit(
        () -> {
          log("I'm going to take a bath");
          sleep(Duration.ofMillis(500L));
          log("I'm done with the bath");
        });
    var boilingWater =
      executor.submit(
        () -> {
          log("I'm going to boil some water");
          sleep(Duration.ofSeconds(1L));
          log("I'm done with the water");
        });
    bathTime.get();
    boilingWater.get();
  }
}
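A quick way to see that newVirtualThreadPerTaskExecutor() does not reuse threads is to record which thread ran each task. The sketch below does this and assumes JDK 21; Future.resultNow() exists since JDK 19. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PerTaskExecutorDemo {

    // Submits `tasks` tasks and returns how many distinct threads ran them.
    static int distinctThreads(int tasks) {
        List<Future<Thread>> futures = new ArrayList<>();
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                futures.add(executor.submit(() -> Thread.currentThread()));
            }
        } // close() waits until every task has completed
        Set<Thread> threads = new HashSet<>();
        for (Future<Thread> future : futures) {
            Thread t = future.resultNow(); // result of an already-completed task
            if (!t.isVirtual()) {
                throw new IllegalStateException("expected a virtual thread: " + t);
            }
            threads.add(t);
        }
        return threads.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct threads for 100 tasks: " + distinctThreads(100));
    }
}
```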

Naming threads with a thread factory

static void concurrentMorningRoutineUsingExecutorsWithName() throws InterruptedException, ExecutionException {
  final ThreadFactory factory = Thread.ofVirtual().name("routine-", 0).factory();
  try (var executor = Executors.newThreadPerTaskExecutor(factory)) {
    var bathTime =
      executor.submit(
        () -> {
          log("I'm going to take a bath");
          sleep(Duration.ofMillis(500L));
          log("I'm done with the bath");
        });
    var boilingWater =
      executor.submit(
        () -> {
          log("I'm going to boil some water");
          sleep(Duration.ofSeconds(1L));
          log("I'm done with the water");
        });
    bathTime.get();
    boilingWater.get();
  }
}

A ThreadFactory creates threads with a shared configuration. Here each thread is named with the prefix routine- followed by a counter that starts at 0:

08:44:35.390 [routine-1] INFO in.rcard.virtual.threads.App - VirtualThread[#23,routine-1]/runnable@ForkJoinPool-1-worker-2 | I'm going to boil some water
08:44:35.390 [routine-0] INFO in.rcard.virtual.threads.App - VirtualThread[#21,routine-0]/runnable@ForkJoinPool-1-worker-1 | I'm going to take a bath
08:44:35.900 [routine-0] INFO in.rcard.virtual.threads.App - VirtualThread[#21,routine-0]/runnable@ForkJoinPool-1-worker-1 | I'm done with the bath
08:44:36.399 [routine-1] INFO in.rcard.virtual.threads.App - VirtualThread[#23,routine-1]/runnable@ForkJoinPool-1-worker-1 | I'm done with the water
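The naming scheme can also be checked without running anything. A factory built from a virtual-thread builder creates unstarted threads, and the name counter advances with each newThread() call. A small sketch follows; the class name is hypothetical.

```java
import java.util.concurrent.ThreadFactory;

public class NamingFactoryDemo {

    // Creates n unstarted virtual threads from one factory and returns their names.
    static String[] firstNames(int n) {
        ThreadFactory factory = Thread.ofVirtual().name("routine-", 0).factory();
        String[] names = new String[n];
        for (int i = 0; i < n; i++) {
            // newThread() returns an unstarted thread; each call advances the name counter
            Thread t = factory.newThread(() -> { });
            names[i] = t.getName();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", firstNames(3))); // routine-0, routine-1, routine-2
    }
}
```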

Now that we know how to create virtual threads, let's see how they work.

How virtual threads work

How do virtual threads work? The figure below shows the relationship between platform threads and virtual threads.

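The JVM maintains a pool of platform threads, created and managed by a dedicated ForkJoinPool that is separate from the common ForkJoinPool used elsewhere. The platform threads that run virtual threads are called carrier threads. By default the number of carrier threads equals the number of available processors. The pool can grow temporarily, but by default it is capped at 256 threads, unless there are more processors than that.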

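For each virtual thread, the JVM schedules its execution. It picks an idle carrier thread and temporarily copies the virtual thread's stack frames from the heap onto the carrier's stack. This step is called mounting.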

We can see this in the logs above:

08:44:35.390 [routine-1] INFO in.rcard.virtual.threads.App - VirtualThread[#23,routine-1]/runnable@ForkJoinPool-1-worker-2 | I'm going to boil some water

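Look first at the part to the left of |. It starts with the virtual thread's identifier, VirtualThread[#23,routine-1], which means thread id #23 with the name routine-1. Next comes ForkJoinPool-1-worker-2, which means the virtual thread is currently running on carrier thread worker-2 of the pool ForkJoinPool-1.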

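When a virtual thread reaches a blocking operation, it is unmounted: its stack frames are copied back to the heap and its carrier thread is released. The carrier can then run other virtual threads. When the blocking operation completes, the scheduler runs the virtual thread again, on the same carrier thread or on a different one.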
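Because a blocked virtual thread releases its carrier, a handful of carriers can serve a very large number of blocking tasks. The sketch below is the counterpart of the platform-thread loop at the start of the article and assumes JDK 21. The count of 10,000 and the class name ManySleepers are arbitrary, and actual timings depend on the machine.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ManySleepers {

    // Starts `count` virtual threads that each block in sleep(), and returns how many finished.
    static int run(int count, Duration sleep) {
        AtomicInteger completed = new AtomicInteger();
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < count; i++) {
                executor.submit(() -> {
                    try {
                        // Blocking here unmounts the virtual thread and frees its carrier
                        Thread.sleep(sleep);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    completed.incrementAndGet();
                });
            }
        } // close() waits for all virtual threads to finish
        return completed.get();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int done = run(10_000, Duration.ofSeconds(1));
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(done + " virtual threads finished in " + elapsedMs + " ms");
    }
}
```

The total time stays close to a single sleep because the sleeping threads do not hold on to carriers.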

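The number of CPUs can be obtained through the Java API. Modern CPUs often use hyper-threading, which typically makes the number of logical cores twice the number of physical cores. The Java API returns the number of logical processors available to the JVM. For example, the author's machine has an Intel G7400 CPU with 2 cores and 4 threads (2C4T):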

static int numberOfCores() {
  return Runtime.getRuntime().availableProcessors();
}

Now start one more virtual thread than there are processors:

static void viewCarrierThreadPoolSize() {
  final ThreadFactory factory = Thread.ofVirtual().name("routine-", 0).factory();
  try (var executor = Executors.newThreadPerTaskExecutor(factory)) {
    IntStream.range(0, numberOfCores() + 1)
        .forEach(i -> executor.submit(() -> {
          log("Hello, I'm a virtual thread number " + i);
          sleep(Duration.ofSeconds(1L));
        }));
  }
}

With 4 logical processors, we expect 5 virtual threads to run on only 4 platform threads acting as carriers:

08:44:54.849 [routine-0] INFO in.rcard.virtual.threads.App - VirtualThread[#21,routine-0]/runnable@ForkJoinPool-1-worker-1 | Hello, I'm a virtual thread number 0
08:44:54.849 [routine-1] INFO in.rcard.virtual.threads.App - VirtualThread[#23,routine-1]/runnable@ForkJoinPool-1-worker-2 | Hello, I'm a virtual thread number 1
08:44:54.849 [routine-2] INFO in.rcard.virtual.threads.App - VirtualThread[#24,routine-2]/runnable@ForkJoinPool-1-worker-3 | Hello, I'm a virtual thread number 2
08:44:54.855 [routine-4] INFO in.rcard.virtual.threads.App - VirtualThread[#26,routine-4]/runnable@ForkJoinPool-1-worker-4 | Hello, I'm a virtual thread number 4
08:44:54.849 [routine-3] INFO in.rcard.virtual.threads.App - VirtualThread[#25,routine-3]/runnable@ForkJoinPool-1-worker-4 | Hello, I'm a virtual thread number 3

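The output shows 5 virtual threads, routine-0 through routine-4, running on only 4 platform threads, ForkJoinPool-1-worker-1 through worker-4. worker-4 appears twice, which matches the description above.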

How virtual threads are scheduled

Virtual threads are scheduled by a work-stealing ForkJoinPool running in FIFO (first-in, first-out) mode. The default scheduler is defined in java.lang.VirtualThread:

final class VirtualThread extends BaseVirtualThread {
  private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
  
  // other code omitted
  
  private static ForkJoinPool createDefaultScheduler() {
    // other code omitted
    int parallelism, maxPoolSize, minRunnable;
    String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
    String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
    String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
    // other code omitted
    return new ForkJoinPool(parallelism, factory, handler, asyncMode,
        0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
  }
}

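These system properties control the size of the carrier thread pool. In JDK 21 the defaults are:
- parallelism: the number of available processors;
- maxPoolSize: the larger of parallelism and 256;
- minRunnable: the larger of parallelism / 2 and 1.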

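Virtual threads use cooperative scheduling: a virtual thread itself decides when to yield its carrier. More precisely, the virtual thread hands control back to the scheduler and is then unmounted from its carrier thread.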

We can verify this with sleep and the properties above. First, define a method that enters an infinite while loop and therefore never reaches the sleep call after it:

static Thread workingHard() {
  return virtualThread(
      "Working hard",
      () -> {
        log("I'm working hard");
        while (alwaysTrue()) {
          // Do nothing
        }
        sleep(Duration.ofMillis(100L));
        log("I'm done with working hard");
      });
}

Here alwaysTrue(), a method that always returns true, keeps the compiler from rejecting the code after the loop as unreachable.

Next, define a method in which an employee takes a break:

static Thread takeABreak() {
  return virtualThread(
      "Take a break",
      () -> {
        log("I'm going to take a break");
        sleep(Duration.ofSeconds(1L));
        log("I'm done with the break");
      });
}

Combine the two:

static void workingHardRoutine() throws InterruptedException {
  var workingHard = workingHard();
  var takeABreak = takeABreak();
  workingHard.join();
  takeABreak.join();
}

Before running the code, set the following JVM options:

-Djdk.virtualThreadScheduler.parallelism=1
-Djdk.virtualThreadScheduler.maxPoolSize=1
-Djdk.virtualThreadScheduler.minRunnable=1

With these settings the carrier pool has exactly one thread, and the workingHard task spins on it forever without ever yielding:

21:28:35.702 [Working hard] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Working hard]/runnable@ForkJoinPool-1-worker-1 | I'm working hard
--- hangs, no further output ---

The result shows that workingHard occupies the only carrier forever, so the takeABreak virtual thread never runs.

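Now change the loop to sleep for 100 ms on each iteration so that cooperative scheduling can take place. The working thread now yields its carrier: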

static Thread workingConsciousness() {
  return virtualThread(
      "Working consciousness",
      () -> {
        log("I'm working hard");
        while (alwaysTrue()) {
          sleep(Duration.ofMillis(100L));
        }
        log("I'm done with working hard");
      });
}

workingConsciousness now performs a blocking operation, sleep, and can therefore yield the carrier. Combine the two to verify this:

static void workingConsciousnessRoutine() throws InterruptedException {
  var workingConsciousness = workingConsciousness();
  var takeABreak = takeABreak();
  workingConsciousness.join();
  takeABreak.join();
}

We predict that takeABreak will run once workingConsciousness reaches its blocking operation, and the result confirms it:

21:30:51.677 [Working consciousness] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Working consciousness]/runnable@ForkJoinPool-1-worker-1 | I'm working hard
21:30:51.682 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-1 | I'm going to take a break
21:30:52.688 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-1 | I'm done with the break
--- hangs, runs forever ---

As expected, the two virtual threads (#21 and #23) share the carrier thread worker-1.

Now go back to workingHard and set the carrier pool size to 2. This time both virtual threads run at the same time on different carrier threads:

-Djdk.virtualThreadScheduler.parallelism=2
-Djdk.virtualThreadScheduler.maxPoolSize=2
-Djdk.virtualThreadScheduler.minRunnable=2

We expect the two virtual threads to run simultaneously on the carrier threads worker-1 and worker-2:

21:33:43.641 [Working hard] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Working hard]/runnable@ForkJoinPool-1-worker-1 | I'm working hard
21:33:43.641 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#24,Take a break]/runnable@ForkJoinPool-1-worker-2 | I'm going to take a break
21:33:44.655 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#24,Take a break]/runnable@ForkJoinPool-1-worker-2 | I'm done with the break
--- hangs, runs forever ---

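Note that cooperative scheduling only helps when the code actually cooperates. Virtual threads yield only at blocking operations, so neither cooperative scheduling nor virtual threads will improve the performance of CPU-bound applications.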

Pinned virtual threads

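As mentioned earlier, virtual threads run on carrier threads and automatically give up their carrier when they block. When the blocking operation ends, the scheduler picks a carrier thread to resume them.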

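Unfortunately, some blocking operations currently cannot unmount the virtual thread, so the carrier thread blocks as well. This is called pinning. Pinning does not make a program incorrect, but it hurts scalability and concurrency. If a carrier thread is pinned and the carrier pool's configuration allows it, the JVM may add a carrier thread to compensate.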

There are two known causes of pinning:

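  1. The code is running inside a synchronized block or method, or calls Object.wait(). This holds for JDK 21; JDK 24 removed this cause with JEP 491.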
  2. The code calls a native method or a foreign function, for example a native library via JNI.

Let's build a matching example. There is only one toilet, so the method for using it is synchronized:

static class Bathroom {
  synchronized void useTheToilet() {
    log("I'm going to use the toilet");
    sleep(Duration.ofSeconds(1L));
    log("I'm done with the toilet");
  }
}

An employee going to the bathroom:

static Bathroom bathroom = new Bathroom();

static Thread goToTheToilet() {
  return virtualThread(
      "Go to the toilet",
      () -> bathroom.useTheToilet());
}

Now model two employees, kwn and smy: kwn uses the toilet while smy takes a break.

static void twoEmployeesInTheOffice() throws InterruptedException {
  var kwn = goToTheToilet();
  var smy = takeABreak();
  kwn.join();
  smy.join();
}

To make the blocked carrier visible, we set the carrier pool size to 1 and run the program. The output is:

16:29:05.548 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-1 | I'm going to use the toilet
16:29:06.558 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-1 | I'm done with the toilet
16:29:06.559 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-1 | I'm going to take a break
16:29:07.563 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-1 | I'm done with the break

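The output shows that "Go to the toilet", running inside synchronized, held on to the carrier thread. The second task started only after it finished, exactly one second later.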

The -Djdk.tracePinnedThreads=full or -Djdk.tracePinnedThreads=short option makes the JVM report virtual threads that block while pinned.

full prints the complete stack trace of the pinned virtual thread, while short prints only the essential frames. Here is the output with short:

16:29:05.548 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-1 | I'm going to use the toilet
Thread[#22,ForkJoinPool-1-worker-1,5,CarrierThreads]
    virtual.threads.playground/in.rcard.virtual.threads.App$Bathroom.useTheToilet(App.java:188) <== monitors:1
16:29:06.558 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-1 | I'm done with the toilet
16:29:06.559 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-1 | I'm going to take a break
16:29:07.563 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-1 | I'm done with the break

We can also configure the carrier pool so that the JVM can create a new carrier thread when none is available:

-Djdk.virtualThreadScheduler.parallelism=1
-Djdk.virtualThreadScheduler.maxPoolSize=2
-Djdk.virtualThreadScheduler.minRunnable=1

Run the program with the new configuration:

16:32:05.235 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-1 | I'm going to use the toilet
16:32:05.235 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-2 | I'm going to take a break
16:32:06.243 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-1 | I'm done with the toilet
16:32:06.243 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-2 | I'm done with the break

The JVM could not find an idle carrier thread, so it automatically added a new one, worker-2.

Replacing synchronized with the Lock API from java.util.concurrent.locks, here a ReentrantLock, avoids the problem:

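// Bathroom guarded by a ReentrantLock instead of synchronized
// (needs java.util.concurrent.TimeUnit, java.util.concurrent.locks.Lock and java.util.concurrent.locks.ReentrantLock)
static class Bathroom {
  private final Lock lock = new ReentrantLock();
  
  @SneakyThrows // Lombok: rethrows the checked InterruptedException thrown by tryLock
  void useTheToiletWithLock() {
    // wait at most 10 s; waiting on a Lock parks the virtual thread instead of pinning its carrier
    if (lock.tryLock(10, TimeUnit.SECONDS)) {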
      try {
        log("I'm going to use the toilet");
        sleep(Duration.ofSeconds(1L));
        log("I'm done with the toilet");
      } finally {
        lock.unlock();
      }
    }
  }
}

Using the new version:

static Thread goToTheToiletWithLock() {
  return virtualThread("Go to the toilet", () -> bathroom.useTheToiletWithLock());
}

@SneakyThrows
static void twoEmployeesInTheOfficeWithLock() {
  var riccardo = goToTheToiletWithLock();
  var daniel = takeABreak();
  riccardo.join();
  daniel.join();
}

This time the two virtual threads run concurrently:

16:35:58.921 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-2 | I'm going to take a break
16:35:58.921 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-1 | I'm going to use the toilet
16:35:59.932 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-1 | I'm done with the break
16:35:59.933 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-2 | I'm done with the toilet

You can again add -Djdk.tracePinnedThreads=full to check whether any pinning remains.
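The tryLock(10, TimeUnit.SECONDS) version above has a subtle behavior: an employee who cannot get the lock within 10 seconds silently skips the toilet. If every employee must get in eventually, plain lock() in a try/finally block is the usual pattern. The sketch below shows it without Lombok and assumes JDK 21. LockBathroom and the employee- thread names are invented for it.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

public class LockBathroom {

    private final ReentrantLock lock = new ReentrantLock();
    private int uses = 0; // guarded by lock

    // lock() waits as long as needed instead of giving up after a timeout.
    void useTheToilet(Duration time) throws InterruptedException {
        lock.lock(); // a virtual thread waiting here parks and unmounts instead of pinning its carrier
        try {
            uses++;
            Thread.sleep(time); // sleeping while holding a Lock (not a monitor) does not pin the carrier
        } finally {
            lock.unlock();
        }
    }

    int uses() {
        lock.lock();
        try {
            return uses;
        } finally {
            lock.unlock();
        }
    }

    // Sends `employees` virtual threads to the bathroom and returns how many got in.
    static int simulate(int employees) {
        LockBathroom bathroom = new LockBathroom();
        var factory = Thread.ofVirtual().name("employee-", 0).factory();
        try (var executor = Executors.newThreadPerTaskExecutor(factory)) {
            for (int i = 0; i < employees; i++) {
                // A Callable (not a Runnable), so the checked InterruptedException is allowed
                executor.submit(() -> {
                    bathroom.useTheToilet(Duration.ofMillis(10));
                    return null;
                });
            }
        } // close() waits for every employee
        return bathroom.uses();
    }

    public static void main(String[] args) {
        System.out.println("employees served: " + simulate(5));
    }
}
```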

ThreadLocal and thread pools

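During the preview releases, you could create virtual threads that were not allowed to have thread-local variables. In response to developer feedback, JDK 21 removed that option, and virtual threads now always support ThreadLocal. Let's see how ThreadLocal works with virtual threads: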

static ThreadLocal<String> context = new ThreadLocal<>();

static void virtualThreadLocal() throws InterruptedException {
  var virtualThread1 = Thread.ofVirtual().name("thread-1").start(() -> {
    context.set("thread-1");
    sleep(Duration.ofSeconds(1L));
    log("Hey, my name is " + context.get());
  });
  var virtualThread2 = Thread.ofVirtual().name("thread-2").start(() -> {
    context.set("thread-2");
    sleep(Duration.ofSeconds(1L));
    log("Hey, my name is " + context.get());
  });
  virtualThread1.join();
  virtualThread2.join();
}

The output:

15:08:37.142 [thread-1] INFO in.rcard.virtual.threads.App - VirtualThread[#21,thread-1]/runnable@ForkJoinPool-1-worker-1 | Hey, my name is thread-1
15:08:37.142 [thread-2] INFO in.rcard.virtual.threads.App - VirtualThread[#23,thread-2]/runnable@ForkJoinPool-1-worker-2 | Hey, my name is thread-2

Usage is exactly the same as with platform threads.

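That does not mean ThreadLocal is a good idea with virtual threads; on the contrary, it deserves extra care. It is now cheap to create huge numbers of virtual threads, and each one has its own ThreadLocal values, so the total memory they occupy can become unexpectedly large.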

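Alternatives to ThreadLocal for virtual threads are on the way. Scoped values, incubated in JDK 20 (JEP 429), allow immutable data to be shared within a thread and with its child threads.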
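The ThreadLocal memory concern becomes visible if you count how often a ThreadLocal initializer runs. In the sketch below, a fixed pool of 4 platform threads allocates at most 4 buffers for 1,000 tasks. A virtual-thread-per-task executor allocates one buffer per task, because every task runs on a fresh virtual thread. The class name and the 1 KB buffer size are illustrative assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadLocalPerVirtualThread {

    static final AtomicInteger initializations = new AtomicInteger();

    // Each thread that calls get() lazily receives its own 1 KB buffer (a stand-in for a costly cached object).
    static final ThreadLocal<byte[]> BUFFER = ThreadLocal.withInitial(() -> {
        initializations.incrementAndGet();
        return new byte[1024];
    });

    // Runs `tasks` tasks that use BUFFER on the given executor and returns how many buffers were allocated.
    static int countBuffers(ExecutorService executor, int tasks) {
        initializations.set(0);
        try (executor) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    BUFFER.get()[0]++;
                });
            }
        } // close() waits for all tasks
        return initializations.get();
    }

    public static void main(String[] args) {
        System.out.println("virtual threads: " + countBuffers(Executors.newVirtualThreadPerTaskExecutor(), 1_000));
        System.out.println("pool of 4:       " + countBuffers(Executors.newFixedThreadPool(4), 1_000));
    }
}
```

The common trick of caching an expensive object in a ThreadLocal relies on threads being reused, which is why it no longer pays off once each task gets its own virtual thread.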

Inside virtual threads

In this section we look at how virtual threads work under the hood and how they switch between carriers.

We won't go very deep, but we'll cover the basic concepts.

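Project Loom introduces a new internal API, Continuation (jdk.internal.vm.Continuation), to suspend (yield) and resume execution. The continuation holds everything the virtual thread needs to resume running.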

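Virtual threads are supported natively by the JVM: running and yielding a Continuation comes down to a series of native calls into the JVM. It all starts in the VirtualThread constructor. The constructor chooses a scheduler, inheriting the parent's scheduler if the parent is also a virtual thread, and creates the continuation: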

VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
    super(name, characteristics, /*bound*/ false);
    Objects.requireNonNull(task);
    // choose scheduler if not specified
    if (scheduler == null) {
        Thread parent = Thread.currentThread();
        if (parent instanceof VirtualThread vparent) {
            scheduler = vparent.scheduler;
        } else {
            scheduler = DEFAULT_SCHEDULER;
        }
    }
    this.scheduler = scheduler;
    this.cont = new VThreadContinuation(this, task);
    this.runContinuation = this::runContinuation;
}

The Continuation object created here is a VThreadContinuation:

private static class VThreadContinuation extends Continuation {
  VThreadContinuation(VirtualThread vthread, Runnable task) {
    super(VTHREAD_SCOPE, () -> vthread.run(task));
  }
  @Override
  protected void onPinned(Continuation.Pinned reason) {
    if (TRACE_PINNING_MODE > 0) {
      boolean printAll = (TRACE_PINNING_MODE == 1);
      PinnedThreadPrinter.printStackTrace(System.out, printAll);
    }
  }
}

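This code also shows how the jdk.tracePinnedThreads option works. When the continuation cannot yield because it is pinned, onPinned prints the stack trace.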

Once start is called, the state changes from NEW to STARTED:

@Override
void start(ThreadContainer container) {
    if (!compareAndSetState(NEW, STARTED)) {
        throw new IllegalThreadStateException("Already started");
    }
    // Omissis
    try {
        // Omissis
        // submit task to run thread
        submitRunContinuation();
        started = true;
    } finally {
        // Omissis
    }
}

The submitRunContinuation method hands runContinuation to the scheduler:

private void submitRunContinuation(boolean lazySubmit) {
    try {
        if (lazySubmit && scheduler instanceof ForkJoinPool pool) {
            pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
        } else {
            scheduler.execute(runContinuation);
        }
    } catch (RejectedExecutionException ree) {
        // Omissis
    }
}

Executing runContinuation sets the state to RUNNING, whether the thread was previously STARTED or RUNNABLE:

private void runContinuation() {
    // Omissis
    if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) {
        // first run
        firstRun = true;
    } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) {
        // consume parking permit
        setParkPermit(false);
        firstRun = false;
    } else {
        // not runnable
        return;
    }
    // Omissis
    try {
        cont.run();
    } finally {
        // Omissis
    }
}   

Whenever the thread reaches a blocking point, its state becomes PARKING. A blocking point is reached by calling VirtualThread.park():

void park() {
    assert Thread.currentThread() == this;
    // complete immediately if parking permit available or interrupted
    if (getAndSetParkPermit(false) || interrupted)
        return;
    // park the thread
    setState(PARKING);
    try {
        if (!yieldContinuation()) {
            // park on the carrier thread when pinned
            parkOnCarrierThread(false, 0);
        }
    } finally {
        assert (Thread.currentThread() == this) && (state() == RUNNING);
    }
}
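Application code does not call VirtualThread.park() directly. In JDK 21 it gets there through public APIs such as LockSupport.park(), which java.util.concurrent locks and queues use internally. The sketch below assumes JDK 21: it parks a virtual thread, observes it in the WAITING state, and unparks it. ParkDemo is a made-up name.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class ParkDemo {

    record Result(Thread.State stateWhileParked, boolean resumed) {}

    static Result parkAndUnpark() throws InterruptedException {
        AtomicBoolean go = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread vt = Thread.ofVirtual().name("parker").start(() -> {
            while (!go.get()) {
                // On a virtual thread this parks (and normally unmounts) the virtual thread.
                // park() may return spuriously, so the condition is re-checked in a loop.
                LockSupport.park();
            }
            resumed.set(true);
        });

        // Poll for up to 5 s until the virtual thread reports WAITING, i.e. it is parked.
        Thread.State observed = vt.getState();
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (observed != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.sleep(1);
            observed = vt.getState();
        }

        go.set(true);
        LockSupport.unpark(vt); // makes the virtual thread runnable again so the scheduler resubmits it
        vt.join();
        return new Result(observed, resumed.get());
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(parkAndUnpark());
    }
}
```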

Once in the PARKING state, the thread calls yieldContinuation. This method performs the actual parking of the virtual thread and unmounts it from its carrier thread:

private boolean yieldContinuation() {
    boolean notifyJvmti = notifyJvmtiEvents;
    // unmount
    if (notifyJvmti) notifyJvmtiUnmountBegin(false);
    unmount();
    try {
        return Continuation.yield(VTHREAD_SCOPE);
    } finally {
        // re-mount
        mount();
        if (notifyJvmti) notifyJvmtiMountEnd(false);
    } 
}

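Continuation.yield(VTHREAD_SCOPE) makes a number of native calls into the JVM. If it returns false, the continuation could not yield because the thread is pinned. In that case park() calls parkOnCarrierThread, which blocks the carrier thread itself: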

private void parkOnCarrierThread(boolean timed, long nanos) {
    assert state() == PARKING;
    var pinnedEvent = new VirtualThreadPinnedEvent();
    pinnedEvent.begin();
    setState(PINNED);
    try {
        if (!parkPermit) {
            if (!timed) {
                U.park(false, 0);
            } else if (nanos > 0) {
                U.park(false, nanos);
            }
        }
    } finally {
        setState(RUNNING);
    }
    // consume parking permit
    setParkPermit(false);
    pinnedEvent.commit();
}

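Finally, VirtualThread.afterYield() is called. For a parking thread, it sets the state to PARKED. If the thread was already unparked in the meantime (the park permit is available), afterYield moves the state to RUNNABLE and calls lazySubmitRunContinuation to continue execution. For Thread.yield (state YIELDING), it sets the state to RUNNABLE and resubmits the thread directly: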

private void afterYield() {
    int s = state();
    assert (s == PARKING || s == YIELDING) && (carrierThread == null);
    if (s == PARKING) {
        setState(PARKED);
        // notify JVMTI that unmount has completed, thread is parked
        if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);
        // may have been unparked while parking
        if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {
            // lazy submit to continue on the current thread as carrier if possible
            lazySubmitRunContinuation();
        }
    } else if (s == YIELDING) {   // Thread.yield
        setState(RUNNABLE);
        // notify JVMTI that unmount has completed, thread is runnable
        if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);
        // lazy submit to continue on the current thread as carrier if possible
        lazySubmitRunContinuation();
    }
}

This closes the loop. As the whole process shows, the virtual thread state machine is fairly complex.
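The deliberately simplified model below summarizes the transitions walked through above. It is not the JDK's implementation, which uses int constants and CAS operations and handles more cases than shown here. The enum and transition table only encode the paths described in this section, and the class name is hypothetical.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class VirtualThreadStateModel {

    // Simplified subset of the states discussed above.
    enum State { NEW, STARTED, RUNNING, PARKING, PARKED, PINNED, YIELDING, RUNNABLE, TERMINATED }

    static final Map<State, Set<State>> TRANSITIONS = new EnumMap<>(State.class);

    static {
        TRANSITIONS.put(State.NEW, EnumSet.of(State.STARTED));                   // start()
        TRANSITIONS.put(State.STARTED, EnumSet.of(State.RUNNING));               // runContinuation(), first run
        TRANSITIONS.put(State.RUNNING, EnumSet.of(State.PARKING, State.YIELDING, State.TERMINATED));
        TRANSITIONS.put(State.PARKING, EnumSet.of(State.PARKED, State.PINNED));  // yield succeeded / failed
        TRANSITIONS.put(State.PINNED, EnumSet.of(State.RUNNING));                // parkOnCarrierThread() returns
        TRANSITIONS.put(State.PARKED, EnumSet.of(State.RUNNABLE));               // unparked, or permit already set
        TRANSITIONS.put(State.YIELDING, EnumSet.of(State.RUNNABLE));             // afterYield() for Thread.yield
        TRANSITIONS.put(State.RUNNABLE, EnumSet.of(State.RUNNING));              // runContinuation() again
        TRANSITIONS.put(State.TERMINATED, EnumSet.noneOf(State.class));
    }

    // Returns true if every consecutive pair in the path is an allowed transition.
    static boolean isValidPath(State... path) {
        for (int i = 0; i + 1 < path.length; i++) {
            if (!TRANSITIONS.get(path[i]).contains(path[i + 1])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isValidPath(State.NEW, State.STARTED, State.RUNNING, State.PARKING,
                State.PARKED, State.RUNNABLE, State.RUNNING, State.TERMINATED));
        System.out.println(isValidPath(State.NEW, State.RUNNING));
    }
}
```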

Conclusion

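This long article began by explaining why virtual threads were created, then showed how to create them. Examples then demonstrated how virtual threads switch between carrier threads, how pinning arises, and how to avoid it.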

As the virtual thread ecosystem matures, Java is set to become a disruptor in concurrent programming.

I hope this article helps readers round out their understanding of virtual threads and write concurrent code at low cost.

Appendix: code

The code is available on GitHub: virtualThreadGuideDemo