Осваиваем реактивное программирование на Java

Асинхронный ввод/вывод уже какое-то время используется в обиходе. При этом разные языки реализуют его по-разному, но все предоставляют способ уменьшить количество потоков, давая вроде бы полную конкурентность. JavaScript занимался этим с самого начала. При использовании всего одного потока будет мало хорошего, отправь вы в продакшн блокирующий вызов. 

Несмотря на то, что реактивный Java все больше привлекает интерес разработчиков, большинство знакомых мне программистов по-прежнему живут в многопоточной парадигме. Почему? Принцип потоков относительно легко усвоить. Реактивное же программирование требует переосмысления многих привычных нам принципов программирования. Попытка объяснить, почему асинхронный ввод/вывод является лучшей альтернативой, подобна попытке объяснить сферичность Земли тому, кто всегда верил в ее плоскую природу. 

Я предпочитаю обучаться через игру и эксперименты, создавая “игрушечные” системы, которые затем при необходимости можно использовать в качестве основы для больших систем. Здесь я представлю одну такую базовую систему, которая продемонстрирует основы реактивного Java, используя Project Reactor. А поскольку она мала (меньше тысячи строк в девяти файлах), то и понять ее будет несложно.

Это система псевдо-микросервисов. То есть несмотря на то, что все заключено в один исполняемый файл, каждый “сервис” находится в отдельном классе, который не содержит состояния, а сами классы могут взаимодействовать друг с другом только с помощью очереди сообщений и базы данных. Их все запускает один основной класс, при этом они программируются на завершение по истечению установленного периода времени. 

Моя система смоделирована на основе схемы приема подержанных транспортных средств (ТС) и начинается с заказа на покупку, который в нашем случае будет случайно генерироваться и добавляться в очередь сообщений. Другой сервис получает заказ на покупку из этой очереди, добавляет его в базу данных и отправляет информацию о типе ТС в одну из трех очередей сообщений, в зависимости от того легковой это автомобиль, грузовик или мотоцикл. В завершении есть еще три сервиса, которые считывают эти три очереди, проверяют наличие заказа в базе данных и определяют автомобилю парковочное место для транспортировки.

Функция main выступает больше в качестве функции тестирования, поскольку в настоящей системе микросервисов каждый из них должен иметь собственную функцию main или быть частью такого фреймворка, как Spring. Однако нам нужно лишь увидеть, что все согласованно работает в одной JVM. Настоящий тест этой системы заключается в наблюдении работоспособности всех сервисов по-отдельности, но я предположу, что это сработает и так либо потребует минимальных корректировок. Вот суть функции main:

PurchaseOrderConsumer.consume();
CarConsumer.consume();
TruckConsumer.consume();
MotorcycleConsumer.consume();
PurchaseOrderGenerator.start();

Мы стартуем всех потребителей, после чего запускаем генератор заказа на покупку, который создает заказы и поочередно помещает их в очередь в течение 10 секунд. Поскольку ни у одного из классов нет состояния, все их методы могут быть статическими.

Так как все действие начинается в PurchaseOrderGenerator, первым делом мы заглянем туда. Вот основная часть кода:

Sender sender = RabbitFlux.createSender(soptions);
sender.sendWithPublishConfirms(
  Flux.generate((sink) -> sink.next(createRandomPurchaseOrder()))
    .cast(PurchaseOrder.class)
    .doOnNext((o) -> log("produced: " + o))
    .delayElements(Duration.ofMillis(100))
    .take(Duration.ofSeconds(10))
    .map(i -> new OutboundMessage("", 
         QUEUE_NAME, 
        writeJson(i).orElse("").getBytes()))
    .doFinally((s) -> {
       log("Generator in finally for signal " + s);
       sender.close();
     })
)
.subscribe();

Мы используем очередь сообщений RabbitMQ, являющуюся реактивной библиотекой Java. Она содержит Sender.sendWithPublishConfirms, который получает Flux<OutboundMessage> и возвращает Flux<OutboundMessageResult>. Flux —  это последовательность элементов, к которым можно обращаться по одному. Ее длина может быть бесконечной, и одновременно в процессе задействуется только один элемент, хотя можно задействовать и больше, что определяется рядом факторов.

