Рассмотрим, как отправляемое событие получается разными группами получателей в Kafka.
Инструментарий
- Spring Boot (2.7.5)
- Apache Kafka
- Zookeeper
- Docker
Архитектура
Зависимости
Версия Spring: 2.7.5
Версия Java: 11
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Docker Compose
version: '3.8'
services:
zookeeper:
container_name: zookeeper-svc
image: confluentinc/cp-zookeeper:5.4.9
restart: always
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
container_name: kafka-svc
image: confluentinc/cp-kafka:6.0.9
restart: always
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_ADVERTISED_HOST_NAME:
kafdrop:
image: obsidiandynamics/kafdrop
container_name: kafdrop-svc
restart: always
depends_on:
- zookeeper
- kafka
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: kafka:29092
Запускаем следующую команду:
docker-compose up -d
Контейнеры Docker
Типовая модель заказа «Order»
public class Order {
private int orderId;
private Date date;
public Order(int orderId, Date date) {
this.orderId = orderId;
this.date = date;
}
public Order() {
}
// геттер и сеттер
}
Служба заказов отправителя
- application.properties
spring.application.name=order-application
server.port=8080
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
- TopicConfig
@Configuration
@Component
public class TopicConfig {
@Bean
public NewTopic createTopic() {
return TopicBuilder.name("order-created-event")
.partitions(1)
.build();
}
}
- OrderController
@RestController
@RequestMapping("/api/v1/order")
public class OrderController {
public final OrderCreatedEventProducer orderCreatedEventProducer;
@Autowired
public OrderController(OrderCreatedEventProducer orderCreatedEventProducer) {
this.orderCreatedEventProducer = orderCreatedEventProducer;
}
@GetMapping(path = "")
public String publishOrder() {
var id = (int) (Math.random() * 100) + 1;
var order = new Order(id, new Date());
orderCreatedEventProducer.produce(order);
return "Order created";
}
}
- OrderCreatedEventProducer
@Component
public class OrderCreatedEventProducer {
private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
public OrderCreatedEventProducer(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void produce(Order order) {
this.kafkaTemplate.send("order-created-event", order);
}
}
public void produce(Order order)
: этим методом заказ отправляется в тему Kafka.
this.kafkaTemplate.send("order-created-event", order)
: внутри метода produce
объект Order
с помощью kafkaTemplate
отправляется в тему order-created-event.
Служба хранения получателя
- application.properties
spring.application.name=stock-application
server.port=8082
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=stock-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.add.type.headers=false
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.listener.ack-mode=manual_immediate
- OrderCreatedEventConsumer
@Component
public class OrderCreatedEventConsumer {
@KafkaListener(topics = "order-created-event", groupId = "stock-group", properties = {"spring.json.value.default.type=com.example.stockservice.model.Order"})
public void consumeOrderCreatedEvent(Order order, Acknowledgment acknowledgment) {
System.out.println(String.format("Stock Group, Order ID: %s", order.getOrderId()));
acknowledgment.acknowledge();
}
}
groupId = "stock-group"
: атрибутом groupId
указывается идентификатор группы Kafka, к которой относится этот получатель.
Служба платежей получателя
- application.properties
spring.application.name=payment-application
server.port=8083
spring.kafka.bootstrap-servers=${kafka-service:localhost}:9092
spring.kafka.consumer.group-id=payment-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.add.type.headers=false
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.listener.ack-mode=manual_immediate
- OrderCreatedEventConsumer
@Component
public class OrderCreatedEventConsumer {
@KafkaListener(topics = "order-created-event", groupId = "payment-group", properties = {"spring.json.value.default.type=com.example.paymentservice.model.Order"})
public void consumeOrderCreatedEvent(Order order, Acknowledgment acknowledgment) {
System.out.println(String.format("Payment Group, Order ID: %s", order.getOrderId()));
acknowledgment.acknowledge();
}
}
groupId = "payment-group"
: атрибутом groupId
указывается идентификатор группы Kafka, к которой относится этот получатель.
Запуск служб
Служба заказов: запущена
Служба хранения: запущена
Служба платежей: запущена
Заходим на Kafdrop
Отправляем HTTP-запросы в службу заказов
Консольные журналы
Сервисами хранения и платежей получено одно и то же событие Kafka.
Репозиторий на GitHub.
Читайте также:
- Упрощаем интеграцию Kafka со Spring Boot
- Секрет производительности Kafka
- Ключевые вопросы для собеседования по Spring Boot в 2023 году. Часть 2
Читайте нас в Telegram, VK и Дзен
Перевод статьи Mert ÇAKMAK: Consume the Same Event with Different Consumer Groups in Kafka (Spring Boot)