Реализация структурированной конкурентности в Java и Kotlin

Возможно, вы уже встречались с таким понятием, как структурированная конкурентность, и пытались прояснить для себя его суть. Структурированная конкурентность  —  это парадигма, которая стремится упростить чтение и понимание конкурентных программ, ускорить их написание и обеспечить их безопасность. 

Прежде чем приступить к детальному разбору данной темы, обоснуем ее актуальность. 

Суть проблемы 

Чтобы объяснить востребованность структурированной конкурентности, откроем завесу прошлого и посмотрим, как писались конкурентные программы несколько лет назад. 

В парадигме неструктурированной конкурентности потоки запускаются в любом участке кода, без какой-либо конкретики относительно места их начала и конца.  

Ниже представлен пример на Java. Хотя он и попроще, чем более ранние примеры на Java, тем не менее содержит одну серьезную проблему. Данный код не безопасен, поскольку сбой одного из потоков не отменяет выполнение других задач, и результаты непредсказуемы:

public static void main(String[] args) throws InterruptedException {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("I'm task 1 running on " + Thread.currentThread().getName());
});

CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("I'm task 2 running on " + Thread.currentThread().getName());
});

CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
System.out.println("I'm task 3 running on " + Thread.currentThread().getName());
throw new RuntimeException("Something went wrong!");
});

CompletableFuture.allOf(future1, future2, future3)
.join();
System.out.println("All tasks started!");
Thread.sleep(3000);
}

При запуске этого кода задача 3 (англ. task 3) сразу выбрасывает исключение, но при этом выполнение других задач не останавливается. Приложение продолжает работу, и другие задачи успешно завершаются через пару секунд. Однако мы рассчитывали на совсем другой результат. В идеале должна произойти немедленная отмена всех остальных задач. Если представить, что одна из них выполняется бесконечно, то каждый сбой оборачивался бы утечкой потока:

> Task :UnstructuredConcurrency.main() FAILED
I'm task 3 running on ForkJoinPool.commonPool-worker-3
I'm task 2 running on ForkJoinPool.commonPool-worker-2
I'm task 1 running on ForkJoinPool.commonPool-worker-1Deprecated Gradle features were used in this build, making it incompatible with Gradle 8.0.You can use '--warning-mode all' to show the individual deprecation warnings and determine if they come from your own scripts or plugins.See https://docs.gradle.org/7.4/userguide/command_line_interface.html#sec:command_line_warnings
2 actionable tasks: 1 executed, 1 up-to-date
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: Something went wrong!

Что же предпринять в такой ситуации? Решение данной проблемы берет на себя структурированная конкурентность. Разберем принцип ее действия. 

Структурированная конкурентность 

Две главные цели структурированной конкурентности звучат так: предоставление возможности написания понятного конкурентного кода и предотвращение утечек потоков. Данный подход гарантирует, что при завершении процесса управления кодом, разделенного на множество конкурентных потоков, мы всегда объединяем эти потоки и сохраняем контроль над их выполнением. Благодаря структурированной конкурентности все потоки в приложении находятся под пристальным наблюдением разработчика. 

Теперь начало и конец конкурентного кода представлены в явном виде. Любой разработчик, независимо от своего опыта или уровня знаний в области конкурентного программирования, сможет легко разобраться в подобном коде. Простое решение всегда выигрышное. 

За последние годы несколько языков программирования воплотили концепцию структурированной конкурентности, среди них Golang, Kotlin и Python. Далее рассмотрим, как данный подход реализуется в Kotlin.  

Структурированная конкурентность в Kotlin

В Kotlin структурированная конкурентность осуществляется за счет области корутин (англ. coroutine scope). Корутины  —  это так называемые “виртуальные потоки” Kotlin, исполняемые блоки кода, требующие для работы поток ОС, но не привязанные к нему. Они ожидают операции ввода-вывода (англ. IO), не блокируя заданный поток ОС, которым в это же время может воспользоваться другая корутина. Более подробная информация о корутинах Kotlin предоставлена по ссылке

Логотип Kotlin 

Смысл в том, что область корутины всегда принадлежит сущности с конечным жизненным циклом. Примером может послужит перегруженное пакетное задание (англ. batch job), использующее тысячи потоков для обеспечения высокой пропускной способности. 

С завершением этого задания прекращает существование и его область видимости, вследствие чего все принадлежащие ей корутины или виртуальные потоки сходят на нет. 

Kotlin предусматривает возможность применения GlobalScope, однако делать этого не стоит. Ведь глобальная область видимости просуществует весь жизненный цикл приложения, что в большинстве случаев не отвечает нашим целям, поскольку ведет к множественным утечкам потоков. 

Использование глобальной области видимости допустимо только при условии совпадения жизненных циклов сущности и приложения/сервиса. В качестве примера рассмотрим фоновое задание, которое постоянно повторяет свое выполнение. Предположим, что мы реализуем распределенную систему. Она требует, чтобы на каждом узле выполнялось фоновое задание для отправки периодического сигнала (англ. heartbeat) остальным узлам системы на всем протяжении работы приложения. 

Для этой цели с помощью GlobalScope.launch мы создаем и запускаем поток, который остановится только при завершении работы приложения. 

Самое главное  —  всегда определять область видимости для запускаемых потоков, независимо от их количества, будь то один или несколько. Соблюдение данного правила гарантирует, что при ошибочной работе приложения в заданной области все дочерние потоки внутри нее будут корректно отменены. Это позволит предотвратить утечки потоков, что бы ни случилось! 

Следующий пример поможет лучше понять только что рассмотренное поведение: 

