Подключение Kafka в Spring Boot

«Apache Kafka  —  это распределенная Open Source платформа потоковой передачи событий, используемая в тысячах компаний для высокопроизводительных конвейеров данных, потоковой аналитики, интеграции данных и критически важных приложений». С этим определением сайта я полностью согласен.

Настройка демонстрационной среды

Чтобы не отвлекаться от установки Kafka, мы уже включили в деморепозиторий файл docker-compose и взятые отсюда образы Docker.

Для этого демопроекта, созданного из Spring Initializr, используется Maven.

Клонируем деморепозиторий.

Запускаем docker-compose:

docker-compose up -d

Используя модуль spring-kafka, который с помощью высокоуровневых, абстрактных классов поддерживает потребление и отправку сообщений в Kafka, добавляем в демо такую зависимость:

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

Подключаемся к Kafka как потребитель событий

На простейшем уровне сообщения из Kafka потребляются всего в два этапа.

Этап 1. Добавление в application.properties следующей настройки с информацией о сервере начальной загрузки Kafka:

spring.kafka.consumer.bootstrap-servers=localhost:9092

Этап 2. Создание класса потребителя для обработки сообщений:

@Service
public class DemoStringConsumer {
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(DemoStringConsumer.class);

@KafkaListener(id = "demoGroup", topics = "programmingsharing.topic1")
public void listen(String message) {
log.info("Received: " + message);
}
}

Дополнительные настройки не нужны: приложения Spring Boot готовы к потреблению сообщений из Kafka.

Создаем тему:

docker exec broker \
kafka-topics --bootstrap-server broker:9092 \
--create \
--topic programmingsharing.topic1

Отправляем в нее строковые сообщения:

docker exec --interactive --tty broker \
kafka-console-producer --bootstrap-server broker:9092 \
--topic programmingsharing.topic1

Регистрируем сообщение в журнале:

INFO 26586 --- [demoGroup-0-C-1] c.p.k.consumer.DemoStringConsumer        : Received: Hello world
INFO 26586 --- [demoGroup-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: Hi programming sharing readers

Подключаемся к Kafka как источник событий

Аналогично настройке для потребителя.

Этап 1. Добавление в application.properties настройки с информацией о сервере начальной загрузки источника Kafka:

spring.kafka.producer.bootstrap-servers=localhost:9092

Этап 2. Создание класса для выдачи сообщения с помощью автоматически обнаруживаемого и связываемого компонента KafkaTemplate:

package com.programmingsharing.kafkaspringboot.producer;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.Instant;

@Service
public class SimpleScheduledProducer {
private final KafkaTemplate<Object, Object> template;

private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(SimpleScheduledProducer.class);

public SimpleScheduledProducer(KafkaTemplate<Object, Object> template) {
this.template = template;
}

@Scheduled(fixedDelay = 2000)
public void sendFoo() {
log.info("producing message to Kafka, topic=receiving-topic");
this.template.send("receiving-topic", Instant.now().toString());
}
}

Scheduled применяется в демонстрационных целях: каждые две секунды выдается простое значение момента времени в виде строки.

Теперь приложение готово к отправке сообщений на сервер Kafka. Проверяем их, просто добавляя новый KafkaListener:

@KafkaListener(id = "demoGroup2", topics = "receiving-topic")
public void listenFromReceivingTopic(String message) {
log.info("Received: " + message);
}

В консольном журнале теперь регистрируется информация об источнике и потребителе:

023-01-05T21:24:53.822+07:00  INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer        : Received: 2023-01-05T14:24:52.703290Z
2023-01-05T21:24:54.708+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:24:54.735+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:24:54.709550Z
2023-01-05T21:24:56.713+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:24:56.743+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:24:56.714425Z
2023-01-05T21:24:58.720+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:24:58.751+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:24:58.722375Z
2023-01-05T21:25:00.728+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:25:00.750+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:25:00.729822Z
2023-01-05T21:25:02.736+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:25:02.759+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:25:02.737265Z
2023-01-05T21:25:04.740+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:25:04.768+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:25:04.741946Z

Класс KafkaAutoConfiguration

В модуле spring-boot-autoconfigure имеется класс KafkaAutoConfiguration для автоматического создания необходимых компонентов: kafkaTemplate, kafkaProducerListener, kafkaConsumerFactory, kafkaProducerFactory.

KafkaProperties  —  это класс ConfigurationProperties со свойствами spring.kafka, которые мы добавляли выше. Возможности модуля spring-kafka раскрываются доступными свойствами конфигураций этого класса:

  • Потребитель.
  • Источник.
  • Администратор Kafka.
  • Поток Kafka.
  • Сервис аутентификации и авторизации Java.
  • SSL-аутентификация.
  • Повторные попытки.

Читайте также:

Читайте нас в TelegramVK и Дзен


Перевод статьи Thành Trần: Connecting Kafka in Spring Boot

Предыдущая статьяКак перенести сайт с WordPress на GitHub Pages
Следующая статья11 ключевых принципов эргономики в UI-дизайне