Реактивное программирование или сопрограммы: между молотом и наковальней?

Сегодня в часы пик клиентским приложениям обычно приходится одновременно обрабатывать десятки тысяч (а то и больше) запросов. Облачные вычисления и прочие возможности динамического масштабирования помогают справиться с этим, однако в какой-то момент и они достигают своих пределов. Кроме того, привычное соотношение “один поток на запрос” в этом контексте уже не актуально.

Потокам обычно требуется от 128 до 256 КБ памяти на поток, по крайней мере, в JVM. Независимо от возможностей масштабирования, с увеличением числа запросов быстро достигается предел допустимого. Циклы сборки мусора не всегда укорачиваются при накоплении большого количества данных.

Существуют различные подходы к более эффективному использованию ресурсов. В этой статье мы сравним две концепции и рассмотрим примеры их реализации, а также определим преимущества и недостатки каждой. Эти концепции  —  реактивное программирование и сопрограммы.

Концептуальные различия

Кратко рассмотрим суть обеих концепций, а затем перейдем к их реализации.

Реактивное программирование, помогающее свести к минимуму количество потоков, предназначено для использования в многопоточной среде. Оно осуществляется путем интеллектуальной маршрутизации. Реактивное программирование работает с асинхронными функциональными цепочками, по которым входные данные передаются от производителей получателям.

Реактивные фреймворки полностью абстрагируют базовую модель от разработчика со всеми ее потоками и диспетчеризацией. Это приводит к более эффективному использованию ресурсов по ряду факторов. Однако любой, кто когда-либо имел дело с реактивными потоками, быстро поймет, что работать с ними совсем не просто из-за такого уровня абстрагирования. Для реактивных потоков база кода организована в форме функциональных цепочек вызовов.

Альтернативный подход к эффективному использованию ресурсов был разработан в виде сопрограмм. Эта концепция асинхронного программирования восходит к 60-м годам и в настоящее время применяется во все большем количестве языков программирования. Например, Kotlin поддерживает сопрограммы с версии 1.1. Концепция допускает асинхронное выполнение кода. Однако асинхронный код по-прежнему выглядит так, как будто он выполняется последовательно.

Сопрограммы не требуют реструктуризации базы кода и допускают императивный и последовательный стиль программирования. Однако реактивные фреймворки предлагают предопределенных операторов почти для каждой проблемы. Поскольку оба подхода имеют свои преимущества и недостатки, сравним их на более развернутом примере.

Используемые материалы

Весь код, используемый в примере, можно найти в репозитории Github. Он написан на языке Kotlin.

Что касается реактивного программирования, я использую реактивную триаду Project Reactor, Spring WebFlux и R2DBC, поэтому каждая часть приложения не блокируется, начиная с веб-уровня и заканчивая сохраняемостью. R2DBC (реактивное подключение к реляционным базам данных) предоставляет API реактивного программирования для реляционных баз данных. В данном случае мы практикуем интеграцию Spring Boot Data R2DBC для встроенной базы данных H2.

Для сопрограмм используется библиотека Kotlinx Coroutines, которая переносит сопрограммы в Kotlin в качестве сторонней библиотеки. Поскольку сопрограммы можно легко применять вместе с Spring WebFlux и Spring Data R2DBC, никаких дополнительных фреймворков не требуется.

Используемые фреймворки и их версии:

  • Kotlin 1.4.32;
  • Kotlinx Coroutines 1.4.2;
  • Spring Boot + WebFlux 2.4.2;
  • Project Reactor 3.4.2;
  • R2DBC 0.8.4.RELEASE;
  • Spring Data R2DBC 1.2.3.

Код

Для демонстрации программных решений возьмем микросервис с конечной точкой REST, минималистичным уровнем служб и слоем постоянного хранения для двух сущностей  —  Character(персонаж) и House (дом). Будем строить что-то вроде дома для персонажей “Игры престолов”.

Реализацию выполним отдельно для реактивного программирования и сопрограмм. При этом они используют одни и те же методы конечных точек для представления.

  • Путь для реактивного программирования API: /reactive/characters.
  • Путь для API сопрограмм: /coroutines/characters.

Методы, которые используются в обоих случаях:

  • Find by name (найти по имени): GET /?lastName=<lastName>;
  • Find by id (найти по идентификатору): GET /{id};
  • Add (добавить): PUT /?firstName=<firstName>&lastName=<lastName>;
  • Delete by name (удалить по имени): DELETE /?firstName=<firstName>&lastName=<lastName>.

