Асинхронный ввод/вывод уже какое-то время используется в обиходе. При этом разные языки реализуют его по-разному, но все предоставляют способ уменьшить количество потоков, давая вроде бы полную конкурентность. JavaScript занимался этим с самого начала. При использовании всего одного потока будет мало хорошего, отправь вы в продакшн блокирующий вызов.
Несмотря на то, что реактивный Java все больше привлекает интерес разработчиков, большинство знакомых мне программистов по-прежнему живут в многопоточной парадигме. Почему? Принцип потоков относительно легко усвоить. Реактивное же программирование требует переосмысления многих привычных нам принципов программирования. Попытка объяснить, почему асинхронный ввод/вывод является лучшей альтернативой, подобна попытке объяснить сферичность Земли тому, кто всегда верил в ее плоскую природу.
Я предпочитаю обучаться через игру и эксперименты, создавая “игрушечные” системы, которые затем при необходимости можно использовать в качестве основы для больших систем. Здесь я представлю одну такую базовую систему, которая продемонстрирует основы реактивного Java, используя Project Reactor. А поскольку она мала (меньше тысячи строк в девяти файлах), то и понять ее будет несложно.
Это система псевдо-микросервисов. То есть несмотря на то, что все заключено в один исполняемый файл, каждый “сервис” находится в отдельном классе, который не содержит состояния, а сами классы могут взаимодействовать друг с другом только с помощью очереди сообщений и базы данных. Их все запускает один основной класс, при этом они программируются на завершение по истечению установленного периода времени.
Моя система смоделирована на основе схемы приема подержанных транспортных средств (ТС) и начинается с заказа на покупку, который в нашем случае будет случайно генерироваться и добавляться в очередь сообщений. Другой сервис получает заказ на покупку из этой очереди, добавляет его в базу данных и отправляет информацию о типе ТС в одну из трех очередей сообщений, в зависимости от того легковой это автомобиль, грузовик или мотоцикл. В завершении есть еще три сервиса, которые считывают эти три очереди, проверяют наличие заказа в базе данных и определяют автомобилю парковочное место для транспортировки.
Функция main
выступает больше в качестве функции тестирования, поскольку в настоящей системе микросервисов каждый из них должен иметь собственную функцию main
или быть частью такого фреймворка, как Spring. Однако нам нужно лишь увидеть, что все согласованно работает в одной JVM. Настоящий тест этой системы заключается в наблюдении работоспособности всех сервисов по-отдельности, но я предположу, что это сработает и так либо потребует минимальных корректировок. Вот суть функции main
:
PurchaseOrderConsumer.consume();
CarConsumer.consume();
TruckConsumer.consume();
MotorcycleConsumer.consume();
PurchaseOrderGenerator.start();
Мы стартуем всех потребителей, после чего запускаем генератор заказа на покупку, который создает заказы и поочередно помещает их в очередь в течение 10 секунд. Поскольку ни у одного из классов нет состояния, все их методы могут быть статическими.
Так как все действие начинается в PurchaseOrderGenerator
, первым делом мы заглянем туда. Вот основная часть кода:
Sender sender = RabbitFlux.createSender(soptions);
sender.sendWithPublishConfirms(
Flux.generate((sink) -> sink.next(createRandomPurchaseOrder()))
.cast(PurchaseOrder.class)
.doOnNext((o) -> log("produced: " + o))
.delayElements(Duration.ofMillis(100))
.take(Duration.ofSeconds(10))
.map(i -> new OutboundMessage("",
QUEUE_NAME,
writeJson(i).orElse("").getBytes()))
.doFinally((s) -> {
log("Generator in finally for signal " + s);
sender.close();
})
)
.subscribe();
Мы используем очередь сообщений RabbitMQ, являющуюся реактивной библиотекой Java. Она содержит Sender.sendWithPublishConfirms
, который получает Flux<OutboundMessage>
и возвращает Flux<OutboundMessageResult>
. Flux — это последовательность элементов, к которым можно обращаться по одному. Ее длина может быть бесконечной, и одновременно в процессе задействуется только один элемент, хотя можно задействовать и больше, что определяется рядом факторов.
Flux является ключевым компонентом библиотеки асинхронного ввода/вывода Project Reactor. Он позволяет объединять методы в цепочку, чтобы управлять элементами по ходу их движения в потоке через систему, а также управлять самим этим потоком. Методы, управляющие элементами, обычно получают в качестве параметров лямбда-функции, что позволяет вызывать их для каждого элемента по мере его продвижения через систему. Два самых распространенных метода — это map
и filter
. map
предназначен для преобразования элементов в потоке из одного типа в другой. filter
служит для отбрасывания элементов, не соответствующих определенным критериям.
Нам нужно создать непрерывный список заказов на покупку и представить его методу Sender.sendWithPublishConfirms
как Flux<OutboundMessage>
. Этот процесс мы начинаем со строки Flux.generate((sink) -> sink.next(createRandomPurchaseOrder()))
, которая создает потенциально бесконечный список случайных заказов на покупку. Поскольку Flux.generate
возвращает Flux<Object>
, нам нужен метод cast
, который даст системе понять, что мы работаем над объектами PurchaseOrder
. Flux
-метод doOnNext
представляет удобный способ ввести побочные эффекты. Здесь я его использую в основном для логирования.
Далее метод delayElements
добавляет задержку между каждым элементом. Я делаю это для симуляции появления случайных заказов с разумной скоростью. Можете без проблем его удалить, но тогда вместо выполнения системы в течение десяти секунд и завершения, она будет заполняться очередями сообщений о заказах на покупку, для обработки которых потребуется уже более двух минут. Метод take
позволит Flux выполнять обработку на протяжении десяти секунд и затем останавливаться. Я так делаю, потому что этот генератор служит исключительно для тестирования. В реальной же системе заказы на покупку наверняка будут поступать из API, который будет добавлять их в очередь сообщений.
Чтобы RabbitMQ обрабатывал элементы, сначала их нужно преобразовать в OutBoundMessage
как массив байтов, представляющий сериализацию JSON заказа на покупку. Это делается с помощью метода map
и лямбда-выражения, которое создает нужный OutBoundMessage
. Если в сериализации заказа на покупку возникает ошибка, мы отправляем пустой массив, который на противоположной стороне отфильтруется. Можно отфильтровать его и здесь, чтобы избавить очереди от необходимости передачи недействительных сообщений. Но нам все равно нужна проверка на встречной стороне, и раз уж это всего лишь тестовая схема, то можно отправить и их.
Метод doFinally
вызывает свое лямбда-выражение всего раз, и в данном случае мы используем его для логирования завершения потока и закрытия отправителя. В конце этой цепочки мы получаем Flux<OutboundMessage>
, который подходит для передачи методу Sender.sendWithPublishConfirms
. Назад же мы получаем Flux<OutboundMessageResult>
, который можно проверить, чтобы убедиться в размещении сообщения в соответствующей очереди. В данном простом случае проверку мы не делаем.
Метод subdcribe
, который вызывается для Flux<OutboundMessageResult>
, начинает цепочку. Важно отметить, что до вызова метода subscribe
мы всего лишь выстраиваем цепочку действий. Как только вызывается метод subscribe
, мы больше ничего с потоком данных не делаем, и он возвращает только объект Disposable
, который можно использовать, чтобы остановить подписку. Мы даже не храним объект Disposable
, поскольку наш поток остановится спустя десять секунд, как прописано в цепочке событий. Начальный Flux, который мы создали с помощью метода generate
, не завершается вызовом subscribe
, но ожидает, что Sender
, которому мы его передаем, подпишется на него, чтобы запустить эту цепочку событий.
Прежде чем перейти к реальным сервисам в системе, заглянем за кадр и посмотрим, что всем этим движет. Здесь есть один магический элемент, который в большинстве случаев остается незаметен, а именно Scheduler
. Взгляните на Flux
, созданный generate
. Первые два метода в цепочке, cast
и doOnNex
, содержат лямбда-функции, которые не блокируются и возвращают результат через очень небольшой промежуток времени. Однако следующий метод в цепочке, delayElements
, должен каким-то образом блокироваться, чтобы вносить в поток задержку. Как же нам заблокировать функцию в мире без блокировок? Ответом будет Scheduler
(планировщик). Планировщик может просматривать цепочку, а также проходящие через нее элементы, решая, какой из них и в какой части цепочки должен быть обработан.
У вас голова еще не взорвалась? На это можно посмотреть так. Внутри многозадачной операционной системы нечто наблюдает за ожидающими процессами и решает, какой из них очередным получит внимание ЦПУ. Планировщик работает аналогичным образом, только ему не нужно беспокоиться о прерываниях. Все задачи, с которыми работает планировщик, представляют очень мелкие фрагменты кода. “Блокирующие” задачи вроде delayElements
разбиваются на две части. Первая часть получает элемент из потока, устанавливает таймер и возвращает управление планировщику. Далее планировщик может выполнять какую-нибудь другую задачу. Как только таймер метода delayElements
истекает, вызывается вторая часть и сохраненный элемент получает возможность перемещения в следующий метод цепочки.
Я рассматриваю этот процесс как две непрерывные конвейерные ленты. Первая перемещает элементы по направлению ко второй, но вторая движется намного медленнее. В итоге элементы задерживаются на первой ленте, в то время как вторая их постепенно с нее снимает. По мере накопления задерживающихся элементов возникает так называемое “обратное давление”, и система понимает, что нужно просто остановить первую ленту, пока вторая ее немного не догонит.
Далее рассмотрим второй сервис системы, PurchaseOrderConsumer
. Его задача в извлечении элементов из очереди заказов на покупку, их логировании в базе данных, обработке и последующем отборе для другой очереди. Вот основной код:
Sender sender = RabbitFlux.createSender(soptions);
Receiver receiver = RabbitFlux.createReceiver(roptions);
sender.sendWithPublishConfirms(receiver
.consumeAutoAck(PO_QUEUE_NAME)
.map(d -> readJson(new String(d.getBody())))
.filter(PurchaseOrder::isValid)
.doOnNext(po -> log("recevied po " + po))
.doOnDiscard(PurchaseOrder.class,
po -> log("Discarded invalid PO " + po))
.flatMap(po -> writePoJson(po).map(j -> reactiveCollection
.upsert(po.getId(), j)
.map(result -> po))
.orElse(Mono.just(po)))
.map(po -> factory.build(po.getType(), po))
.timeout(Duration.ofSeconds(10))
.doFinally((s) -> {
log("Consumer in finally for signal " + s);
receiver.close();
sender.close();
cluster.disconnect();
})
.map(v -> new OutboundMessage("",
outQueue.get(v.getPo().getType()),
vehicleWriter
.get(v.getPo().getType())
.apply(v)
.orElse("")
.getBytes()))
)
.subscribe();
Очень похоже на первый сервис. Здесь присутствует внешняя обертка, которая отправляет отобранные объекты по разным очередям сообщений и вызывает subscribe
. Разница в том, что вместо получения Flux
для отправки с помощью метода generate
, мы получаем Flux
от метода Receiver.consumeAutoAck
. Таким образом создается поток элементов, поступающих из очереди сообщения заказов на покупку, после чего мы можем выстраивать для них цепочку обработки. Эта цепочка состоит в основном из обычных методов, map
для преобразования строки JSON в элемент, filter
для удаления недействительных сообщений, doOnNext
и doOnDiscard
для логирования обрабатываемых заказов на покупку или тех, которые были отфильтрованы.
Затем мы переходим к новому методу, flatMap
. Это особый метод, заслуживающий внимания. Те, кому flatMap
знаком из Java Streams, знают, что если трансформация возвращает Stream (или список, который можно легко преобразовать в Stream), то flatMap
изменит внешний Stream, чтобы он включал все элементы внутреннего Stream. Другими словами, он преобразует Stream (поток) потоков в единый поток из всех элементов. В мире Flux происходит то же самое, преобразование Flux (последовательности) последовательностей в единую последовательность со всеми элементами.
В рассматриваемом нами случае преобразование касается метода ReactiveCollection.upsert
, который вставляет заказ на покупку в CouchBase. Возвращаемый тип получается Mono<MutationResult>
. Нам еще нужно поговорить о Mono
, который представляет особый тип Flux
. Работает Mono
во многом аналогично Flux
, но может содержать либо один элемент, либо быть пустым. Также аналогично Flux
содержащийся в нем элемент не материализуется с помощью Mono
, но требует его подождать. Тут нам снова нужно задерживать обработку вложенного элемента, но без блокирования.
Здесь на помощь приходит flatMap
. В этом случае, так как внутренней структурой является Mono
, во внешнем Flux
по-прежнему соблюдается соответствие один к одному. На деле мы отбрасываем результат и просто возвращаем оригинальный заказ на покупку для дальнейшей обработки (как делалось отображением map(result -> po)
). Итак, flatMap
работает аналогично методу delayElements
, о котором говорилось ранее, где он разбивается на две части, между которыми втискивается весь ввод/вывод базы данных. Но планировщик знает, что нужно передать элемент во flatMap
и переходить к обработке другого элемента. Он также не забудет получить возвращаемый элемент базы данных, когда он будет готов к дальнейшей обработке. И снова flatMap
разделяет нашу воображаемую конвейерную ленту на две части.
Осталось проговорить последний метод цепочки, остальные же аналогичны предыдущим. Если бы речь шла о реальном сервисе, то метод timeout
даже не должен был бы находиться в потоке. У нас он присутствует только, чтобы отменять поток, когда в рамках заданного временного промежутка не происходит получение элемента. Это позволяет нашей небольшой игрушечной системе все закрывать и в конечном итоге завершаться.
В реальном сервисе такого бы у вас наверняка не было. Единственное исключение возможно, если сервис ожидает определенное количество элементов в секунду, и в течение нескольких минут не получает ни одного. Тогда вы можете заподозрить, что было потеряно соединение с очередью, и захотите, чтобы служба оркестровки, например Kubernetes, перезапустила ваш сервис.
Еще один вариант применения таймаута — это тестирование кода. Если ваш тест ожидает элемент из очереди сообщений, то вы можете установить таймаут так, чтобы тест не ждал бесконечно.
В системе остался заключительный набор сервисов, считывающий очереди сообщений, в которые PurchaseOrderConsumer
выводит тип Vehicle
. Единственная выполняемая им обработка состоит в считывании заказа на покупку из типа Vehicle
, подтверждении наличия заказа в базе данных, определении для него парковочного места согласно типу Vehicle
и логировании конечного объекта. В реальной системе вы бы наверняка захотели добавить его в учетную базу данных транспортных средств либо использовать какой-либо другой способ отслеживания.
Я создал три класса сервисов, по одному для каждого типа; Car (легковое авто), Truck (грузовик) или Motorcycle (мотоцикл). В примере текущего кода я мог сделать один класс с рядом параметров и запустить метод получения три раза, по одному для каждого типа. Но я оставил их отдельными, так как предвидел, что для каждого типа потребуется своя логика, кардинальность типов окажется очень низка, а базовый код короче сотни строк. Если бы у сервисов было много общей логики, я бы не пожалел лишнего времени на их объединение. Но мой прагматизм иногда превосходит перфекционизм, и мне это нравится.
Чтобы все проверить, пришлось запустить CoucheBase и RabbitMQ. В обоих случаях я просто создал экземпляр Docker:
docker run -d --hostname my-rabbit -p 5672:5673 rabbitmq:3
docker run -d -p 8091-8094:8091-8094 -p 11210:11210 couchbase
В случае с CouchBase мне пришлось открыть консоль администратора по адресу http://localhost:8091, прокликать пользовательское соглашение и настроить корзину po
. После этого я смог запустить сервисы и наблюдать журнал:
...
pool-10-thread-20 recevied po PurchaseOrder(id=e186589fd87279f3, price=78023.46, type=Motorcycle, time=2021-01-05T15:43:10.704386Z)
cb-io-kv-17-2 po for motorcycle e186589fd87279f3 confirmed
cb-io-kv-17-2 received motorcycle Vehicle(po=PurchaseOrder(id=e186589fd87279f3, price=78023.46, type=Motorcycle, time=2021-01-05T15:43:10.704386Z), lot=motorcycle lot a)
parallel-8 Generator in finally for signal onComplete
parallel-13 Car consumer in finally for signal cancel
parallel-14 Truck consumer in finally for signal cancel
parallel-7 Motorcycle consumer in finally for signal cancel
parallel-4 Consumer in finally for signal onError
------------------------------------------------------------------------
BUILD SUCCESS
------------------------------------------------------------------------
Total time: 25.775 s
Finished at: 2021-01-05T07:43:23-08:00
------------------------------------------------------------------------
Сборка прошла успешно! Если вы соберете и запустите систему, то советую внимательно проследить за логами, все из которых выводят текущий поток. Вскоре вы заметите паттерн маленьких конвейерных лент (некоторые будут выполняться параллельно), о котором я говорил.
rkamradt/usedvehicles
Contribute to rkamradt/usedvehicles development by creating an account on GitHub.github.com
Читайте также:
- Асинхронность в Java
- Тестирование уровня данных в Android Room с помощью Rxjava, LiveData и…
- Введение в байт-код Java
Читайте нас в Telegram, VK и Яндекс.Дзен
Перевод статьи Randal Kamradt Sr: Understanding Reactive Java