Событийно-ориентированная архитектура

Что такое «событийно-ориентированная архитектура»?

Событийно-ориентированная архитектура  —  это шаблон проектирования с применением событий для запуска и передачи изменений между компонентами системы. Службы здесь взаимодействуют, обмениваясь событиями, то есть сообщениями о наступлении события или изменении состояния.

Преимущества событийно-ориентированной архитектуры

  • Слабая связанность: службы разделены, чем обеспечивается независимость и модульность.
  • Масштабируемость: службы масштабируются независимо, исходя из потребности в событиях, чем повышается общая производительность системы.
  • Асинхронная обработка: событиями обеспечиваются асинхронное взаимодействие, уменьшение задержки и времени отклика.
  • Порождение событий: поддерживается естественным образом, ведь состояние системы определяется последовательностью прошлых событий.
  • Гибкость: добавление новых служб или изменение имеющихся не сказывается на всей системе.

Kafka

Apache Kafka  —  это распределенная потоковая платформа для создания конвейеров данных в реальном времени и потоковых приложений, потоковой передачи событий с высокой пропускной способностью, отказоустойчивостью и масштабируемостью.

В основе Kafka  —  модель обмена сообщениями «публикация-подписка», где в темах отправителями публикуются сообщения-события, а получатели  —  для получения и обработки сообщений  —  подписываются на эти темы. События хранятся в Kafka неизменяемо и только с возможностью добавления. Данные обрабатываются как в реальном времени, так и ретроспективе.

Ключевые понятия Kafka

  • Темы  —  каналы для публикации событий и подписки на них.
  • Отправители  —  службы, которыми события создаются и отправляются в темы Kafka.
  • Получатели  —  службы, которые подписываются на темы, ими обрабатываются входящие события.
  • Разделы: каждая тема разбивается на разделы, чем обеспечиваются параллельная обработка и распределение нагрузки.

Роль Kafka в событийно-ориентированной архитектуре

Эта роль центральная, Apache Kafka  —  масштабируемая и долговечная шина событий для взаимодействия микросервисов. В событийно-ориентированной архитектуре микросервисы предназначены для отправки и получения событий, чем обеспечивается их асинхронное взаимодействие без прямых зависимостей одного от другого.

Ключевые роли

  1. Брокер событий. Kafka  —  центральный брокер событий, легко масштабируемое и надежное промежуточное ПО для обработки потоков событий. Им эффективно контролируются маршрутизация, хранение, распределение событий между службами.
  2. Журнал событий. В журналоподобном хранилище Kafka с возможностью только добавлять события долговременно сохраняются и остаются на настраиваемый период. Это идеальный источник истины для сценариев порождения событий, ведения контроля и воспроизведения данных.
  3. Разделение служб. В Kafka службы разделяются, поэтому при публикации событий отправителям не нужно «знать», в каких именно службах их сообщения получают. Получателям тоже не нужно «знать» об отправителях, где генерируются события.
  4. Надежность и отказоустойчивость. В Kafka надежно гарантируются долговечность данных и отказоустойчивость. События реплицируются в нескольких брокерах, чем обеспечивается высокая доступность даже в случае сбоев узлов.
  5. Масштабируемость. Распределенной архитектурой Kafka обеспечивается горизонтальное масштабирование за счет добавления брокеров в кластер. Так с высокой пропускной способностью обрабатываются потоки событий между многочисленными отправителями и получателями.
  6. Обработка данных в реальном времени обеспечивается предоставлением событий с низкой задержкой. Поэтому Kafka хороша для сценариев, где важна своевременная, немедленная обработка событий.
  7. Упорядочение событий по времени. В Kafka сохраняется последовательность событий каждого раздела. Это важно для поддержания согласованности при обработке событий в нескольких службах.
  8. Эволюция схемы и совместимость. В Kafka поддерживается сериализация схемы с фреймворками вроде Avro или Protobuf, чем обеспечиваются эволюция схемы, прямая и обратная совместимость, когда службы со временем развиваются.

Настройка среды