Flux является ключевым компонентом библиотеки асинхронного ввода/вывода Project Reactor. Он позволяет объединять методы в цепочку, чтобы управлять элементами по ходу их движения в потоке через систему, а также управлять самим этим потоком. Методы, управляющие элементами, обычно получают в качестве параметров лямбда-функции, что позволяет вызывать их для каждого элемента по мере его продвижения через систему. Два самых распространенных метода  —  это map и filter. map предназначен для преобразования элементов в потоке из одного типа в другой. filter служит для отбрасывания элементов, не соответствующих определенным критериям.

Нам нужно создать непрерывный список заказов на покупку и представить его методу Sender.sendWithPublishConfirms как Flux<OutboundMessage>. Этот процесс мы начинаем со строки Flux.generate((sink) -> sink.next(createRandomPurchaseOrder())), которая создает потенциально бесконечный список случайных заказов на покупку. Поскольку Flux.generate возвращает Flux<Object>, нам нужен метод cast, который даст системе понять, что мы работаем над объектами PurchaseOrder. Flux-метод doOnNext представляет удобный способ ввести побочные эффекты. Здесь я его использую в основном для логирования.

Далее метод delayElements добавляет задержку между каждым элементом. Я делаю это для симуляции появления случайных заказов с разумной скоростью. Можете без проблем его удалить, но тогда вместо выполнения системы в течение десяти секунд и завершения, она будет заполняться очередями сообщений о заказах на покупку, для обработки которых потребуется уже более двух минут. Метод take позволит Flux выполнять обработку на протяжении десяти секунд и затем останавливаться. Я так делаю, потому что этот генератор служит исключительно для тестирования. В реальной же системе заказы на покупку наверняка будут поступать из API, который будет добавлять их в очередь сообщений.

Чтобы RabbitMQ обрабатывал элементы, сначала их нужно преобразовать в OutBoundMessage как массив байтов, представляющий сериализацию JSON заказа на покупку. Это делается с помощью метода map и лямбда-выражения, которое создает нужный OutBoundMessage. Если в сериализации заказа на покупку возникает ошибка, мы отправляем пустой массив, который на противоположной стороне отфильтруется. Можно отфильтровать его и здесь, чтобы избавить очереди от необходимости передачи недействительных сообщений. Но нам все равно нужна проверка на встречной стороне, и раз уж это всего лишь тестовая схема, то можно отправить и их.

Метод doFinally вызывает свое лямбда-выражение всего раз, и в данном случае мы используем его для логирования завершения потока и закрытия отправителя. В конце этой цепочки мы получаем Flux<OutboundMessage>, который подходит для передачи методу Sender.sendWithPublishConfirms. Назад же мы получаем Flux<OutboundMessageResult>, который можно проверить, чтобы убедиться в размещении сообщения в соответствующей очереди. В данном простом случае проверку мы не делаем. 

Метод subdcribe, который вызывается для Flux<OutboundMessageResult>, начинает цепочку. Важно отметить, что до вызова метода subscribe мы всего лишь выстраиваем цепочку действий. Как только вызывается метод subscribe, мы больше ничего с потоком данных не делаем, и он возвращает только объект Disposable, который можно использовать, чтобы остановить подписку. Мы даже не храним объект Disposable, поскольку наш поток остановится спустя десять секунд, как прописано в цепочке событий. Начальный Flux, который мы создали с помощью метода generate, не завершается вызовом subscribe, но ожидает, что Sender, которому мы его передаем, подпишется на него, чтобы запустить эту цепочку событий.

Прежде чем перейти к реальным сервисам в системе, заглянем за кадр и посмотрим, что всем этим движет. Здесь есть один магический элемент, который в большинстве случаев остается незаметен, а именно Scheduler. Взгляните на Flux, созданный generate. Первые два метода в цепочке, cast и doOnNex, содержат лямбда-функции, которые не блокируются и возвращают результат через очень небольшой промежуток времени. Однако следующий метод в цепочке, delayElements, должен каким-то образом блокироваться, чтобы вносить в поток задержку. Как же нам заблокировать функцию в мире без блокировок? Ответом будет Scheduler (планировщик). Планировщик может просматривать цепочку, а также проходящие через нее элементы, решая, какой из них и в какой части цепочки должен быть обработан. 

У вас голова еще не взорвалась? На это можно посмотреть так. Внутри многозадачной операционной системы нечто наблюдает за ожидающими процессами и решает, какой из них очередным получит внимание ЦПУ. Планировщик работает аналогичным образом, только ему не нужно беспокоиться о прерываниях. Все задачи, с которыми работает планировщик, представляют очень мелкие фрагменты кода. “Блокирующие” задачи вроде delayElements разбиваются на две части. Первая часть получает элемент из потока, устанавливает таймер и возвращает управление планировщику. Далее планировщик может выполнять какую-нибудь другую задачу. Как только таймер метода delayElements истекает, вызывается вторая часть и сохраненный элемент получает возможность перемещения в следующий метод цепочки.