Пример вызова:

curl -s "http://localhost:8080/reactive/characters?lastName=Stark"
[{"id":1,"firstName":"Eddard","lastName":"Stark","house":1},{"id":6,"firstName":"Arya","lastName":"Stark","house":1}]

Приложение заполнено только несколькими образцами данных.

Итак, начнем с внешних характеристик и постепенно продвинемся дальше  —  до слоя постоянного хранения данных.

Контроллер (реактивное программирование)

По большей части контроллер довольно прост в реализации:

@RestController
@RequestMapping("reactive/characters")
class ReactiveCharacterController(private val reactiveCharacterService: ReactiveCharacterService) {

@GetMapping
fun findByName(@RequestParam lastName: String): Flux<Character> =
reactiveCharacterService.findByLastName(lastName)

@GetMapping("/{id}")
fun findById(@PathVariable id: Long): Mono<Character> =
reactiveCharacterService.findById(id)

@PutMapping
fun addCharacter(@RequestParam firstName: String, @RequestParam lastName: String): Mono<ResponseEntity<String>> =
reactiveCharacterService
.addCharacter(firstName, lastName)
.map { ResponseEntity.status(HttpStatus.CREATED).build<String>() }
.switchIfEmpty { Mono.just(ResponseEntity.ok().build()) }
.onErrorResume(CharacterNotFoundException::class.java) {
Mono.just(ResponseEntity.badRequest().body(it.message))
}

@DeleteMapping
fun deleteByName(@RequestParam firstName: String, @RequestParam lastName: String): Mono<Void> =
reactiveCharacterService.deleteByName(firstName, lastName)

}

Контроллеры определяют четыре метода для четырех методов, которые нужно предоставить. Метод findByName может возвращать несколько персонажей, поэтому здесь мы возвращаем Flux  —  поток персонажей. findById находит только соответствующего персонажа, поэтому мы возвращаем объект Mono, содержащий ни одного или один персонаж.

Реализация метода addCharacter уже немного сложнее, так как у нас есть три разных результата для обработки.

  1. 201 (CREATED): персонаж ранее не существовал и мог быть добавлен.
  2. 200 (ОК): персонаж уже существовал, и ничего не изменилось.
  3. 400 (BAD_REQUEST): произошла ошибка проверки ввода, и запрос был отклонен.

Ошибки не могут быть обработаны аннотированными методами @ExceptionHandler, так как это нарушит рабочий процесс реактивного программирования. Таким образом, мы должны использовать один из многих методов OnErrorX, которые предоставляет Reactor API.

Наконец, метод deleteByName также прост. Только тип возврата Mono<Void> сначала выглядит непривычно, потому что на самом деле мы ничего не будем возвращать. Однако фреймворк ожидает, что мы вернем реактивный тип, поэтому мы ничего не инкапсулируем в тип Mono.

Контроллер (сопрограммы)

Для сопрограмм Kotlin реализация может выглядеть следующим образом:

@RestController
@RequestMapping("coroutines/characters")
class CoroutinesCharacterController(private val coroutinesCharacterService: CoroutinesCharacterService) {

@GetMapping
fun findByName(@RequestParam lastName: String): Flow<Character> =
coroutinesCharacterService.findByLastName(lastName)

@GetMapping("/{id}")
suspend fun findById(@PathVariable id: Long): Character? =
coroutinesCharacterService.findById(id)

@PutMapping
suspend fun addCharacter(@RequestParam firstName: String, @RequestParam lastName: String): ResponseEntity<String> {
return try {
val character = coroutinesCharacterService.addCharacter(firstName, lastName)

character?.let {
ResponseEntity.status(HttpStatus.CREATED).build()
} ?: ResponseEntity.ok().build()
} catch (ex: CharacterNotFoundException) {
ResponseEntity.badRequest().body(ex.message)
}
}

@DeleteMapping
suspend fun deleteByName(@RequestParam firstName: String, @RequestParam lastName: String) =
coroutinesCharacterService.deleteByName(firstName, lastName)

}

Первое, на что следует обратить внимание,  —  это различные типы возвращаемых данных, поскольку мы больше не используем реактивные типы. В целом может быть применено следующее соотнесение:

  • Mono<T> → T?
  • Flux<T> → Flow<T>
  • Mono<Void> → Void?

Обнуляемые типы Kotlin помогут упростить типы возвращаемых значений. Тип Flow инкапсулирует поток значений, который вычисляется асинхронно и ведет себя как общая Kotlin-коллекция со всеми ее операторами.