Spring Boot  —  популярный Java-фреймворк для создания микросервисов. За счет различных утилит и соглашений им упрощается процесс разработки с акцентом скорее на бизнес-логике, чем шаблонном коде. А обширной экосистемой Spring Boot облегчается интеграция с другими технологиями.

Начнем с создания простого проекта Spring Boot с интеграцией Kafka, в проекте же настроим необходимые зависимости.

Создаем новый проект Spring Boot в любимой среде IDE, добавляем в файл pom.xml или build.gradle зависимости для Maven или Gradle:

<!-- Для Maven -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
// Для Gradle
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'

Прежде чем применять Kafka для событийно-ориентированного взаимодействия, создадим темы и укажем количество разделов, темы подобны каналам для публикации событий отправителями и подписки получателей на эти события:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicConfiguration {
@Bean
public NewTopic exampleTopic() {
return new NewTopic("example_topic", 1, (short) 1);
}
}

В этом примере классом @Configuration определяется тема Kafka example_topic с одним разделом и одним коэффициентом репликации, которым обеспечивается отказоустойчивость за счет репликации данных в нескольких брокерах.

Теперь продемонстрируем интеграцию Spring Boot с Kafka, создав простой проект с базовыми отправителем и получателем:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

private final KafkaTemplate<String, String> kafkaTemplate;
private final String topic = "example_topic";

@Autowired
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

public void sendMessage(String message) {
kafkaTemplate.send(topic, message);
}
}

Здесь для отправки сообщений-событий в тему Kafka example_topic создан класс KafkaProducerService. Сообщения публикуем с помощью KafkaTemplate из Spring Kafka.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

@KafkaListener(topics = "example_topic", groupId = "example_group")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}

Класс KafkaConsumerService, которым прослушивается example_topic, принадлежит группе получателей example_group. Всякий раз, когда сообщение публикуется в теме, для обработки входящего события вызывается метод receiveMessage.

Реальные примеры

Обработка заказов

В реальной системе обработки заказов поток событий такой:

  • Заказ размещен.
  • Платеж обработан.
  • Запасы обновлены.

В этом сценарии каждое событие обрабатывается разными микросервисами, которые для поддержания согласованности и разделения служб взаимодействуют через Kafka.

Этап 1. Настройка среды Kafka

Запускаем требуемый Kafka сервер ZooKeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Запускаем сервер Kafka:

bin/kafka-server-start.sh config/server.properties

Создаем две темы Kafka для событий заказа и платежных событий:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic order_events

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic payment_events

Создаем новый проект Spring Boot в любимой среде IDE, добавляем в файл pom.xml зависимости:

<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Spring Boot Starter для Apache Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>

Для взаимодействия служб создаем два класса с данными о событиях OrderEvent и PaymentEvent:

public class OrderEvent {
private Long orderId;
// Другие поля, конструкторы, геттеры, сеттеры
}

public class PaymentEvent {
private Long orderId;
private BigDecimal amount;
// Другие поля, конструкторы, геттеры, сеттеры
}

Создаем две службы-отправителя Kafka  —  одну для событий заказа, другую для платежных событий:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderEventProducer {

private static final String TOPIC = "order_events";

@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;

public void sendOrderEvent(OrderEvent orderEvent) {
kafkaTemplate.send(TOPIC, orderEvent);
}
}
@Service
public class PaymentEventProducer {

private static final String TOPIC = "payment_events";

@Autowired
private KafkaTemplate<String, PaymentEvent> kafkaTemplate;

public void sendPaymentEvent(PaymentEvent paymentEvent) {
kafkaTemplate.send(TOPIC, paymentEvent);
}
}