Я рассматриваю этот процесс как две непрерывные конвейерные ленты. Первая перемещает элементы по направлению ко второй, но вторая движется намного медленнее. В итоге элементы задерживаются на первой ленте, в то время как вторая их постепенно с нее снимает. По мере накопления задерживающихся элементов возникает так называемое “обратное давление”, и система понимает, что нужно просто остановить первую ленту, пока вторая ее немного не догонит. 

Далее рассмотрим второй сервис системы, PurchaseOrderConsumer. Его задача в извлечении элементов из очереди заказов на покупку, их логировании в базе данных, обработке и последующем отборе для другой очереди. Вот основной код:

Sender sender = RabbitFlux.createSender(soptions);
Receiver receiver = RabbitFlux.createReceiver(roptions);
sender.sendWithPublishConfirms(receiver
  .consumeAutoAck(PO_QUEUE_NAME)
  .map(d -> readJson(new String(d.getBody())))
  .filter(PurchaseOrder::isValid)
  .doOnNext(po -> log("recevied po " + po))
  .doOnDiscard(PurchaseOrder.class, 
               po -> log("Discarded invalid PO " + po))
  .flatMap(po -> writePoJson(po).map(j -> reactiveCollection
      .upsert(po.getId(), j)
      .map(result -> po))
    .orElse(Mono.just(po)))
  .map(po -> factory.build(po.getType(), po))
  .timeout(Duration.ofSeconds(10))
  .doFinally((s) -> {
    log("Consumer in finally for signal " + s);
    receiver.close();
    sender.close();
    cluster.disconnect();
  })
  .map(v -> new OutboundMessage("", 
                outQueue.get(v.getPo().getType()), 
                vehicleWriter
                  .get(v.getPo().getType())
                  .apply(v)
                  .orElse("")
                  .getBytes()))
)
.subscribe();

Очень похоже на первый сервис. Здесь присутствует внешняя обертка, которая отправляет отобранные объекты по разным очередям сообщений и вызывает subscribe. Разница в том, что вместо получения Flux для отправки с помощью метода generate, мы получаем Flux от метода Receiver.consumeAutoAck. Таким образом создается поток элементов, поступающих из очереди сообщения заказов на покупку, после чего мы можем выстраивать для них цепочку обработки. Эта цепочка состоит в основном из обычных методов, map для преобразования строки JSON в элемент, filter для удаления недействительных сообщений, doOnNext и doOnDiscard для логирования обрабатываемых заказов на покупку или тех, которые были отфильтрованы. 

Затем мы переходим к новому методу, flatMap. Это особый метод, заслуживающий внимания. Те, кому flatMap знаком из Java Streams, знают, что если трансформация возвращает Stream (или список, который можно легко преобразовать в Stream), то flatMap изменит внешний Stream, чтобы он включал все элементы внутреннего Stream. Другими словами, он преобразует Stream (поток) потоков в единый поток из всех элементов. В мире Flux происходит то же самое, преобразование Flux (последовательности) последовательностей в единую последовательность со всеми элементами.

В рассматриваемом нами случае преобразование касается метода ReactiveCollection.upsert, который вставляет заказ на покупку в CouchBase. Возвращаемый тип получается Mono<MutationResult>. Нам еще нужно поговорить о Mono, который представляет особый тип Flux. Работает Mono во многом аналогично Flux, но может содержать либо один элемент, либо быть пустым. Также аналогично Flux содержащийся в нем элемент не материализуется с помощью Mono, но требует его подождать. Тут нам снова нужно задерживать обработку вложенного элемента, но без блокирования.

Здесь на помощь приходит flatMap. В этом случае, так как внутренней структурой является Mono, во внешнем Flux по-прежнему соблюдается соответствие один к одному. На деле мы отбрасываем результат и просто возвращаем оригинальный заказ на покупку для дальнейшей обработки (как делалось отображением map(result -> po)). Итак, flatMap работает аналогично методу delayElements, о котором говорилось ранее, где он разбивается на две части, между которыми втискивается весь ввод/вывод базы данных. Но планировщик знает, что нужно передать элемент во flatMap и переходить к обработке другого элемента. Он также не забудет получить возвращаемый элемент базы данных, когда он будет готов к дальнейшей обработке. И снова flatMap разделяет нашу воображаемую конвейерную ленту на две части.