suspend fun main(): Unit {
GlobalScope.async {
val result = sumNumbersToRandomNumbers(1, 10)
println("Task completed with result $result!")
}.await()
}

suspend fun sumNumbersToRandomNumbers(n1: Int, n2: Int): Int =
coroutineScope {
val job1 = async {
delay(1000)
println("Adding random number to $n1")
n1 + nextRandom()
}
val job2 = async {
delay(500)
println("Adding random number to $n2")
n2 + nextRandom()
}
job1.await() + job2.await()
}

private fun nextRandom(): Int = (0..9999).random()

Здесь в заданной области видимости запускаются 2 фоновых потока. Результат выполнения данного кода выглядит примерно так: 

Adding random number to 10
Adding random number to 1
Task completed with result 13232!Process finished with exit code 0

Job2 выводит свое значение первым, поскольку мы установили более длительное время ожидания для имитации задержки IO. 

Убедимся в безопасности данного кода в случае непредвиденного сбоя. Для этого выбрасываем исключение во второй корутине, когда первый поток ожидает одну секунду. В предыдущем примере с CompletableFuture при возникновении исключения другие потоки продолжали работу как ни в чем не бывало. Теперь мы этого не допустим и отменим любой выполняющийся поток в указанной области. Ниже представлен код, воспроизводящий только что описанный алгоритм действий: 

suspend fun main(): Unit {
GlobalScope.async {
val result = sumNumbersToRandomNumbers(1, 10)
println("Task completed with result $result!")
}.await()
}

suspend fun sumNumbersToRandomNumbers(n1: Int, n2: Int): Int =
coroutineScope {
val job1 = async {
delay(1000)
println("Adding random number to $n1")
n1 + nextRandom()
}
val job2 = async {
delay(500)
throw RuntimeException("Something went wrong!")
println("Adding random number to $n2")
n2 + nextRandom()
}
job1.await() + job2.await()
}

private fun nextRandom(): Int = (0..9999).random()

Обратите внимание на Job2  —  мы генерируем исключение через 500 мс. На этом этапе job1 должна подождать дополнительные 500 мс и вывести какой-либо результат. Однако если все реализовано правильно, то произойдет выброс исключения, и среда выполнения сразу отменит все работающие потоки. Именно это и происходит при запуске следующего кода: 

Exception in thread "main" java.lang.RuntimeException: Something went wrong!
at org.theboreddev.CoroutinesKt$sumNumbersToRandomNumbers$2$job2$1.invokeSuspend(Coroutines.kt:27)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)

Можете сами поэкспериментировать и убедиться, что все работает как надо. 

Теперь вы знаете, как Kotlin группирует корутины в одной области видимости и автоматически обрабатывает в ней любой поток. Интересно, а предоставляет ли Java подобные возможности? 

Структурированная конкурентность в Java

Ответим кратко  —  нет или, по крайней мере, пока нет. Команда JDK уже долгое время работает над преобразованием модели конкурентности в Java. Данный проект получил название Project Loom. Нововведения, реализованные в рамках этого проекта, позволят работать с виртуальными потоками и структурированной конкурентностью в Java. Что касается структурированной конкурентности, она была представлена в Java 19 как часть JEP 428, но все еще находится в зачаточном состоянии. Виртуальные потоки были заявлены в Java 19 как новая ознакомительная функциональность в составе JEP 425. Для работы с ознакомительными версиями требуется сначала их активировать

С укоренением введенных изменений улучшится и модель конкурентности, и понимание концепции многопоточности в Java.

Больше не будет существовать текущее ограничение в Java, привязывающее поток Java к потоку ОС, и мы сможем создавать устойчивые, надежные и производительные системы. Какой же это будет гигантский прорыв в развитии экосистемы Java!

Нижеприведенный пример иллюстрирует нововведение в реализации структурированной конкурентности в Java. Перед вами совершенно новая парадигма: 

Response handle() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> user = scope.fork(() -> findUser());
Future<Integer> order = scope.fork(() -> fetchOrder());scope.join(); // Join both forks (Объединение обоих ответвлений)
scope.throwIfFailed(); // ... and propagate errors (... и распространение ошибок)// Here, both forks have succeeded, so compose their results (Здесь оба ответвления успешно справились, поэтому их результаты объединяются)
return new Response(user.resultNow(), order.resultNow());
}
}

Данная статья не предусматривает подробный анализ виртуальных потоков Java, но думаю, что вскоре они дадут повод заговорить о себе. Основываясь на изученном материале, перспективы их развития выглядят многообещающими. Однако позволим себе некоторую недосказанность, чтобы стимулировать интерес к дальнейшим преобразованиям модели конкурентности в Java!

Заключение 

Мы рассмотрели, как некоторые языки реализуют виртуальные потоки для создания более устойчивых систем, в которых поток ОС совместно используется сотнями или тысячами виртуальных потоков, поскольку они легковесны по сравнению с существующими потоками Java. Даже Java переходит на эту новую парадигму, упрощающую понимание и написание конкурентного кода. В скором времени мы избавимся от необходимости писать подверженный ошибкам конкурентный код, продумывая множество возможных ситуаций, способных нарушить его работу! 

С принципом реализации конкурентности и понятием горутин в Golang можно ознакомиться по указанной ссылке

Читайте также:

Читайте нас в TelegramVK и Дзен


Перевод статьи The Bored Dev: Understanding Structured Concurrency With Java and Kotlin

Предыдущая статьяКоманды Go и переменные среды, которые должен знать каждый разработчик
Следующая статьяКак работает внедрение зависимостей в Angular