Создаем две службы-получателя Kafka  —  одну для событий заказа, другую для платежных событий:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class OrderEventConsumer {

@KafkaListener(topics = "order_events", groupId = "order_group")
public void handleOrderEvent(OrderEvent orderEvent) {
// Обрабатываем событие заказа, например сохраняем его в базе данных
System.out.println("Received Order Event: " + orderEvent);
// Реализуем логику обработки заказов и обновления запасов
}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class PaymentEventConsumer {

@KafkaListener(topics = "payment_events", groupId = "payment_group")
public void handlePaymentEvent(PaymentEvent paymentEvent) {
// Обрабатываем платежное событие, например обновляем статус платежа
System.out.println("Received Payment Event: " + paymentEvent);
// Реализуем логику обработки платежа и обновления статуса заказа
}
}

Чтобы получать данные о заказах и платежах клиентов, создаем конечные точки REST, по получении данных отправляем события в Kafka:

@RestController
public class OrderController {

@Autowired
private OrderEventProducer orderEventProducer;

@PostMapping("/orders")
public ResponseEntity<String> createOrder(@RequestBody OrderEvent orderEvent) {
// Полученное событие заказа обрабатываем и отправляем в Kafka
orderEventProducer.sendOrderEvent(orderEvent);
return ResponseEntity.ok("Order created successfully");
}
}
@RestController
public class PaymentController {

@Autowired
private PaymentEventProducer paymentEventProducer;

@PostMapping("/payments")
public ResponseEntity<String> processPayment(@RequestBody PaymentEvent paymentEvent) {
// Полученное платежное событие обрабатываем и отправляем в Kafka
paymentEventProducer.sendPaymentEvent(paymentEvent);
return ResponseEntity.ok("Payment processed successfully");
}
}

Запускаем приложение Spring Boot и с помощью клиента REST, например Postman, создаем заказы и обрабатываем платежи. Просматриваем в журналах события, полученные службами-получателями, и выполняемую логику обработки заказов и обновления запасов.

Имейте в виду, что это упрощенный пример: в реальном приложении добавляются проверки, обработка ошибок, дополнительные службы для обработки заказов, платежных шлюзов, учета запасов.

Интернет вещей и обработка сигналов датчиков

Создадим приложение для Интернета вещей и обработки сигналов датчиков со Spring Boot и Kafka. Здесь упор делается на сборе сигналов датчиков, их агрегировании и генерировании предупреждений на основе условий. Опять же, пример упрощенный: в реальном сценарии добавляются сложность и обработка данных.

Для взаимодействия служб создаем класс с данными о событии сигналов датчиков SensorDataEvent:

public class SensorDataEvent {
private String sensorId;
private double value;
// Другие поля, конструкторы, геттеры, сеттеры
}

Чтобы отправлять события сигналов датчиков, создаем службу-отправитель Kafka:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class SensorDataEventProducer {

private static final String TOPIC = "sensor_data_events";

@Autowired
private KafkaTemplate<String, SensorDataEvent> kafkaTemplate;

public void sendSensorDataEvent(SensorDataEvent sensorDataEvent) {
kafkaTemplate.send(TOPIC, sensorDataEvent);
}
}

Чтобы получать события сигналов датчиков, создаем службу-получателя Kafka и агрегатор для агрегирования данных:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class SensorDataEventConsumer {

@Autowired
private DataAggregator dataAggregator;

@KafkaListener(topics = "sensor_data_events", groupId = "sensor_data_group")
public void handleSensorDataEvent(SensorDataEvent sensorDataEvent) {
// Полученное событие сигналов датчиков обрабатываем и передаем агрегатору
dataAggregator.aggregate(sensorDataEvent);
}
}
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;

@Component
public class DataAggregator {

private final Map<String, Double> sensorDataMap = new HashMap<>();

public void aggregate(SensorDataEvent sensorDataEvent) {
// Агрегирование данных датчика по его идентификатору
double currentValue = sensorDataMap.getOrDefault(sensorDataEvent.getSensorId(), 0.0);
double aggregatedValue = currentValue + sensorDataEvent.getValue();
sensorDataMap.put(sensorDataEvent.getSensorId(), aggregatedValue);

// Проверяем, превышен ли агрегированным значением определенный порог, и генерируем предупреждение
if (aggregatedValue > 1000) {
generateAlert(sensorDataEvent.getSensorId(), aggregatedValue);
}
}

private void generateAlert(String sensorId, double aggregatedValue) {
System.out.println("ALERT! Sensor ID: " + sensorId + ", Aggregated Value: " + aggregatedValue);
// Реализуем логику генерирования предупреждений, например отправки уведомлений или запуска действий
}
}

Чтобы получать данные датчика с устройств Интернета вещей, создаем конечные точки REST, по получении данных отправляем события в Kafka для дальнейшей обработки и агрегирования:

@RestController
public class SensorDataController {

@Autowired
private SensorDataEventProducer sensorDataEventProducer;

@PostMapping("/sensors/{sensorId}/data")
public ResponseEntity<String> sendSensorData(@PathVariable String sensorId, @RequestBody double value) {
// Создаем и отправляем «SensorDataEvent» в Kafka
SensorDataEvent sensorDataEvent = new SensorDataEvent(sensorId, value);
sensorDataEventProducer.sendSensorDataEvent(sensorDataEvent);
return ResponseEntity.ok("Sensor data received successfully");
}
}

Запускаем Spring Boot и с помощью клиента REST, например Postman, тестируем приложение.

Обработка ошибок и повторные попытки

Обработка ошибок и повторные попытки  —  необходимые компоненты надежной, безотказной программной системы. Они важны для корректного восстановления приложений после неожиданных сбоев, переходных ошибок или отказа внешних служб. Обработка ошибок  —  это процесс устранения неожиданных или исключительных ситуаций, которые случаются при выполнении программы.

Повторные попытки  —  механизм многократного автоматического повторного выполнения неудачной операции или запроса. Их цель  —  обработка переходных ошибок, а они временные и устраняются повторением операции через небольшой интервал. Повторные попытки приходятся кстати при периодических сбоях ненадежных сетевых подключений или внешних служб.

Повторные попытки реализуются разными стратегиями.

Фиксированный повтор: операция повторяется фиксированное количество раз с постоянным интервалом между повторами.

Экспоненциальная задержка: во избежание перегрузки системы, если проблема сохраняется, интервал между повторами увеличивается экспоненциально.

Случайная задержка: чтобы рассредоточить повторные попытки и избежать коллизий запросов, в интервал между повторами вводится элемент случайности.

Выключатель: после определенного количества последовательных сбоев активируется выключатель, и повторные попытки прекращаются. После периода «охлаждения» попытки возобновляются.

Повтор с джиттером: чтобы уменьшить вероятность одновременных попыток нескольких клиентов, фиксированная или экспоненциальная задержка комбинируется с джиттером, случайным интервалом.

Реализуем повторные попытки обработки событий с помощью Spring-Retry. Сначала добавим в pom.xml или build.gradle зависимость Spring-Retry:

<!-- Для Maven -->
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
// Для Gradle
implementation 'org.springframework.retry:spring-retry'

Чтобы в случае сбоя обработка повторилась, обновляем получателя:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

@Retryable(
value = { Exception.class },
maxAttempts = 3,
backoff = @Backoff(delay = 1000, maxDelay = 3000)
)
@KafkaListener(topics = "example_topic", groupId = "example_group")
public void receiveMessage(String message) {
try {
// Обрабатываем сообщение здесь
System.out.println("Received message: " + message);
} catch (Exception e) {
throw new RuntimeException("Error processing message: " + message, e);
}
}
}