Осталось проговорить последний метод цепочки, остальные же аналогичны предыдущим. Если бы речь шла о реальном сервисе, то метод timeout даже не должен был бы находиться в потоке. У нас он присутствует только, чтобы отменять поток, когда в рамках заданного временного промежутка не происходит получение элемента. Это позволяет нашей небольшой игрушечной системе все закрывать и в конечном итоге завершаться.

В реальном сервисе такого бы у вас наверняка не было. Единственное исключение возможно, если сервис ожидает определенное количество элементов в секунду, и в течение нескольких минут не получает ни одного. Тогда вы можете заподозрить, что было потеряно соединение с очередью, и захотите, чтобы служба оркестровки, например Kubernetes, перезапустила ваш сервис. 

Еще один вариант применения таймаута  —  это тестирование кода. Если ваш тест ожидает элемент из очереди сообщений, то вы можете установить таймаут так, чтобы тест не ждал бесконечно.

В системе остался заключительный набор сервисов, считывающий очереди сообщений, в которые PurchaseOrderConsumer выводит тип Vehicle. Единственная выполняемая им обработка состоит в считывании заказа на покупку из типа Vehicle, подтверждении наличия заказа в базе данных, определении для него парковочного места согласно типу Vehicle и логировании конечного объекта. В реальной системе вы бы наверняка захотели добавить его в учетную базу данных транспортных средств либо использовать какой-либо другой способ отслеживания.

Я создал три класса сервисов, по одному для каждого типа; Car (легковое авто), Truck (грузовик) или Motorcycle (мотоцикл). В примере текущего кода я мог сделать один класс с рядом параметров и запустить метод получения три раза, по одному для каждого типа. Но я оставил их отдельными, так как предвидел, что для каждого типа потребуется своя логика, кардинальность типов окажется очень низка, а базовый код короче сотни строк. Если бы у сервисов было много общей логики, я бы не пожалел лишнего времени на их объединение. Но мой прагматизм иногда превосходит перфекционизм, и мне это нравится.

Чтобы все проверить, пришлось запустить CoucheBase и RabbitMQ. В обоих случаях я просто создал экземпляр Docker:

docker run -d --hostname my-rabbit -p 5672:5673 rabbitmq:3
docker run -d -p 8091-8094:8091-8094 -p 11210:11210 couchbase

В случае с CouchBase мне пришлось открыть консоль администратора по адресу http://localhost:8091, прокликать пользовательское соглашение и настроить корзину po. После этого я смог запустить сервисы и наблюдать журнал:

...
pool-10-thread-20 recevied po PurchaseOrder(id=e186589fd87279f3, price=78023.46, type=Motorcycle, time=2021-01-05T15:43:10.704386Z)
cb-io-kv-17-2 po for motorcycle e186589fd87279f3 confirmed
cb-io-kv-17-2 received motorcycle Vehicle(po=PurchaseOrder(id=e186589fd87279f3, price=78023.46, type=Motorcycle, time=2021-01-05T15:43:10.704386Z), lot=motorcycle lot a)
parallel-8 Generator in finally for signal onComplete
parallel-13 Car consumer in finally for signal cancel
parallel-14 Truck consumer in finally for signal cancel
parallel-7 Motorcycle consumer in finally for signal cancel
parallel-4 Consumer in finally for signal onError
------------------------------------------------------------------------
BUILD SUCCESS
------------------------------------------------------------------------
Total time:  25.775 s
Finished at: 2021-01-05T07:43:23-08:00
------------------------------------------------------------------------

Сборка прошла успешно! Если вы соберете и запустите систему, то советую внимательно проследить за логами, все из которых выводят текущий поток. Вскоре вы заметите паттерн маленьких конвейерных лент (некоторые будут выполняться параллельно), о котором я говорил. 

rkamradt/usedvehicles
Contribute to rkamradt/usedvehicles development by creating an account on GitHub.github.com

Читайте также:

Читайте нас в Telegram, VK и Яндекс.Дзен


Перевод статьи Randal Kamradt Sr: Understanding Reactive Java

Предыдущая статьяИскусственный интеллект: надежды и угрозы
Следующая статьяБольшие данные и их влияние на постпандемический мир