Получение одного события разными группами получателей в Kafka с Spring Boot

Рассмотрим, как отправляемое событие получается разными группами получателей в Kafka.

Инструментарий

  • Spring Boot (2.7.5)
  • Apache Kafka
  • Zookeeper
  • Docker

Архитектура

Зависимости

Версия Spring: 2.7.5
Версия Java: 11

<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

Docker Compose

version: '3.8'
services:

zookeeper:
container_name: zookeeper-svc
image: confluentinc/cp-zookeeper:5.4.9
restart: always
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181

kafka:
container_name: kafka-svc
image: confluentinc/cp-kafka:6.0.9
restart: always
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_ADVERTISED_HOST_NAME:

kafdrop:
image: obsidiandynamics/kafdrop
container_name: kafdrop-svc
restart: always
depends_on:
- zookeeper
- kafka
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: kafka:29092

Запускаем следующую команду:

docker-compose up -d

Контейнеры Docker

Типовая модель заказа «Order»

public class Order {

private int orderId;
private Date date;

public Order(int orderId, Date date) {
this.orderId = orderId;
this.date = date;
}

public Order() {
}

// геттер и сеттер
}

Служба заказов отправителя

  • application.properties
spring.application.name=order-application
server.port=8080

spring.kafka.bootstrap-servers=localhost:9092

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
  • TopicConfig
@Configuration
@Component
public class TopicConfig {

@Bean
public NewTopic createTopic() {
return TopicBuilder.name("order-created-event")
.partitions(1)
.build();
}

}
  • OrderController
@RestController
@RequestMapping("/api/v1/order")
public class OrderController {

public final OrderCreatedEventProducer orderCreatedEventProducer;

@Autowired
public OrderController(OrderCreatedEventProducer orderCreatedEventProducer) {
this.orderCreatedEventProducer = orderCreatedEventProducer;
}

@GetMapping(path = "")
public String publishOrder() {
var id = (int) (Math.random() * 100) + 1;
var order = new Order(id, new Date());
orderCreatedEventProducer.produce(order);

return "Order created";
}
}
  • OrderCreatedEventProducer
@Component
public class OrderCreatedEventProducer {

private KafkaTemplate<String, Object> kafkaTemplate;

@Autowired
public OrderCreatedEventProducer(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

public void produce(Order order) {
this.kafkaTemplate.send("order-created-event", order);
}
}

public void produce(Order order): этим методом заказ отправляется в тему Kafka.

this.kafkaTemplate.send("order-created-event", order): внутри метода produce объект Order с помощью kafkaTemplate отправляется в тему order-created-event.

Служба хранения получателя

  • application.properties
spring.application.name=stock-application
server.port=8082

spring.kafka.bootstrap-servers=localhost:9092

spring.kafka.consumer.group-id=stock-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.add.type.headers=false
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.listener.ack-mode=manual_immediate
  • OrderCreatedEventConsumer
@Component
public class OrderCreatedEventConsumer {

@KafkaListener(topics = "order-created-event", groupId = "stock-group", properties = {"spring.json.value.default.type=com.example.stockservice.model.Order"})
public void consumeOrderCreatedEvent(Order order, Acknowledgment acknowledgment) {
System.out.println(String.format("Stock Group, Order ID: %s", order.getOrderId()));
acknowledgment.acknowledge();
}

}

groupId = "stock-group": атрибутом groupId указывается идентификатор группы Kafka, к которой относится этот получатель.

Служба платежей получателя

  • application.properties
spring.application.name=payment-application
server.port=8083

spring.kafka.bootstrap-servers=${kafka-service:localhost}:9092

spring.kafka.consumer.group-id=payment-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.add.type.headers=false
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.listener.ack-mode=manual_immediate
  • OrderCreatedEventConsumer
@Component
public class OrderCreatedEventConsumer {

@KafkaListener(topics = "order-created-event", groupId = "payment-group", properties = {"spring.json.value.default.type=com.example.paymentservice.model.Order"})
public void consumeOrderCreatedEvent(Order order, Acknowledgment acknowledgment) {
System.out.println(String.format("Payment Group, Order ID: %s", order.getOrderId()));
acknowledgment.acknowledge();
}

}

groupId = "payment-group": атрибутом groupId указывается идентификатор группы Kafka, к которой относится этот получатель.

Запуск служб

Служба заказов: запущена
Служба хранения: запущена
Служба платежей: запущена

Заходим на Kafdrop

http://127.0.0.1:9000

Отправляем HTTP-запросы в службу заказов

[GET] http://127.0.0.1:8080/api/v1/order

Консольные журналы

Сервисами хранения и платежей получено одно и то же событие Kafka.

Репозиторий на GitHub.

Читайте также:

Читайте нас в Telegram, VK и Дзен


Перевод статьи Mert ÇAKMAK: Consume the Same Event with Different Consumer Groups in Kafka (Spring Boot)

Предыдущая статьяGo: точечная вставка значения в структуру
Следующая статьяОбработка событий в JavaScript: всплытие, перехват, делегирование и распространение событий