В этом примере в метод receiveMessage добавили аннотацию @Retryable, указав максимум повторов  —  три  —  и интервал между ними  —  1 сек. при максимуме в 3 сек.

Очередь недоставленных сообщений

Это очередь, в которой необработанные события отправляются на дальнейший анализ и отладку. Такая их проверка не сказывается на основном потоке обработки событий.

Чтобы переместить проблемные события в очередь недоставленных сообщений, настраиваем для них в Kafka отдельную тему:

@Configuration
public class KafkaTopicConfiguration {

@Bean
public NewTopic exampleTopic() {
return new NewTopic("example_topic", 1, (short) 1);
}

@Bean
public NewTopic deadLetterTopic() {
return new NewTopic("example_topic_dead_letter", 1, (short) 1);
}
}

Обновляя получателя, отправляем эти события в очередь недоставленных сообщений:

@Service
public class KafkaConsumerService {

@Retryable(
value = { Exception.class },
maxAttempts = 3,
backoff = @Backoff(delay = 1000, maxDelay = 3000)
)
@KafkaListener(topics = "example_topic", groupId = "example_group")
public void receiveMessage(String message) {
try {
// Обрабатываем сообщение здесь
System.out.println("Received message: " + message);
} catch (Exception e) {
// Перемещаем проблемное событие в очередь недоставленных сообщений
kafkaProducerService.sendMessageToDeadLetterTopic(message);
}
}
}

