«Apache Kafka — это распределенная Open Source платформа потоковой передачи событий, используемая в тысячах компаний для высокопроизводительных конвейеров данных, потоковой аналитики, интеграции данных и критически важных приложений». С этим определением сайта я полностью согласен.
Настройка демонстрационной среды
Чтобы не отвлекаться от установки Kafka, мы уже включили в деморепозиторий файл docker-compose и взятые отсюда образы Docker.
Для этого демопроекта, созданного из Spring Initializr, используется Maven.
Клонируем деморепозиторий.
Запускаем docker-compose:
docker-compose up -d
Используя модуль spring-kafka, который с помощью высокоуровневых, абстрактных классов поддерживает потребление и отправку сообщений в Kafka, добавляем в демо такую зависимость:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Подключаемся к Kafka как потребитель событий
На простейшем уровне сообщения из Kafka потребляются всего в два этапа.
Этап 1. Добавление в application.properties
следующей настройки с информацией о сервере начальной загрузки Kafka:
spring.kafka.consumer.bootstrap-servers=localhost:9092
Этап 2. Создание класса потребителя для обработки сообщений:
@Service
public class DemoStringConsumer {
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(DemoStringConsumer.class);
@KafkaListener(id = "demoGroup", topics = "programmingsharing.topic1")
public void listen(String message) {
log.info("Received: " + message);
}
}
Дополнительные настройки не нужны: приложения Spring Boot готовы к потреблению сообщений из Kafka.
Создаем тему:
docker exec broker \
kafka-topics --bootstrap-server broker:9092 \
--create \
--topic programmingsharing.topic1
Отправляем в нее строковые сообщения:
docker exec --interactive --tty broker \
kafka-console-producer --bootstrap-server broker:9092 \
--topic programmingsharing.topic1
Регистрируем сообщение в журнале:
INFO 26586 --- [demoGroup-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: Hello world
INFO 26586 --- [demoGroup-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: Hi programming sharing readers
Подключаемся к Kafka как источник событий
Аналогично настройке для потребителя.
Этап 1. Добавление в application.properties
настройки с информацией о сервере начальной загрузки источника Kafka:
spring.kafka.producer.bootstrap-servers=localhost:9092
Этап 2. Создание класса для выдачи сообщения с помощью автоматически обнаруживаемого и связываемого компонента KafkaTemplate:
package com.programmingsharing.kafkaspringboot.producer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.Instant;
@Service
public class SimpleScheduledProducer {
private final KafkaTemplate<Object, Object> template;
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(SimpleScheduledProducer.class);
public SimpleScheduledProducer(KafkaTemplate<Object, Object> template) {
this.template = template;
}
@Scheduled(fixedDelay = 2000)
public void sendFoo() {
log.info("producing message to Kafka, topic=receiving-topic");
this.template.send("receiving-topic", Instant.now().toString());
}
}
Scheduled применяется в демонстрационных целях: каждые две секунды выдается простое значение момента времени в виде строки.
Теперь приложение готово к отправке сообщений на сервер Kafka. Проверяем их, просто добавляя новый KafkaListener
:
@KafkaListener(id = "demoGroup2", topics = "receiving-topic")
public void listenFromReceivingTopic(String message) {
log.info("Received: " + message);
}
В консольном журнале теперь регистрируется информация об источнике и потребителе:
023-01-05T21:24:53.822+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:24:52.703290Z
2023-01-05T21:24:54.708+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:24:54.735+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:24:54.709550Z
2023-01-05T21:24:56.713+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:24:56.743+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:24:56.714425Z
2023-01-05T21:24:58.720+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:24:58.751+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:24:58.722375Z
2023-01-05T21:25:00.728+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:25:00.750+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:25:00.729822Z
2023-01-05T21:25:02.736+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:25:02.759+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:25:02.737265Z
2023-01-05T21:25:04.740+07:00 INFO 28552 --- [ scheduling-1] c.p.k.producer.SimpleScheduledProducer : producing message to Kafka, topic=receiving-topic
2023-01-05T21:25:04.768+07:00 INFO 28552 --- [emoGroup2-0-C-1] c.p.k.consumer.DemoStringConsumer : Received: 2023-01-05T14:25:04.741946Z
Класс KafkaAutoConfiguration
В модуле spring-boot-autoconfigure имеется класс KafkaAutoConfiguration для автоматического создания необходимых компонентов: kafkaTemplate
, kafkaProducerListener
, kafkaConsumerFactory
, kafkaProducerFactory
.
KafkaProperties — это класс ConfigurationProperties со свойствами spring.kafka
, которые мы добавляли выше. Возможности модуля spring-kafka
раскрываются доступными свойствами конфигураций этого класса:
- Потребитель.
- Источник.
- Администратор Kafka.
- Поток Kafka.
- Сервис аутентификации и авторизации Java.
- SSL-аутентификация.
- Повторные попытки.
Читайте также:
- Конвейер данных в реальном времени с Kafka и ClickHouse
- CHRONOS: современный мониторинг работоспособности приложений
- Как создать эффективную систему логирования с использованием Aspect и Spring Cloud Sleuth
Читайте нас в Telegram, VK и Дзен
Перевод статьи Thành Trần: Connecting Kafka in Spring Boot