Как интегрировать Kafka со Spring Boot

Платформа распределенной потоковой передачи Kafka предоставляет надежную и отказоустойчивую систему обмена сообщениями, позволяющую обрабатывать данные в режиме реального времени. Для создания эффективных, несвязанных и отзывчивых приложений разработчики могут гармонично реализовать возможности Kafka в сочетании с простотой и производительностью фреймворка Spring Boot.

Рассмотрим пошаговую интеграцию Kafka и Spring Boot.

Шаг 1. Настройте Kafka

  • Установите Kafka и запустите кластер Kafka. Инструкции по установке есть в официальной документации Kafka.

Шаг 2. Создайте проект Spring Boot

  • Настройте новый проект Spring Boot, используя предпочитаемую IDE или Spring Initializr. Включите необходимые зависимости:
  1. spring-kafka предоставляет основную функциональность для интеграции Kafka в Spring;
  2. spring-boot-starter-web включает веб-функции в Spring Boot (опционально).

Шаг 3. Настройте свойства Kafka

В файле проекта Spring Boot application.properties настройте необходимые свойства Kafka, такие как серверы начальной загрузки и названия разделов, а также любые дополнительные. Например:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.template.default-topic=my-topic

Шаг 4. Создайте Kafka Producer

  • Внедрите Kafka producer для отправки сообщений в разделы (topic) Kafka. Создать простой producer позволяет KafkaTemplate, предоставленный Spring Kafka. Например:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;

@Autowired
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

public void sendMessage(String message) {
kafkaTemplate.send("my-topic", message);
}
}

Шаг 5. Создайте Kafka Consumer

  • Внедрите Kafka Consumer для получения и обработки сообщений из разделов Kafka. Можно просто использовать аннотацию @KafkaListener, предоставленную Spring Kafka. Например:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void receiveMessage(String message) {
// Process the received message
System.out.println("Received message: " + message);
}
}

Шаг 6. Протестируйте интеграцию

  • Запустите приложение Spring Boot. Для отправки сообщений в Kafka через producer можно создать простую конечную точку REST API. Например:
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {
private final KafkaProducerService producerService;

public KafkaController(KafkaProducerService producerService) {
this.producerService = producerService;
}

@PostMapping("/messages")
public void sendMessageToKafka(@RequestBody String message) {
producerService.sendMessage(message);
}
}

Теперь, используя для отправки сообщений в конечную точку /messages такой инструмент, как Postman, можно запустить созданное приложение и наблюдать, как потребитель получает и обрабатывает сообщения из раздела (topic) Kafka. Убедитесь, что сервер Kafka запущен.

Чтобы запустить сервер Kafka, выполните следующие действия.

1. Загрузите Kafka

  • С официального веб-сайта Apache Kafka загрузите последнюю стабильную версию.

2. Извлеките файлы Kafka

  • Распакуйте загруженный архив Kafka в выбранный каталог.

3. Запустите ZooKeeper

  • Kafka использует ZooKeeper для координации управления кластером. Откройте терминал и перейдите в каталог Kafka.
  • Запустите ZooKeeper, выполнив команду:
bin/zookeeper-server-start.sh конфигурация/zookeeper.properties

4. Запустите Kafka Broker

  • Откройте новое окно терминала или вкладку, перейдите в каталог Kafka и запустите Kafka broker, выполнив команду:
bin/kafka-server-start.sh конфигурация/server.properties

По умолчанию Kafka будет использовать файл конфигурации config/server.properties.

5. Верификация сервера Kafka

  • Теперь запущенный сервер Kafka будет прослушивать запросы на localhost:9092 по умолчанию.
  • Создавая разделы или используя другие инструменты командной строки Kafka можно убедиться в том, что сервер Kafka запущен.

После верификации сервер можно использовать в приложениях, чтобы создавать и потреблять сообщения.

Заключение

Интеграция Kafka с Spring Boot обеспечивает производительный и эффективный способ создания надежных и масштабируемых приложений. Комбинируя возможности обмена сообщениями Kafka с простотой фреймворка Spring Boot, разработчики могут создавать распределенные системы, способные обрабатывать большие объемы данных и беспрепятственно обмениваться данными между микросервисами.

Читайте также:

Читайте нас в Telegram, VK и Дзен


Перевод статьи Kishore Karnatakapu: How to Integrate Kafka with Spring Boot

Предыдущая статья23 шаблона проектирования для 99% разработчиков на Java
Следующая статьяОбработка ошибок в TypeScript без try/catch