Здесь, чтобы отправить необработанные сообщения в тему example_topic_dead_letter, в KafkaProducerService добавили метод sendMessageToDeadLetterTopic.

Благодаря этим совершенствованиям конвейер обработки событий отказоустойчивее, с временными сбоями он справляется корректнее.

Масштабирование событийно-ориентированных микросервисов

Получатели и отправители в Kafka масштабируются увеличением числа экземпляров соответственных служб. Запуская несколько экземпляров получателей параллельно, мы обрабатываем события одновременно, повышая общую пропускную способность.

При масштабировании получателей важно учитывать разбиение на разделы. Разделы  —  это единица параллелизма Kafka, каждый раздел получается только одним получателем группы получателей. Поэтому для эффективной параллельной обработки должно быть достаточно разделов.

Группа получателей  —  получатели с одним идентификатором группы, ими распределяется нагрузка получения событий из одной и той же темы. Чтобы увеличить параллелизм, получатели в группу добавляются.

Пример

Чтобы масштабировать получателей, для каждого их экземпляра задаем уникальный instanceId и добавляем в группу получателей несколько экземпляров одного получателя:

@Service
public class KafkaConsumerService {

@Value("${kafka.instanceId}")
private String instanceId;

@KafkaListener(
topicPartitions = @TopicPartition(
topic = "example_topic",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "1", initialOffset = "0"),
// При необходимости добавляем разделы и смещения
}
),
groupId = "example_group"
)
public void receiveMessage(String message) {
System.out.println("[" + instanceId + "] Received message: " + message);
}
}

В этом примере разделы и их начальные смещения для этого экземпляра получателя указаны аннотацией @KafkaListener с topicPartitions. Каждым экземпляром получателя обрабатываются события разных разделов, чем увеличивается параллелизм.

groupId: этим атрибутом указывается группа получателей, к которой относится получатель. Нагрузка получения сообщений из разделов темы распределяется между получателями одной группы.

Чтобы создать несколько экземпляров службы получателей, запускаем приложение с разными значениями instanceId при помощи профилей Spring:

@SpringBootApplication
public class KafkaApplication {

public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, "--spring.profiles.active=instance1");
SpringApplication.run(KafkaApplication.class, "--spring.profiles.active=instance2");
// При необходимости добавляем экземпляры
}
}

Здесь с профилями instance1 и instance2 запущено два экземпляра приложения, у каждого из которых уникальный instanceId. События получаются ими из разных разделов, чем повышается параллелизм.

Количество разделов example_topic увеличиваем в KafkaTopicConfiguration:

@Configuration
public class KafkaTopicConfiguration {

@Bean
public NewTopic exampleTopic() {
return new NewTopic("example_topic", 4, (short) 1);
}

@Bean
public NewTopic deadLetterTopic() {
return new NewTopic("example_topic_dead_letter", 1, (short) 1);
}
}

В этом примере количество разделов example_topic увеличили до четырех, усовершенствовав распределение нагрузки и параллельную обработку.

Запуская несколько экземпляров получателей и увеличивая количество разделов, мы эффективно масштабируем событийно-ориентированные микросервисы для работы с более высокой пропускной способностью.

Заключение

Архитектура событийно-ориентированных микросервисов со Spring Boot и Kafka  —  это мощный, масштабируемый подход к построению современных распределенных систем. Используя преимущества событийно-ориентированного взаимодействия, слабой связанности и асинхронной обработки, разработчики создают гибкие, отказоустойчивые, адаптивные приложения под требования современного динамичного делового ландшафта.

Читайте также:

Читайте нас в Telegram, VK и Дзен


Перевод статьи Bubu Tripathy: Event-Driven Architecture

Предыдущая статьяЧто возвращать в Go: структуры или интерфейсы?
Следующая статьяМикросервисы gRPC в NestJS: пошаговое руководство