В этой статье будет рассматриваться парадигма программирования, связанная с обработкой неблокирующих асинхронных событий.
Что такое неблокирующие асинхронные вызовы?
Допустим, у нас есть API, и мы обрабатываем большое количество запросов. По своей архитектуре синхронные вызовы должны быть блокирующими. Это означает, что пока мы не получим ответ от сервера, выполнение процесса в приложении будет заблокировано. Напротив, в случае асинхронной обработки вызовов, приложение не дожидается ответа от сервера и, следовательно, является неблокирующим.
Понимая, что асинхронные вызовы являются неблокирующими по своей природе, посмотрим, как реактивное программирование помогает обрабатывать асинхронные события в приложении.
Что такое события?
Термин “событие” имеет различные определения в зависимости от контекста. События в реактивном программировании можно описать как наблюдаемые происходящие с течением времени явления, на которые реагирует приложение.
Как уже было сказано, реактивное программирование помогает справляться с событиями таким образом, чтобы приложение оставалось отзывчивым и масштабируемым.
Основные рабочие механизмы
Одной из основных целей реактивного программирования является предоставление возможности более эффективной и масштабируемой обработки асинхронных потоков данных и событий.
Состояние приложения в реактивном программировании строится на основе потоков данных, которые образуются из событий, происходящих с течением времени. Изменения в состоянии или событиях могут обрабатываться приложением асинхронно и соответствующим образом обновляться.
Реактивное программирование также предполагает работу с противодавлением, которое представляет собой механизм управления потоком данных в приложении для предотвращения перегрузки и обеспечения стабильности. Это особенно важно при работе с большими объемами потоков данных и событий, характерных для больших приложений.
Как работает реактивное программирование?
Для эффективной обработки данных и событий реактивное программирование использует издателей и подписчиков.
Модель “издатель-подписчик” применяется в реактивном программировании следующим образом:
- Инициализируется объект-издатель, который с течением времени выдает поток данных или событий.
- Создается один или несколько объектов-подписчиков, которые регистрируются у издателя и получают уведомления о появлении новых данных. Подписчики могут обрабатывать данные асинхронно и соответствующим образом обновлять состояние приложения. Можем настроить так, чтобы событие потреблялось одним или несколькими подписчиками в зависимости от конкретного случая использования.
- Обеспечивается поддержка операторов — функций, используемых для преобразования и объединения потоков данных. Операторы позволяют разработчикам создавать сложные потоки данных путем компоновки и преобразования потоков данных с применением набора приемов функционального программирования.
- Для обеспечения масштабируемости используются планировщики, которые помогают управлять многозадачностью и параллелизмом в приложении. Планировщики позволяют разработчикам контролировать обработку потоков данных и отвечают за отзывчивость и масштабируемость приложения.
Абстракции для реактивного программирования предоставляют различные фреймворки, такие как RxJava, Reactor, Akka и Spring WebFlux.
В этой статье рассмотрим один из них с пошаговым руководством по использованию.
Reactor
Reactor — это библиотека реактивного программирования для создания асинхронных и событийно-управляемых приложений на языке Java. Она предназначена для масштабируемой и эффективной обработки больших объемов потоков данных и событий. Я сам пользовался Reactor, и поэтому решил предложить вам эту простую и доступную для новичков библиотеку в демонстрационных целях.
Основное назначение Reactor — предоставление набора инструментов и абстракций для создания реактивных приложений. Reactor предлагает набор основных компонентов, таких как Flux и Mono, которые представляют собой потоки данных и событий. Эти компоненты могут быть объединены и преобразованы с помощью набора операторов, таких как map, filter и reduce, для создания сложных потоков данных.
Прежде чем продолжить, выясним, что такое Mono и Flux.
Сравнение Mono и Flux
Как уже говорилось, реактивное программирование работает по модели “издатель-подписчик”. И Mono, и Flux предоставляют абстракцию для публикации событий.
Разница между ними заключается в том, что Mono может публиковать только одно событие за раз, а Flux — несколько событий за раз. Поэтому когда нужно публиковать только одно событие, выбирают Mono, а когда нужно публиковать несколько событий — Flux.
Теперь приведу небольшой пример, поясняющий каждую упоминаемую ранее концепцию.
Демо
Все начинается с того, что издатель выдает событие для потребителя.
Как и java-потоки, Flux и Mono придерживаются ленивого выполнения, т.е. значение выражения не обрабатывается до тех пор, пока в нем нет необходимости.
Начнем с создания простого Flux.
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
Метод “just” позволяет создать Flux, если есть доступное значение. Это самый простой способ создания Flux. Несмотря на то что я привожу пример с Flux, шаги остаются такими же и для Mono. В Mono также используется метод “just” с той разницей, что Mono применяется только для одного элемента. Поэтому если задать несколько значений, то получим ошибку.
Flux<String> mappedFlux = flux.map(i -> "Number: " + i)
The above flux gets converted to
"Number: 1", "Number: 2", "Number: 3", "Number: 4", "Number: 5"
Здесь был использован оператор для обработки данных. Как вы помните, я уже упоминал об операторах. Теперь хотел бы перечислить несколько распространенных операторов.
- Map: преобразует каждый элемент, выдаваемый потоком данных, применяя к нему функцию.
- Filter: фильтрует элементы, выдаваемые потоком данных, на основе предикатной функции.
- Reduce: агрегирует элементы, выдаваемые потоком данных, в одно значение с помощью функции-аккумулятора.
- Merge: объединяет несколько потоков данных в один.
- Concat: конкатенирует нескольких потоков данных в один в определенном порядке.
- FlatMap: преобразует каждый элемент, выдаваемый потоком данных, в другой поток данных, а затем сводит полученные потоки в один.
- Zip: объединяет элементы, выдаваемые несколькими потоками данных, в один элемент с помощью функции-комбинатора.
Применим еще несколько операторов для пояснения их работы.
Flux<String> filteredFlux = mappedFlux.filter(s -> !s.endsWith("2") && !s.endsWith("4"));
Flux преобразуется в:
"Number: 1", "Number: 3", "Number: 5"
Теперь сведем Flux к одной строке.
Mono<String> reducedMono = filteredFlux.reduce("", (a, b) -> a+ ", " + b);
Flux преобразуется в:
"Number: 1, Number: 3, Number: 5"
Эти шаги представляют собой обработку события. Мы начали с исходного события Flux, состоящего из списка чисел, и использовали операторы для обработки событий.
Следует отметить, что я добавил преобразованный вывод для пояснения, а на самом деле никаких вычислений не происходит. Как уже говорилось, Flux следует модели ленивого исполнения. Пока мы не подпишемся на поток событий, никаких вычислений не происходит. А когда подписываемся, получаем возможность контролировать то, что нужно сделать с полученным событием.
Чтобы упростить задачу и показать, как работает подписчик, просто выведу Flux.
reducedMono.subscribe(System.out::println);
Вывод:
"Number: 1, Number: 3, Number: 5"
Это очень простой пример того, как работает Flux. Мы создали событие, обработали его с помощью операторов и, использовав метод subscribe, выполнили оценку этого выражения.
Вы можете спросить: а где же здесь планировщики? Перейдем к обсуждению этого вопроса.
Планировщики
Планировщики используются для управления многозадачностью и параллелизмом в приложении. Они позволяют разработчикам контролировать процесс обработки потоков данных и обеспечивают отзывчивость и масштабируемость приложения.
Планировщики позволяют указать, где и как должна происходить обработка потока данных. Они могут использоваться для запуска задач в отдельном потоке, в пуле потоков или на конкретном исполнителе. Планировщики также могут использоваться для управления порядком выполнения задач, введения задержек и тайм-аутов.
В Reactor предусмотрено несколько встроенных планировщиков для управления обработкой потоков данных.
- Schedulers.immediate(): запускает задачи в текущем потоке.
- Schedulers.single(): запускает задачи в одном потоке.
- Schedulers.parallel(): запускает задачи на пуле потоков фиксированного размера.
- Schedulers.elastic(): запускает задачи на неограниченном пуле потоков, который может динамически увеличиваться или уменьшаться.
Они могут использоваться с такими методами, как subscribeOn
и publishOn
, для определения того, где будут оцениваться события.
Если в приведенном выше случае необходимо добавить планировщика, то это можно сделать, обновив вызов subscribe.
reducedMono.subscribeOn(Schedulers.parallel()).subscribe(System.out::println);
В данном примере subscribeOn
используется как оператор для подписки на сокращенное Mono на отдельном планировщике. Мы применяем метод Schedulers.Parallel
для создания параллельного планировщика, который может работать с несколькими потоками. Это позволяет обрабатывать поток данных асинхронно и повысить производительность.
Выходные данные остаются неизменными, поскольку мы не меняли часть обработки.
Итак, мы рассмотрели публикацию и подписку, узнали о планировщиках, имеющихся в Reactor.
Не упустили ли чего-нибудь?
Действительно, упустили. Осталось поговорить об обработке противодавления.
Стратегия работы с противодавлением
В Reactor обработка противодавления — это механизм управления потоком данных в приложении для предотвращения перегрузки и обеспечения стабильности. Это особенно важно при работе с большими объемами потоков данных и событий, о чем мы уже говорили. Теперь посмотрим реализацию обработки противодавления в коде.
В Reactor обработка противодавления может быть реализована с помощью различных встроенных стратегий. В этом случае мы прибегаем к onBackPressure___
.
Вместо ___
используем имя стратегии. Вот несколько стратегий, которые поддерживает Reactor.
- Buffer. Если происходит избыток событий, которые система не может обработать, они добавляются в буфер и будут обработаны, как только система станет доступной. Эта стратегия применяется по умолчанию.
- Drop. Отбрасывает поступающие данные, когда подписчик не в состоянии справиться со скоростью производства данных. Таким образом, можно потерять события, которые приходят быстрее, чем совершается скорость обработки.
- Error. Эта стратегия просто возвращает ошибку, если не может обработать входящий запрос.
- Latest. Сохраняется только последний элемент, выпущенный издателем, когда подписчик не успевает за скоростью производства данных. Это аналогично стратегии “Drop”, но здесь сохраняются только последние приходящие данные, а не старые.
Рассмотрим на примере.
Сначала создадим Flux.
Flux<Integer> flux = Flux.range(1, 1000);
Здесь не использовано “just”. Мы применили метод range для создания потока целых чисел в виде Flux.
Для данного примера воспользуемся стратегией Drop
.
Flux<Integer> droppedFlux = flux.onBackpressureDrop().onBackpressureDrop(10);
Здесь мы применили стратегию Drop для обработки противодавления путем сброса самых старых элементов в буфер, когда подписчик не успевает за скоростью производства данных. Размер буфера установили равным 10.
После определения стратегии для данного случая можно перейти к использованию упоминаемых ранее операторов.
Заключение
В этой статье мы обсудили, что такое реактивное программирование и как его использовать. Чтобы не перегрузить вас информацией, я ограничился базовым примером, хотя у Reactor неизмеримо больше возможностей.
Читайте также:
- Реактивное программирование или сопрограммы: между молотом и наковальней?
- Осваиваем реактивное программирование на Java
- Запуск тестовых сценариев с Maven
Читайте нас в Telegram, VK и Дзен
Перевод статьи Arjun Chauhan: Getting Started with Reactive Programming