Далее следует отметить модификатор suspend для каждого метода (рассчитывайте на один возвращаемый Flow, потому что он внутренне вызывает только функции состояния ожидания). Данные функции маркируются как приостанавливающие, что означает: базовый поток не заблокирован. Функции состояния ожидания могут быть вызваны только из сопрограммы или другой функции состояния ожидания.

Но самое важное  —  это фактический довольно императивный стиль программирования, возможный при реализации с помощью сопрограмм. Для большинства разработчиков реализация будет иметь много общего с тем, что они привыкли видеть каждый день. Мы даже можем использовать здесь конструкцию try-catch (перехват ошибок).

Сопрограммы позволяют выполнять асинхронные операции с использованием императивного кода.

Уровень служб (реактивное программирование)

Переходим к уровню служб реактивного программирования:

@Service
class ReactiveCharacterService(private val reactiveHouseService: ReactiveHouseService,
private val reactiveCharacterRepository: ReactiveCharacterRepository) {

fun findByLastName(lastName: String): Flux<Character> =
reactiveCharacterRepository.findByLastName(lastName)

fun findById(id: Long): Mono<Character> =
reactiveCharacterRepository.findById(id)

fun deleteByName(firstName: String, lastName: String): Mono<Void> =
reactiveCharacterRepository
.findByFirstNameAndLastName(firstName, lastName)
.flatMap { reactiveCharacterRepository.deleteById(it.id!!) }

fun addCharacter(firstName: String, lastName: String): Mono<Character> =
reactiveCharacterRepository
.existsByFirstNameAndLastName(firstName, lastName)
.filter { it == false }
.flatMap {
reactiveHouseService
.findByName(lastName)
.switchIfEmpty { Mono.error { CharacterNotFoundException("No valid house found for the character $firstName $lastName!") }}
}.flatMap { reactiveCharacterRepository.save(Character(firstName = firstName, lastName = lastName, house = it.id!!)) }
}

Методы findByLastName и findById вполне понятны. Удаление персонажа в методе deleteByName требует немного больше усилий, но в основном выглядит довольно просто. Здесь важно выбрать правильного оператора для операции удаления. Хотя создается впечатление, что удаление записи в базе данных является побочным эффектом, влияющим на внешние факторы, и нас не волнуют последствия применяемого метода, мы не можем не использовать Mono.doOnNext. Метод удаления из ReactiveCrudRepository возвращает само значение, на которое можно подписаться, поэтому требуется, чтобы кто-либо подписался на него:

Mono<Void> deleteById(ID id);

В противном случае ничего бы не произошло, согласно принципу: ничего не происходит, пока вы не подпишетесь.

Мы могли либо подписаться вручную на возвращаемое значение, либо передать все по слоям, пока Spring WebFlux не обработает подписку в контроллерах. Я выбрал последнее и связал вызов с оператором flatMap.

Для метода addCharacter реализация выглядит сложнее, поскольку необходимо обработать три случая.

  1. Персонажа не существует, и его следует добавить.
  2. Персонаж уже существует, и ничего не следует менять.
  3. Дом персонажа не существует, и должно быть сделано исключение.

Фильтр в строке 19 обрабатывает второй случай, возвращая пустое значение Mono, если применяется предикат. В других случаях мы сопоставляем отображение результата с выполнением House и снова сопоставляем его для метода цепного сохранения save. Если выполнение House не дает результата, предоставляем сообщение об Mono-ошибке, которое инкапсулирует исключение. Можете считать метод switchIfEmpty методом Optional.orElseGet.

Как видите, на уровне служб реактивное программирование имеет несколько подводных камней, о которых следует знать. Теперь посмотрим, как выглядит вариант с сопрограммами.

Уровень служб (сопрограммы)

С помощью сопрограмм службы реализуются следующим образом:

@Service
class CoroutinesCharacterService(private val coroutinesHouseService: CoroutinesHouseService,
private val coroutinesCharacterRepository: CoroutinesCharacterRepository) {

fun findByLastName(lastName: String): Flow<Character> =
coroutinesCharacterRepository.findByLastName(lastName)

suspend fun findById(id: Long): Character? =
coroutinesCharacterRepository.findById(id)

suspend fun deleteByName(firstName: String, lastName: String) =
coroutinesCharacterRepository
.findByFirstNameAndLastName(firstName, lastName)
?.let {
coroutinesCharacterRepository.deleteById(it.id!!)
}

suspend fun addCharacter(firstName: String, lastName: String): Character? =
if (coroutinesCharacterRepository.existsByFirstNameAndLastName(firstName, lastName)) {
null
} else {
coroutinesHouseService.findByName(lastName)
?.let { coroutinesCharacterRepository.save(Character(firstName = firstName, lastName = lastName, house = it.id!!)) }
?: throw CharacterNotFoundException("No valid house found for the character $firstName $lastName!")
}
}

