JEP 428: Structured Concurrency (Incubator)

    Details

    • Author:
      Alan Bateman, Ron Pressler
    • JEP Type:
      Feature
    • Exposure:
      Open
    • Scope:
      JDK
    • Discussion:
      loom dash dev at openjdk dot java dot net
    • JEP Number:
      428

      Description

      Summary

      Simplify multithreaded programming by introducing a library for structured concurrency. Structured concurrency treats multiple tasks running in different threads as a single unit of work, thereby streamlining error handling and cancellation, improving reliability, and enhancing observability. This is an incubating API.

      Goals

      • Improve the reliability and observability of multithreaded code.

      • Promote a concurrent programming style that can eliminate common risks arising from cancellation and shutdown, such as thread leaks and cancellation delays.

      Non-Goals

      • It is not a goal of this JEP to replace concurrency constructs in java.util.concurrent, such as ExecutorService and Future.

      • It is not a goal of this JEP to provide a definitive structured concurrency API for Java. Other structured concurrency constructs can be added by third-party libraries or in future JDK releases.

      • It is not a goal of this JEP to propose a mechanism for sharing streams of data among threads ("channels"). This might be addressed by a future JEP.

      • It is not a goal of this JEP to replace the existing thread interruption mechanism with a new thread cancellation mechanism. This might be addressed by a future JEP.

      Motivation

      Java developers manage complexity by breaking down a task into multiple subtasks. In ordinary single-threaded code, the subtasks execute sequentially. However, if the subtasks are sufficiently independent of each other, and if there are sufficient hardware resources, then the task can be made to run faster (lower latency) by executing the subtasks concurrently. For example, a task that composes the results of multiple I/O operations will run faster if each I/O operation executes in its own thread. The availability of virtual threads makes it cost-effective to dedicate a thread to each I/O operation.

      Unstructured concurrency with ExecutorService

      Developers have traditionally used java.util.concurrent.ExecutorService, introduced in Java 5, to execute subtasks concurrently.

      Here is a method, handle, that represents a task in a server application. It handles an incoming request by submitting two subtasks to an ExecutorService, es. One subtask executes the method findUser and the other subtask executes the method fetchOrder. The ExecutorService immediately returns a Future for each subtask, and executes each subtask in its own thread. handle awaits their results via blocking calls to Future.get; the task is said to join its subtasks.

      Response handle() throws ExecutionException, InterruptedException {
          Future<String>  user  = es.submit(() -> findUser());
          Future<Integer> order = es.submit(() -> fetchOrder());
          String theUser  = user.get();   // Join findUser 
          int    theOrder = order.get();  // Join fetchOrder
          return new Response(theUser, theOrder);
      }

      Because the subtasks execute concurrently, each can succeed or fail independently. (Failure means to throw an exception.) Often, a task like handle should fail if any of its subtasks fail. Understanding the lifetimes of the threads can be surprisingly complicated when failure occurs:

      • If findUser throws, then handle will throw when calling user.get(). fetchOrder will continue to run in its own thread even after handle has failed. This is a thread leak, which, at best, wastes resources; at worst, fetchOrder will interfere with other tasks.
      • If the thread executing handle is interrupted, the interruption is not propagated to subtasks. Both the findUser and fetchOrder threads will leak, continuing to run even after handle has failed.
      • If findUser takes a long time to execute, but fetchOrder fails in the meantime, then handle will wait unnecessarily for findUser by blocking on user.get() rather than canceling it. Only after findUser completes and user.get() returns will order.get() throw, causing handle to fail.
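
      The first failure mode in the list above can be reproduced with a small, self-contained sketch. The class name ThreadLeakDemo, the latches, and the stand-in bodies for findUser and fetchOrder are assumptions made for illustration; they are not part of the original example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThreadLeakDemo {

    // Returns true if the fetchOrder stand-in was still running at the
    // moment the caller observed the failure of the findUser stand-in.
    static boolean orderOutlivesFailedHandle() {
        ExecutorService es = Executors.newCachedThreadPool();
        CountDownLatch orderStarted = new CountDownLatch(1);
        CountDownLatch releaseOrder = new CountDownLatch(1);

        // Hypothetical stand-in for fetchOrder: blocks as if on slow I/O.
        Callable<Integer> fetchOrder = () -> {
            orderStarted.countDown();
            releaseOrder.await();
            return 42;
        };
        // Hypothetical stand-in for findUser: fails once fetchOrder is running.
        Callable<String> findUser = () -> {
            orderStarted.await();
            throw new IllegalStateException("user lookup failed");
        };

        try {
            Future<String> user = es.submit(findUser);
            Future<Integer> order = es.submit(fetchOrder);
            try {
                user.get();             // the task fails here...
                return false;
            } catch (ExecutionException e) {
                return !order.isDone(); // ...but nothing has stopped fetchOrder
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            releaseOrder.countDown();   // let the demo's leaked subtask finish
            es.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("fetchOrder still running after failure: "
                + orderOutlivesFailedHandle());
    }
}
```

      The latches only make the interleaving deterministic for the demonstration; in a real server the same leak occurs whenever the failing subtask happens to finish first.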

      In each case, the problem is that our program is logically structured with a task-subtask relationship, but this relationship exists only in the programmer's mind. This not only creates more room for error, but it makes diagnosing and troubleshooting such errors more difficult because observability tools, such as thread dumps, will show handle, findUser, and fetchOrder on the call stacks of unrelated threads, with no hint of the task-subtask relationship.

      We might attempt to do better by explicitly cancelling other subtasks when an error is detected, such as by wrapping tasks with try-finally and invoking the Future.cancel method on the other tasks in the catch block for the failing task. We'd also need to use the ExecutorService in a try-with-resources statement (as shown in JEP 425, the second example), because Future does not offer a way to wait for a task that's been cancelled. But this work can be very tricky to get right, and it often makes the logical intent of the code harder to discern. Keeping track of the inter-task relationship, and manually adding back the required inter-task cancellation edges, is asking a lot of developers.
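
      A minimal sketch of such manual cancellation follows, assuming the subtasks are passed in as Callables and the result is a plain String rather than the Response type used above. The class ManualCancel and its demo method are hypothetical. The sketch covers a failing findUser and an interrupted caller, but it still waits on user.get() if fetchOrder fails first, and it does not wait for cancelled subtasks to terminate, which illustrates how easy it is to leave an edge uncovered.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ManualCancel {

    // Joins both subtasks and, however the method exits, cancels whatever
    // is still running. cancel(true) is a no-op on a completed Future.
    static String handle(ExecutorService es,
                         Callable<String> findUser,
                         Callable<Integer> fetchOrder)
            throws ExecutionException, InterruptedException {
        Future<String> user = es.submit(findUser);
        Future<Integer> order = es.submit(fetchOrder);
        try {
            String theUser = user.get();
            int theOrder = order.get();
            return theUser + ":" + theOrder;
        } finally {
            user.cancel(true);
            order.cancel(true);
        }
    }

    // Returns true if a failing findUser caused the fetchOrder thread to be interrupted.
    static boolean demo() {
        ExecutorService es = Executors.newCachedThreadPool();
        CountDownLatch orderStarted = new CountDownLatch(1);
        CountDownLatch orderInterrupted = new CountDownLatch(1);

        Callable<Integer> fetchOrder = () -> {
            orderStarted.countDown();
            try {
                Thread.sleep(60_000);          // simulates slow I/O
            } catch (InterruptedException e) {
                orderInterrupted.countDown();  // record that cancellation arrived
                throw e;
            }
            return 42;
        };
        Callable<String> findUser = () -> {
            orderStarted.await();              // fail only once fetchOrder is running
            throw new IllegalStateException("user lookup failed");
        };

        try {
            try {
                handle(es, findUser, fetchOrder);
                return false;
            } catch (ExecutionException e) {
                return orderInterrupted.await(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            es.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("fetchOrder was cancelled: " + demo());
    }
}
```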

      The need to manually coordinate lifetimes comes from the fact that ExecutorService and Future allow unrestricted patterns of concurrency, without any constraints or ordering on the threads involved. One thread can create an ExecutorService, a second thread can submit work to it, and the threads which execute the work have no relationship to either the first or second thread. Moreover, after a thread has submitted work, a completely different thread can await the results of execution. Any code with a reference to a Future can join it (i.e., await its result by calling get()), even code in a thread other than the one which obtained the Future. In effect, a subtask started by one task does not have to "return" to the task that submitted it; it could "return" to any number of tasks, or even none.
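
      The lack of constraints is easy to observe. In the sketch below (the class name UnstructuredJoin is hypothetical), one thread submits the work and an unrelated thread joins the resulting Future; nothing in the API objects to this.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

class UnstructuredJoin {

    // The calling thread submits the work; a different thread joins it.
    static String joinFromUnrelatedThread() {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = es.submit(() -> "result");
            AtomicReference<String> seen = new AtomicReference<>();

            // This thread neither created the executor nor submitted the task.
            Thread other = new Thread(() -> {
                try {
                    seen.set(future.get());
                } catch (InterruptedException | ExecutionException e) {
                    seen.set(null);
                }
            });
            other.start();
            other.join();
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("joined from another thread: " + joinFromUnrelatedThread());
    }
}
```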

      Because ExecutorService and Future allow for such "unstructured" use, they do not enforce or even understand any relationships among tasks and subtasks, even though such relationships are quite common and useful. Accordingly, even when subtasks are submitted and joined in the same task, the failure of one subtask cannot automatically cause cancellation of another; in handle, the failure of fetchOrder cannot automatically cause cancellation of findUser. The Future for fetchOrder is unaware of the sibling Future for findUser, and neither knows which thread will join it via Future.get. We want to ensure that such cancellation can be reliably automated, rather than asking developers to manage it manually.

      Task structure should reflect code structure

      In contrast to the freewheeling assortment of threads under ExecutorService, the execution of single-threaded code always enforces a hierarchy of tasks and subtasks. The {...} block of a method corresponds to a task, and the methods invoked within the block correspond to subtasks. An invoked method must return (or throw) to the method that invoked it; it cannot outlive the method that invoked it, nor return or throw to a different method. Thus, all subtasks finish before the task, and the lifetime of each subtask relative to each other and to the task is governed by the syntactic block structure of the code.

      For example, in the single-threaded version of handle below, the task-subtask relationship is apparent from the syntactic structure:

      Response handle() throws IOException {
          String theUser  = findUser();
          int    theOrder = fetchOrder();
          return new Response(theUser, theOrder);
      }

      We don't start the fetchOrder subtask until the findUser subtask has completed, whether successfully or unsuccessfully. If findUser fails, we don't start fetchOrder at all, and the handle task fails implicitly. The fact that a subtask can only return to its parent is extremely significant. It means that the parent task can implicitly treat the failure of one subtask as a trigger to "cancel" all remaining subtasks as well as abort the parent task.

      When the task-subtask hierarchy is defined by the call stack, we get the parent-child relationship, which flows into error-propagation, for free. Moreover, this relationship is reified in the call stack at runtime. When observing a single thread, the hierarchical relationship is obvious: findUser (and later fetchOrder) appear subordinate to handle.

      Multithreaded programming would be considerably more reliable and observable if the parent-child relationship between a task and a subtask were reified. This would allow a child to report a result or exception only to its parent — the unique task that owns all the subtasks — which, then, could implicitly cancel the remaining subtasks.

      The following properties, which we have in sequential code, would give us similar benefits for concurrent code:

      1. In source code, a syntactic block structure that delineates and enforces the lifetimes of operations; such structure, in turn, would impose,
      2. At run time, a representation of the inter-thread hierarchy that is analogous to the intra-thread call stack, so as to support error propagation and cancellation for reliability, and to allow observation of the concurrent program in a meaningful way.

      (The JDK already has mechanisms that impose structure on concurrent tasks, such as java.util.concurrent.ForkJoinPool, the execution engine behind parallel streams. However, that mechanism is designed for compute-intensive tasks rather than tasks which involve I/O.)

      Structured Concurrency for multithreaded code

      Structured Concurrency is an approach to multithreaded programming that preserves the readability and maintainability enjoyed by developers of single-threaded code. It is the principle that if a task splits into concurrent subtasks, they all return to the same place: the task's code block.

      The term "structured concurrency" was coined by Martin Sústrik and popularized by Nathaniel J. Smith. Ideas from other languages, such as Erlang's hierarchical supervisors, inform the design of error handling in structured concurrency.

      By "returning" to the same code block, the lifetime of a concurrent subtask is confined to a syntactic block. Because the lifetimes of all sibling subtasks are confined to the same block, they can be reasoned about and managed as a unit; because that block is nested in that of the parent task, it induces a hierarchy that can be reified in a manner similar to the call stack. Subtasks work on behalf of a task (code in the enclosing block) that awaits their results and monitors them for failures. As with structured programming techniques for code in a single thread, the power of structured concurrency for multiple threads comes from two ideas: (1) well-defined entry and exit points for the flow of execution through a block of code, and (2) a strict nesting of the lifetime of operations in a way that mirrors their nesting in the code.

      At run time, structured concurrency builds a tree-shaped hierarchy of tasks, with sibling subtasks being owned by the same parent task; the tree is the concurrent counterpart to the call stack of a single thread, and observability tools use it to present subtasks as subordinate to their parent tasks.

      Structured concurrency is a great match for virtual threads. Virtual threads are a lightweight implementation of threads provided by the JDK. Many virtual threads share the same OS thread, allowing for very large numbers of virtual threads. In addition to being plentiful, virtual threads are cheap enough to represent any concurrent unit of behavior, even behavior that involves I/O. This means that a server application could use structured concurrency to process thousands or millions of incoming requests at once: it would dedicate a new virtual thread to the task of handling a request, and when the task "fans out" by submitting subtasks for concurrent execution, it would dedicate a new virtual thread to each subtask. Behind the scenes, the task-subtask relationship would be reified by outfitting each virtual thread with its unique owner, so it knows its place, similar to how a frame in the call stack knows its unique caller.

      In summary, virtual threads deliver an abundance of threads, and structured concurrency ensures they are correctly and robustly coordinated. Observability tools will see threads organized in the logical manner intended by the developer. Having a library for structured concurrency in the JDK offers maintainability and reliability to all developers of server applications.

      Description

      The class StructuredTaskScope allows developers to structure a task as a family of concurrent subtasks, and to coordinate them as a unit. Subtasks are created in their own threads by forking them individually, but are then joined as a unit and possibly cancelled as a unit; their exceptions or successful results are aggregated and handled by the parent task. StructuredTaskScope confines the lifetimes of the subtasks, or forks, to a clear lexical scope where all of a task's interactions with its subtasks (forking, joining, cancelling, handling errors, and composing results) take place.

      Here is the handle example from earlier, written to use StructuredTaskScope (the meaning of ShutdownOnFailure is explained below):

      Response handle() throws ExecutionException, InterruptedException {
          try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
              Future<String>  user  = scope.fork(() -> findUser()); 
              Future<Integer> order = scope.fork(() -> fetchOrder());
      
              scope.join();          // Join both forks
              scope.throwIfFailed(); // ... and propagate errors
      
              // Here, both forks have succeeded, so compose their results
              return new Response(user.resultNow(), order.resultNow());
          }
      }

      In contrast to the original example, understanding the lifetimes of the threads involved is easy: under all conditions, the lifetimes are confined to a lexical scope, namely the body of the try-with-resources statement. Furthermore, this code gets a number of desirable properties for free:

      1. Error handling with short-circuiting: If either findUser or fetchOrder fails, the other will be cancelled if it hasn't yet completed (this is managed by the cancellation policy implemented by ShutdownOnFailure; other policies are possible too).

      2. Cancellation Propagation: If the thread running handle is interrupted before or during the call to join, both forks will be automatically cancelled when the scope is exited.

      3. Clarity: The above code has a clear structure: set up the subtasks, wait for them (either to complete or to be cancelled), and then decide whether to succeed (and process the results of the subtasks, which are already finished) or fail (and the subtasks are already finished, so there's nothing more to clean up).

      4. Observability: A thread dump, as described below, will clearly demonstrate the task hierarchy, with the threads running findUser and fetchOrder shown as children of the scope.

      Like ExecutorService.submit, StructuredTaskScope.fork takes a Callable and returns a Future. Unlike ExecutorService, the returned Future is not joined via Future.get. The use of StructuredTaskScope encourages the joining or cancelling of all forks as a single unit, obviating the need for Future.get or Future.cancel. New Future methods, resultNow and exceptionNow, are designed to be used after it is known that the task is complete, such as when following a StructuredTaskScope.join.

      Using StructuredTaskScope

      The general workflow of code using StructuredTaskScope to structure a task is as follows:

      1. Create a scope. The thread that creates the scope is its owner.

      2. Fork concurrent subtasks in the scope.

      3. Any of the forks in the scope, or the scope's owner, may call shutdown to request cancellation of all remaining subtasks.

      4. The scope's owner joins the scope, i.e. all of its forks, as a unit. join is a blocking call; it will return when all forks have either completed (successfully or not) or been cancelled with shutdown (see below). As an alternative, joinUntil accepts a deadline.

      5. After joining, handle any errors in the forks and process their results (more examples follow).

      6. Close the scope, usually implicitly thanks to try-with-resources. This shuts down the scope and waits for any straggler forks to complete.

      If the owner is already a member of an existing scope (i.e. created as a fork in one), then that scope effectively becomes the parent of the new scope; tasks form a tree, with scopes as the intermediate nodes and threads as the leaves.

      Every fork runs in its own newly created thread, which by default is a virtual thread. The forks' threads are owned by the scope, which in turn is owned by its creating thread, thus forming a hierarchy. Any fork can create its own nested StructuredTaskScope to fork its own subtasks, thus extending the hierarchy. That hierarchy is reflected in the code's block structure, which confines the lifetimes of the forks: all of the forks' threads are guaranteed to have terminated once the scope has closed, and no thread is left behind when the block exits.

      Any fork in a scope, their own transitive forks, or the scope's owner, can call StructuredTaskScope.shutdown at any time to signify that the task is complete, even while other forks are still running. shutdown causes the threads of all forks that are still active in the scope to be interrupted; all forks should, therefore, be written in a way that is responsive to interruption. In effect, shutdown is the concurrent analog of the break statement in sequential code.
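
      What "responsive to interruption" means for a fork can be sketched without the incubator API. In the example below, the class ResponsiveTask is hypothetical, and a direct call to Thread.interrupt stands in for what shutdown is described as doing to an active fork. Blocking calls such as Thread.sleep already throw InterruptedException; a CPU-bound loop has to poll the interrupt status itself.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class ResponsiveTask implements Callable<Long> {
    final CountDownLatch started = new CountDownLatch(1);

    @Override
    public Long call() throws InterruptedException {
        started.countDown();
        long iterations = 0;
        while (true) {
            iterations++;   // stand-in for one chunk of CPU-bound work
            // Poll between chunks so cancellation is noticed promptly.
            if (Thread.interrupted()) {
                throw new InterruptedException(
                        "cancelled after " + iterations + " iterations");
            }
        }
    }

    // Returns true if the task stopped promptly once its thread was interrupted.
    static boolean demo() {
        ResponsiveTask task = new ResponsiveTask();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                task.call();
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        worker.start();
        try {
            task.started.await();
            worker.interrupt();   // assumption: stands in for the interrupt sent on shutdown
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return sawInterrupt.get() && !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("task responded to interruption: " + demo());
    }
}
```

      A fork that swallows the interrupt or never checks for it would keep its scope from closing, because close waits for every fork to terminate.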

      When join returns, all forks are known to have either completed (successfully or not) or been cancelled. Their result or exception can be obtained, without any additional blocking, using the Future.resultNow or Future.exceptionNow methods which have been added to Future. (These methods throw an IllegalStateException if called before the Future completes.)

      Calling join or joinUntil within a scope is mandatory. If the block exits before the call to join, then the implicit closing of the scope will wait for all forks to terminate, and then throw an exception.

      It is possible for the owner thread to be interrupted before or while joining — for example, if it is a fork of an enclosing StructuredTaskScope that's been shut down. If this occurs, join or joinUntil will throw an exception because there is no point in continuing – any results obtained by forks so far are insufficient, or the scope would have already been shut down. The try-with-resources statement will then shut down the scope, cancelling all forks (and waiting for them to terminate). This has the effect of automatically propagating the cancellation of the task to its subtasks. If the joinUntil deadline expires before the forks terminate or shutdown is called, then joinUntil will throw an exception; again, the scope will be shut down and the forks automatically cancelled.

      The structured use of StructuredTaskScope is enforced at runtime. For example, attempts to call fork from a thread that is not in the tree hierarchy of the scope — i.e., the owner, the forks, and their own forks in nested StructuredTaskScopes — will fail with an exception. Code that uses StructuredTaskScope outside a try-with-resources block and returns without calling close() or does not maintain proper nesting of close() calls may experience StructureViolationExceptions thrown from StructuredTaskScope methods.

      Because StructuredTaskScope enforces a proper structure and order on operations, it does not implement the ExecutorService or Executor interfaces, as they are commonly used in a non-structured way (see the Alternatives section). However, it is straightforward to migrate code that uses ExecutorService, but would benefit from structure, to use StructuredTaskScope.

      StructuredTaskScope resides in an incubator module, excluded by default

      The examples above use the StructuredTaskScope API, so to run them on JDK XX you must add the jdk.incubator.concurrent module, as well as enable preview features to use virtual threads as follows:

      • Compile the program with javac --release XX --enable-preview --add-modules jdk.incubator.concurrent Main.java and run it with java --enable-preview --add-modules jdk.incubator.concurrent Main; or,

      • When using the source code launcher, run the program with java --source XX --enable-preview --add-modules jdk.incubator.concurrent Main.java; or,

      • When using jshell, start it with jshell --enable-preview --add-modules jdk.incubator.concurrent

      Shutdown policies

      Certain "short-circuiting" patterns are common when dealing with concurrent subtasks, such as cancelling all subtasks if one of them fails (we call that pattern "invoke all"), or, alternatively, when one of them succeeds (we call that "invoke any"). To support these patterns, two subclasses of StructuredTaskScope, ShutdownOnFailure and ShutdownOnSuccess (https://download.java.net/java/early_access/loom/docs/api/jdk.incubator.concurrent/jdk/incubator/concurrent/StructuredTaskScope.ShutdownOnSuccess.html), implement basic policies that shut down the scope upon the first fork failure or success, respectively. They also provide methods for handling exceptions and/or successful results.

      Here is a StructuredTaskScope with a shutdown-on-failure policy (seen in the handle example above) that runs a collection of tasks concurrently, and fails if any of them fails:

      <T> List<T> runAll(List<Callable<T>> tasks) throws Throwable {
          try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
              List<Future<T>> futures = tasks.stream().map(scope::fork).toList();
              scope.join();
              scope.throwIfFailed(e -> e);  // Propagate exception as-is if any fork fails
              // Here, all tasks have succeeded, so compose their results
              return futures.stream().map(Future::resultNow).toList();
          }
      } 

      In contrast to the example above, it is sometimes desirable to finish a task early not if any fork fails but, rather, if any fork succeeds. For example, a server application may want to obtain a result from any one of a collection of redundant services. Here is a StructuredTaskScope with a shutdown-on-success policy that returns the result of the first successful subtask; it fails if all subtasks fail or a deadline elapses. The policy will automatically shut down the scope, cancelling active forks, as soon as one of them succeeds.

      <T> T race(List<Callable<T>> tasks, Instant deadline)
              throws InterruptedException, ExecutionException, TimeoutException {
          try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
              for (var task : tasks) {
                  scope.fork(task);
              }
              scope.joinUntil(deadline);
              return scope.result(); // Throws if none of the forks completed successfully 
          } 
      }

      While these two policies are provided out of the box, custom policies that abstract other common patterns can be created by extending StructuredTaskScope and overriding the handleComplete method.

      Fan-in uses

      While the examples above focused on fan-out, i.e., concurrently performing multiple outgoing I/O operations, StructuredTaskScope can be used in other ways, such as managing tasks that respond to multiple incoming I/O operations. Such uses will likely create an unknown number of forks in response to incoming requests. Here is an example of a server that forks subtasks to handle incoming connections inside a StructuredTaskScope:

      void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
          try (var scope = new StructuredTaskScope<Void>()) {
              try {
                  while (true) {
                      var socket = serverSocket.accept();
                      scope.fork(() -> handle(socket));
                  }
              } finally {
                  // if there's been an error or we're interrupted, we stop accepting
                  // if we want to cancel all active connections, we also shut down:
                  scope.shutdown();
                  scope.join();
              }
          }
      }

      This will ensure all connections are closed before serve returns, and present all connection-handling subtasks as children of the scope's owner in a thread dump.

      Observability improvements

      The new thread dump added by JEP 425 is extended to support StructuredTaskScope's grouping of threads into a hierarchy when generating a thread dump in JSON format. The jcmd command can be used to generate such a thread dump with:

      $ jcmd <pid> JavaThread.dump -format=json <file>

      The thread dump will include a JSON object for each StructuredTaskScope. The JSON object contains an array of the threads forked in the scope and their stack traces. The owner of a StructuredTaskScope will typically be blocked in the join method waiting for subtasks to complete; a thread dump makes it easy to see what the subtasks' threads are doing by showing the tree hierarchy imposed by structured concurrency. The JSON object for a StructuredTaskScope also has a reference to its parent so that the structure of the program can be reconstituted from the thread dump.

      The com.sun.management.HotSpotDiagnosticMXBean API can also be used to generate this thread dump. This API can also be invoked indirectly via the platform MBeanServer from a local or remote JMX tool. Future APIs may be introduced to improve diagnosability and debugging support.

      Alternatives

      • Do nothing. Despite having more exceptional conditions to contend with than sequential code, the JDK's existing java.util.concurrent constructs are too low-level, and leave it to programmers to carefully consider such situations. Even when exercising care and discipline to always join all subtasks, only the developer knows the logical hierarchical relationships among them. The lifetime-coordination problems highlighted in the motivation section (i.e. cancellations not propagated and/or subtasks left running in the background) that are the result of this shortcoming don't have satisfactory solutions. These problems are long standing, but virtual threads make them more urgent by encouraging greater use of concurrency. At the same time, they also offer a path to a solution. By being cheap enough to represent a single concurrent task, a virtual thread can be used to record the task's position in the structured concurrency hierarchy without requiring a new construct for a "situated task."

      • Lean on ExecutorService. The ExecutorService interface already provides a small number of high-level combinators (invokeAll, invokeAny) that are structured in their design, but they are not sufficient to solve more general coordination problems. An implementation of ExecutorService that always enforces structure and restricts which threads can submit tasks (by throwing exceptions on violations) is possible but problematic because most uses of ExecutorService (or Executor) in the JDK and the ecosystem are not structured. Reusing the same API for a far more restricted concept is bound to cause confusion. For example, passing such a structured ExecutorService (or Executor) instance to existing methods that accept such a type would be all but certain to throw exceptions in most situations. This approach was explored and prototyped, and the current proposal was found to be superior.
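
      For reference, the two existing combinators can be exercised as follows. This is a small sketch with a hypothetical class name, Combinators, and made-up task bodies. invokeAny returns the result of one task that completed successfully and cancels the tasks that have not completed; invokeAll returns only once every task has completed, though it does not cancel the remaining tasks when one fails.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class Combinators {

    // invokeAny: one task fails, the other succeeds; the success is returned.
    static String firstSuccessful() {
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            Callable<String> replicaA = () -> {
                throw new IllegalStateException("replica A down");
            };
            Callable<String> replicaB = () -> "replica B";
            return es.invokeAny(List.of(replicaA, replicaB));
        } catch (ExecutionException e) {
            return null;   // thrown only if no task succeeds
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            es.shutdown();
        }
    }

    // invokeAll: every returned Future is already done, whether it succeeded or failed.
    static boolean allDoneAfterInvokeAll() {
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            Callable<String> ok = () -> "ok";
            Callable<String> failing = () -> {
                throw new IllegalStateException("failed");
            };
            List<Future<String>> futures = es.invokeAll(List.of(ok, failing));
            return futures.stream().allMatch(Future::isDone);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("invokeAny result: " + firstSuccessful());
        System.out.println("invokeAll futures all done: " + allDoneAfterInvokeAll());
    }
}
```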

      Dependencies

      JEP 425: Virtual Threads (Preview)

              People

              Assignee:
              alanb Alan Bateman
              Reporter:
              alanb Alan Bateman
              Owner:
              Alan Bateman Alan Bateman
              Reviewed By:
              Alex Buckley, Brian Goetz