Что такое «событийно-ориентированная архитектура»?
Событийно-ориентированная архитектура — это шаблон проектирования с применением событий для запуска и передачи изменений между компонентами системы. Службы здесь взаимодействуют, обмениваясь событиями, то есть сообщениями о наступлении события или изменении состояния.
Преимущества событийно-ориентированной архитектуры
- Слабая связанность: службы разделены, чем обеспечивается независимость и модульность.
- Масштабируемость: службы масштабируются независимо, исходя из потребности в событиях, чем повышается общая производительность системы.
- Асинхронная обработка: событиями обеспечиваются асинхронное взаимодействие, уменьшение задержки и времени отклика.
- Порождение событий: поддерживается естественным образом, ведь состояние системы определяется последовательностью прошлых событий.
- Гибкость: добавление новых служб или изменение имеющихся не сказывается на всей системе.
Kafka
Apache Kafka — это распределенная потоковая платформа для создания конвейеров данных в реальном времени и потоковых приложений, потоковой передачи событий с высокой пропускной способностью, отказоустойчивостью и масштабируемостью.
В основе Kafka — модель обмена сообщениями «публикация-подписка», где в темах отправителями публикуются сообщения-события, а получатели — для получения и обработки сообщений — подписываются на эти темы. События хранятся в Kafka неизменяемо и только с возможностью добавления. Данные обрабатываются как в реальном времени, так и ретроспективе.
Ключевые понятия Kafka
- Темы — каналы для публикации событий и подписки на них.
- Отправители — службы, которыми события создаются и отправляются в темы Kafka.
- Получатели — службы, которые подписываются на темы, ими обрабатываются входящие события.
- Разделы: каждая тема разбивается на разделы, чем обеспечиваются параллельная обработка и распределение нагрузки.
Роль Kafka в событийно-ориентированной архитектуре
Эта роль центральная, Apache Kafka — масштабируемая и долговечная шина событий для взаимодействия микросервисов. В событийно-ориентированной архитектуре микросервисы предназначены для отправки и получения событий, чем обеспечивается их асинхронное взаимодействие без прямых зависимостей одного от другого.
Ключевые роли
- Брокер событий. Kafka — центральный брокер событий, легко масштабируемое и надежное промежуточное ПО для обработки потоков событий. Им эффективно контролируются маршрутизация, хранение, распределение событий между службами.
- Журнал событий. В журналоподобном хранилище Kafka с возможностью только добавлять события долговременно сохраняются и остаются на настраиваемый период. Это идеальный источник истины для сценариев порождения событий, ведения контроля и воспроизведения данных.
- Разделение служб. В Kafka службы разделяются, поэтому при публикации событий отправителям не нужно «знать», в каких именно службах их сообщения получают. Получателям тоже не нужно «знать» об отправителях, где генерируются события.
- Надежность и отказоустойчивость. В Kafka надежно гарантируются долговечность данных и отказоустойчивость. События реплицируются в нескольких брокерах, чем обеспечивается высокая доступность даже в случае сбоев узлов.
- Масштабируемость. Распределенной архитектурой Kafka обеспечивается горизонтальное масштабирование за счет добавления брокеров в кластер. Так с высокой пропускной способностью обрабатываются потоки событий между многочисленными отправителями и получателями.
- Обработка данных в реальном времени обеспечивается предоставлением событий с низкой задержкой. Поэтому Kafka хороша для сценариев, где важна своевременная, немедленная обработка событий.
- Упорядочение событий по времени. В Kafka сохраняется последовательность событий каждого раздела. Это важно для поддержания согласованности при обработке событий в нескольких службах.
- Эволюция схемы и совместимость. В Kafka поддерживается сериализация схемы с фреймворками вроде Avro или Protobuf, чем обеспечиваются эволюция схемы, прямая и обратная совместимость, когда службы со временем развиваются.
Настройка среды
Spring Boot — популярный Java-фреймворк для создания микросервисов. За счет различных утилит и соглашений им упрощается процесс разработки с акцентом скорее на бизнес-логике, чем шаблонном коде. А обширной экосистемой Spring Boot облегчается интеграция с другими технологиями.
Начнем с создания простого проекта Spring Boot с интеграцией Kafka, в проекте же настроим необходимые зависимости.
Создаем новый проект Spring Boot в любимой среде IDE, добавляем в файл pom.xml
или build.gradle
зависимости для Maven или Gradle:
<!-- Для Maven -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
// Для Gradle
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'
Прежде чем применять Kafka для событийно-ориентированного взаимодействия, создадим темы и укажем количество разделов, темы подобны каналам для публикации событий отправителями и подписки получателей на эти события:
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaTopicConfiguration {
@Bean
public NewTopic exampleTopic() {
return new NewTopic("example_topic", 1, (short) 1);
}
}
В этом примере классом @Configuration
определяется тема Kafka example_topic
с одним разделом и одним коэффициентом репликации, которым обеспечивается отказоустойчивость за счет репликации данных в нескольких брокерах.
Теперь продемонстрируем интеграцию Spring Boot с Kafka, создав простой проект с базовыми отправителем и получателем:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
private final String topic = "example_topic";
@Autowired
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
kafkaTemplate.send(topic, message);
}
}
Здесь для отправки сообщений-событий в тему Kafka example_topic
создан класс KafkaProducerService
. Сообщения публикуем с помощью KafkaTemplate
из Spring Kafka.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "example_topic", groupId = "example_group")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
Класс KafkaConsumerService
, которым прослушивается example_topic
, принадлежит группе получателей example_group
. Всякий раз, когда сообщение публикуется в теме, для обработки входящего события вызывается метод receiveMessage
.
Реальные примеры
Обработка заказов
В реальной системе обработки заказов поток событий такой:
- Заказ размещен.
- Платеж обработан.
- Запасы обновлены.
В этом сценарии каждое событие обрабатывается разными микросервисами, которые для поддержания согласованности и разделения служб взаимодействуют через Kafka.
Этап 1. Настройка среды Kafka
Запускаем требуемый Kafka сервер ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Запускаем сервер Kafka:
bin/kafka-server-start.sh config/server.properties
Создаем две темы Kafka для событий заказа и платежных событий:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic order_events
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic payment_events
Создаем новый проект Spring Boot в любимой среде IDE, добавляем в файл pom.xml
зависимости:
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot Starter для Apache Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
Для взаимодействия служб создаем два класса с данными о событиях OrderEvent
и PaymentEvent
:
public class OrderEvent {
private Long orderId;
// Другие поля, конструкторы, геттеры, сеттеры
}
public class PaymentEvent {
private Long orderId;
private BigDecimal amount;
// Другие поля, конструкторы, геттеры, сеттеры
}
Создаем две службы-отправителя Kafka — одну для событий заказа, другую для платежных событий:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class OrderEventProducer {
private static final String TOPIC = "order_events";
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
public void sendOrderEvent(OrderEvent orderEvent) {
kafkaTemplate.send(TOPIC, orderEvent);
}
}
@Service
public class PaymentEventProducer {
private static final String TOPIC = "payment_events";
@Autowired
private KafkaTemplate<String, PaymentEvent> kafkaTemplate;
public void sendPaymentEvent(PaymentEvent paymentEvent) {
kafkaTemplate.send(TOPIC, paymentEvent);
}
}
Создаем две службы-получателя Kafka — одну для событий заказа, другую для платежных событий:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class OrderEventConsumer {
@KafkaListener(topics = "order_events", groupId = "order_group")
public void handleOrderEvent(OrderEvent orderEvent) {
// Обрабатываем событие заказа, например сохраняем его в базе данных
System.out.println("Received Order Event: " + orderEvent);
// Реализуем логику обработки заказов и обновления запасов
}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class PaymentEventConsumer {
@KafkaListener(topics = "payment_events", groupId = "payment_group")
public void handlePaymentEvent(PaymentEvent paymentEvent) {
// Обрабатываем платежное событие, например обновляем статус платежа
System.out.println("Received Payment Event: " + paymentEvent);
// Реализуем логику обработки платежа и обновления статуса заказа
}
}
Чтобы получать данные о заказах и платежах клиентов, создаем конечные точки REST, по получении данных отправляем события в Kafka:
@RestController
public class OrderController {
@Autowired
private OrderEventProducer orderEventProducer;
@PostMapping("/orders")
public ResponseEntity<String> createOrder(@RequestBody OrderEvent orderEvent) {
// Полученное событие заказа обрабатываем и отправляем в Kafka
orderEventProducer.sendOrderEvent(orderEvent);
return ResponseEntity.ok("Order created successfully");
}
}
@RestController
public class PaymentController {
@Autowired
private PaymentEventProducer paymentEventProducer;
@PostMapping("/payments")
public ResponseEntity<String> processPayment(@RequestBody PaymentEvent paymentEvent) {
// Полученное платежное событие обрабатываем и отправляем в Kafka
paymentEventProducer.sendPaymentEvent(paymentEvent);
return ResponseEntity.ok("Payment processed successfully");
}
}
Запускаем приложение Spring Boot и с помощью клиента REST, например Postman, создаем заказы и обрабатываем платежи. Просматриваем в журналах события, полученные службами-получателями, и выполняемую логику обработки заказов и обновления запасов.
Имейте в виду, что это упрощенный пример: в реальном приложении добавляются проверки, обработка ошибок, дополнительные службы для обработки заказов, платежных шлюзов, учета запасов.
Интернет вещей и обработка сигналов датчиков
Создадим приложение для Интернета вещей и обработки сигналов датчиков со Spring Boot и Kafka. Здесь упор делается на сборе сигналов датчиков, их агрегировании и генерировании предупреждений на основе условий. Опять же, пример упрощенный: в реальном сценарии добавляются сложность и обработка данных.
Для взаимодействия служб создаем класс с данными о событии сигналов датчиков SensorDataEvent
:
public class SensorDataEvent {
private String sensorId;
private double value;
// Другие поля, конструкторы, геттеры, сеттеры
}
Чтобы отправлять события сигналов датчиков, создаем службу-отправитель Kafka:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class SensorDataEventProducer {
private static final String TOPIC = "sensor_data_events";
@Autowired
private KafkaTemplate<String, SensorDataEvent> kafkaTemplate;
public void sendSensorDataEvent(SensorDataEvent sensorDataEvent) {
kafkaTemplate.send(TOPIC, sensorDataEvent);
}
}
Чтобы получать события сигналов датчиков, создаем службу-получателя Kafka и агрегатор для агрегирования данных:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class SensorDataEventConsumer {
@Autowired
private DataAggregator dataAggregator;
@KafkaListener(topics = "sensor_data_events", groupId = "sensor_data_group")
public void handleSensorDataEvent(SensorDataEvent sensorDataEvent) {
// Полученное событие сигналов датчиков обрабатываем и передаем агрегатору
dataAggregator.aggregate(sensorDataEvent);
}
}
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class DataAggregator {
private final Map<String, Double> sensorDataMap = new HashMap<>();
public void aggregate(SensorDataEvent sensorDataEvent) {
// Агрегирование данных датчика по его идентификатору
double currentValue = sensorDataMap.getOrDefault(sensorDataEvent.getSensorId(), 0.0);
double aggregatedValue = currentValue + sensorDataEvent.getValue();
sensorDataMap.put(sensorDataEvent.getSensorId(), aggregatedValue);
// Проверяем, превышен ли агрегированным значением определенный порог, и генерируем предупреждение
if (aggregatedValue > 1000) {
generateAlert(sensorDataEvent.getSensorId(), aggregatedValue);
}
}
private void generateAlert(String sensorId, double aggregatedValue) {
System.out.println("ALERT! Sensor ID: " + sensorId + ", Aggregated Value: " + aggregatedValue);
// Реализуем логику генерирования предупреждений, например отправки уведомлений или запуска действий
}
}
Чтобы получать данные датчика с устройств Интернета вещей, создаем конечные точки REST, по получении данных отправляем события в Kafka для дальнейшей обработки и агрегирования:
@RestController
public class SensorDataController {
@Autowired
private SensorDataEventProducer sensorDataEventProducer;
@PostMapping("/sensors/{sensorId}/data")
public ResponseEntity<String> sendSensorData(@PathVariable String sensorId, @RequestBody double value) {
// Создаем и отправляем «SensorDataEvent» в Kafka
SensorDataEvent sensorDataEvent = new SensorDataEvent(sensorId, value);
sensorDataEventProducer.sendSensorDataEvent(sensorDataEvent);
return ResponseEntity.ok("Sensor data received successfully");
}
}
Запускаем Spring Boot и с помощью клиента REST, например Postman, тестируем приложение.
Обработка ошибок и повторные попытки
Обработка ошибок и повторные попытки — необходимые компоненты надежной, безотказной программной системы. Они важны для корректного восстановления приложений после неожиданных сбоев, переходных ошибок или отказа внешних служб. Обработка ошибок — это процесс устранения неожиданных или исключительных ситуаций, которые случаются при выполнении программы.
Повторные попытки — механизм многократного автоматического повторного выполнения неудачной операции или запроса. Их цель — обработка переходных ошибок, а они временные и устраняются повторением операции через небольшой интервал. Повторные попытки приходятся кстати при периодических сбоях ненадежных сетевых подключений или внешних служб.
Повторные попытки реализуются разными стратегиями.
Фиксированный повтор: операция повторяется фиксированное количество раз с постоянным интервалом между повторами.
Экспоненциальная задержка: во избежание перегрузки системы, если проблема сохраняется, интервал между повторами увеличивается экспоненциально.
Случайная задержка: чтобы рассредоточить повторные попытки и избежать коллизий запросов, в интервал между повторами вводится элемент случайности.
Выключатель: после определенного количества последовательных сбоев активируется выключатель, и повторные попытки прекращаются. После периода «охлаждения» попытки возобновляются.
Повтор с джиттером: чтобы уменьшить вероятность одновременных попыток нескольких клиентов, фиксированная или экспоненциальная задержка комбинируется с джиттером, случайным интервалом.
Реализуем повторные попытки обработки событий с помощью Spring-Retry. Сначала добавим в pom.xml
или build.gradle
зависимость Spring-Retry:
<!-- Для Maven -->
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
// Для Gradle
implementation 'org.springframework.retry:spring-retry'
Чтобы в случае сбоя обработка повторилась, обновляем получателя:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@Retryable(
value = { Exception.class },
maxAttempts = 3,
backoff = @Backoff(delay = 1000, maxDelay = 3000)
)
@KafkaListener(topics = "example_topic", groupId = "example_group")
public void receiveMessage(String message) {
try {
// Обрабатываем сообщение здесь
System.out.println("Received message: " + message);
} catch (Exception e) {
throw new RuntimeException("Error processing message: " + message, e);
}
}
}
В этом примере в метод receiveMessage
добавили аннотацию @Retryable
, указав максимум повторов — три — и интервал между ними — 1 сек. при максимуме в 3 сек.
Очередь недоставленных сообщений
Это очередь, в которой необработанные события отправляются на дальнейший анализ и отладку. Такая их проверка не сказывается на основном потоке обработки событий.
Чтобы переместить проблемные события в очередь недоставленных сообщений, настраиваем для них в Kafka отдельную тему:
@Configuration
public class KafkaTopicConfiguration {
@Bean
public NewTopic exampleTopic() {
return new NewTopic("example_topic", 1, (short) 1);
}
@Bean
public NewTopic deadLetterTopic() {
return new NewTopic("example_topic_dead_letter", 1, (short) 1);
}
}
Обновляя получателя, отправляем эти события в очередь недоставленных сообщений:
@Service
public class KafkaConsumerService {
@Retryable(
value = { Exception.class },
maxAttempts = 3,
backoff = @Backoff(delay = 1000, maxDelay = 3000)
)
@KafkaListener(topics = "example_topic", groupId = "example_group")
public void receiveMessage(String message) {
try {
// Обрабатываем сообщение здесь
System.out.println("Received message: " + message);
} catch (Exception e) {
// Перемещаем проблемное событие в очередь недоставленных сообщений
kafkaProducerService.sendMessageToDeadLetterTopic(message);
}
}
}
Здесь, чтобы отправить необработанные сообщения в тему example_topic_dead_letter
, в KafkaProducerService
добавили метод sendMessageToDeadLetterTopic
.
Благодаря этим совершенствованиям конвейер обработки событий отказоустойчивее, с временными сбоями он справляется корректнее.
Масштабирование событийно-ориентированных микросервисов
Получатели и отправители в Kafka масштабируются увеличением числа экземпляров соответственных служб. Запуская несколько экземпляров получателей параллельно, мы обрабатываем события одновременно, повышая общую пропускную способность.
При масштабировании получателей важно учитывать разбиение на разделы. Разделы — это единица параллелизма Kafka, каждый раздел получается только одним получателем группы получателей. Поэтому для эффективной параллельной обработки должно быть достаточно разделов.
Группа получателей — получатели с одним идентификатором группы, ими распределяется нагрузка получения событий из одной и той же темы. Чтобы увеличить параллелизм, получатели в группу добавляются.
Пример
Чтобы масштабировать получателей, для каждого их экземпляра задаем уникальный instanceId
и добавляем в группу получателей несколько экземпляров одного получателя:
@Service
public class KafkaConsumerService {
@Value("${kafka.instanceId}")
private String instanceId;
@KafkaListener(
topicPartitions = @TopicPartition(
topic = "example_topic",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "1", initialOffset = "0"),
// При необходимости добавляем разделы и смещения
}
),
groupId = "example_group"
)
public void receiveMessage(String message) {
System.out.println("[" + instanceId + "] Received message: " + message);
}
}
В этом примере разделы и их начальные смещения для этого экземпляра получателя указаны аннотацией @KafkaListener
с topicPartitions
. Каждым экземпляром получателя обрабатываются события разных разделов, чем увеличивается параллелизм.
groupId
: этим атрибутом указывается группа получателей, к которой относится получатель. Нагрузка получения сообщений из разделов темы распределяется между получателями одной группы.
Чтобы создать несколько экземпляров службы получателей, запускаем приложение с разными значениями instanceId
при помощи профилей Spring:
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, "--spring.profiles.active=instance1");
SpringApplication.run(KafkaApplication.class, "--spring.profiles.active=instance2");
// При необходимости добавляем экземпляры
}
}
Здесь с профилями instance1 и instance2 запущено два экземпляра приложения, у каждого из которых уникальный instanceId
. События получаются ими из разных разделов, чем повышается параллелизм.
Количество разделов example_topic
увеличиваем в KafkaTopicConfiguration
:
@Configuration
public class KafkaTopicConfiguration {
@Bean
public NewTopic exampleTopic() {
return new NewTopic("example_topic", 4, (short) 1);
}
@Bean
public NewTopic deadLetterTopic() {
return new NewTopic("example_topic_dead_letter", 1, (short) 1);
}
}
В этом примере количество разделов example_topic
увеличили до четырех, усовершенствовав распределение нагрузки и параллельную обработку.
Запуская несколько экземпляров получателей и увеличивая количество разделов, мы эффективно масштабируем событийно-ориентированные микросервисы для работы с более высокой пропускной способностью.
Заключение
Архитектура событийно-ориентированных микросервисов со Spring Boot и Kafka — это мощный, масштабируемый подход к построению современных распределенных систем. Используя преимущества событийно-ориентированного взаимодействия, слабой связанности и асинхронной обработки, разработчики создают гибкие, отказоустойчивые, адаптивные приложения под требования современного динамичного делового ландшафта.
Читайте также:
- Как интегрировать Kafka со Spring Boot
- Кэширование Redis для максимальной производительности в Spring Boot и Java
- Внешнее конфигурирование базы данных Spring Boot с помощью AWS Secrets Manager
Читайте нас в Telegram, VK и Дзен
Перевод статьи Bubu Tripathy: Event-Driven Architecture