Как и прежде, findByLastName и findById проще некуда. Метод deleteByName также довольно прост. С помощью хорошо известной конструкции expr1?.let { expr2 } мы можем выполнить операцию expr2, только если expr1 возвращает определенное значение. Таким образом, мы запускаем удаление лишь в том случае, если был найден какой-то персонаж. Сопрограммы позволяют придерживаться общих шаблонов программирования (по крайней мере, общеизвестных в среде разработчиков Kotlin).

Для метода addCharacter можно воспользоваться выражением if в Kotlin, чтобы немного упростить структуру. Если персонаж был найден, мы возвращаем значение null и больше ничего не делаем. В противном случае сначала ищем дом и снова используем выражение let, чтобы сохранить персонажа, если дом был найден. Если дом не был найден, запускается тернарный условный оператор ?:, и мы создаем собственное исключение.

Поддержка постоянства хранения данных (реактивное программирование / сопрограммы)

Наконец мы переходим к самой простой части программных решений  —  поддержке хранения данных. Начиная с выпуска Spring Data Neumann, создание запросов работает для репозиториев реактивного программирования и сопрограмм.

Это существенно упрощает реализацию для обоих вариантов.

Реактивный репозиторий:

interface ReactiveCharacterRepository : ReactiveCrudRepository<Character, Long> {

fun findByLastName(lastName: String): Flux<Character>
fun findByFirstNameAndLastName(firstName: String, lastName: String): Mono<Character>
fun existsByFirstNameAndLastName(firstName: String, lastName: String): Mono<Boolean>

}

Репозиторий сопрограмм:

interface CoroutinesCharacterRepository : CoroutineCrudRepository<Character, Long> {

fun findByLastName(lastName: String): Flow<Character>
suspend fun findByFirstNameAndLastName(firstName: String, lastName: String): Character?
suspend fun existsByFirstNameAndLastName(firstName: String, lastName: String): Boolean

}

Наследуйте правильному интерфейсу, создавайте свои методы, используйте соответствующие типы и не забывайте ключевое слово suspend для методов сопрограмм. Тут все довольно понятно.

Тренировка

Получение всех персонажей:

curl -s -XGET "http://localhost:8080/reactive/characters?lastName=Stark"
[{"id":1,"firstName":"Eddard","lastName":"Stark","house":1},{"id":6,"firstName":"Arya","lastName":"Stark","house":1},{"id":10,"firstName":"Sansa","lastName":"Stark","house":1}]

Удаление персонажа:

curl -s -XDELETE "http://localhost:8080/coroutines/characters?firstName=Sansa&lastName=Stark"
curl -s -XGET "http://localhost:8080/coroutines/characters?lastName=Stark"
[{"id":1,"firstName":"Eddard","lastName":"Stark","house":1},{"id":6,"firstName":"Arya","lastName":"Stark","house":1}]

Добавление персонажа:

curl -s -XPUT "http://localhost:8080/coroutines/characters?firstName=Ellaria&lastName=Sand"
No valid house found for the character Ellaria Sand!

Заключение

Обе концепции отвечают требованиям и позволяют относительно легко написать асинхронный код.

В реактивном программировании мы должны придерживаться реактивных цепочек, выстраивая вокруг них рабочий процесс. Понадобится некоторое время, чтобы привыкнуть к реактивным операторам, сохраняемой цепочке и требованию подписаться на значения (что может быть сделано фреймворком, когда мы используем его правильно, но тем не менее это требуется).

С другой стороны, использование сопрограмм кажется более привычным. Хотя код выполняется асинхронно, он все равно выглядит так, будто выполняется последовательно. Мы можем пользоваться общепринятыми шаблонами программирования языка, которые делают жизнь разработчиков простой и легкой.

Читайте также:

Читайте нас в TelegramVK и Яндекс.Дзен


Перевод статьи Benedikt Jerat: Reactive or Coroutines: Between a rock and a hard place?

Предыдущая статьяКак алгоритм «случайный лес» вычисляет продавцов-мошенников на онлайн-рынке
Следующая статьяКак обеспечить обмен данными между микросервисами