Spring Boot, Kafka и WebSocket для отправки сообщений в реальном времени

Spring Boot на основе Spring Framework упрощает загрузку и разработку новых приложений. Kafka  —  платформа распределенной потоковой передачи, позволяющая создавать конвейеры и приложения для передачи данных в реальном времени. WebSocket обеспечивает полнодуплексный канал связи через один сокет  —  идеальный вариант для обмена данными в реальном времени.

Далее рассмотрим процесс настройки приложения Spring Boot, интеграции его с Kafka для постановки сообщений в очередь и с WebSocket для обмена сообщениями. Вы получите четкое представление о взаимодействии этих технологий в приложении с чатом реального времени.

В конце статьи имеются ссылки на репозитории.


Вначале разберемся с настройкой приложения Spring Boot и интеграцией его с Kafka.

Spring Boot  —  мощный фреймворк для разработки и настройки приложений. Он включает функции автонастройки, автономный код и встроенные серверы, упрощающие создание автономных корпоративных приложений Spring.

Сначала нужно настроить приложение Spring Boot. Это можно сделать с помощью инструмента Spring Initializr, обеспечивающего быструю начальную загрузку таких приложений. Для работы с чатом нужно включить зависимости “Web”, “Kafka”, “WebSocket” и “Lombok”.

Spring Initializr
Зависимости

После настройки приложения можно приступать к интеграции Spring Boot и распределенной потоковой платформы Kafka, используемой для создания конвейеров передачи данных в режиме реального времени и потоковых приложений. Это идеальный выбор для живого общения благодаря масштабируемости по горизонтали, надежности и невероятно быстрой работе.

Сначала создадим классы `Message` и `MessageType`, представляющие объект message и типы отправляемых сообщений.

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Message {
private MessageType type;
private String content;
private String sender;
private String sessionId;
}public enum MessageType {
CHAT,
CONNECT,
DISCONNECT
}

Затем создаем класс `KafkaConsumerConfig`:

@Configuration
public class KafkaConsumerConfig {

@Bean
public ConsumerFactory<String, Message> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "chat");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps, new StringDeserializer(),
new JsonDeserializer<>(Message.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

Теперь настраиваем в классе `KafkaProducerConfig` производителя (producer) Kafka, отвечающего за отправку сообщений. Используем для этого `KafkaTemplate`. Вот как выглядит класс `KafkaProducerConfig`:

@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

@Bean
public ProducerFactory<String, Message> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}

Чтобы в реальном времени обмениваться сообщениями между сервером и клиентами, воспользуемся WebSocket.


WebSocket предоставляет полнодуплексный канал связи через один сокет, поэтому идеально подходит для передачи данных реального времени между клиентом и сервером.

В классе `WebSocketConfig` настраиваем брокер для обработки сообщений, отправляемых клиентам WebSocket и получаемые от них. Вот как выглядит класс `WebSocketConfig`:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfigimplementsWebSocketMessageBrokerConfigurer {

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").setAllowedOrigins("http://localhost:5173/").withSockJS();
}
}

В составе класса:

  • Метод `configureMessageBroker` настраивает брокер сообщений. Он включает простой брокер в памяти и устанавливает префикс для сообщений, которые привязаны к методам с аннотацией @MessageMapping.
  • Метод `registerStompEndpoints` регистрирует конечную точку “/ws”, разрешая резервные опции SockJS для альтернативной транспортировки, если WebSocket недоступен.

В настроенном WebSocket можно создать `MessageController` для обработки отправляемых и принимаемых сообщений. `MessageController` использует аннотацию `@MessageMapping` для сопоставления назначения входящих сообщений с определенными методами.

Сначала создадим класс Sender:

@Service
public class Sender {

private final KafkaTemplate<String, Message> kafkaTemplate;

public Sender(KafkaTemplate<String, Message> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

public void send(String topic, Message message) {
kafkaTemplate.send(topic, message);
}
}

Вот так выглядит класс `MessageController`:

@ControllerpublicclassMessageController {

private final Sender sender;
private final SimpMessageSendingOperations messagingTemplate;
private static final Logger logger = LoggerFactory.getLogger(MessageController.class);

public MessageController(Sender sender, SimpMessageSendingOperations messagingTemplate) {
this.sender = sender;
this.messagingTemplate = messagingTemplate;
}

@MessageMapping("/chat.send-message")
public void sendMessage(@Payload Message chatMessage, SimpMessageHeaderAccessor headerAccessor) {
chatMessage.setSessionId(headerAccessor.getSessionId());
sender.send("messaging", chatMessage);
logger.info("Sending message to /topic/public: " + chatMessage);
messagingTemplate.convertAndSend("/topic/public", chatMessage);
logger.info("Message sent to /topic/public: " + chatMessage);
}

@MessageMapping("/chat.add-user")
@SendTo("/topic/public")
public Message addUser(
@Payload Message chatMessage,
SimpMessageHeaderAccessor headerAccessor
) {
if (headerAccessor.getSessionAttributes() != null) {
headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
}

return chatMessage;
}
}

В этом классе:

  • Метод SendMessage предназначен для обработки сообщений, отправляемых в место назначения “/chat.send-message”. Он отправляет полученное `Message` по адресу “/topic/public”, транслируя его всем подключенным клиентам WebSocket.

В заключение рассмотрим, как обрабатывать события WebSocket и как использовать сообщения из Kafka.

Обработка событий WebSocket необходима для поддержки состояния приложения. Например, при отключении/подключении пользователя можно передавать сообщение/уведомление об этом остальным пользователям. Spring позволяет обрабатывать эти события, создавая класс прослушивателя событий (event listener).

Соответствующие методы класса `WebSocketEventListener`помечены как `@EventListener`. Они вызываются автоматически в ответ на соответствующее событие. Пример класса `WebSocketEventListener`:

@Component
public class WebSocketEventListener {

private static final Logger logger = LoggerFactory.getLogger(WebSocketEventListener.class);
private final SimpMessageSendingOperations messagingTemplate;

public WebSocketEventListener(SimpMessageSendingOperations messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}

@EventListener
public void handleWebSocketConnectListener(SessionConnectedEvent event) {
logger.info("Received a new web socket connection");
}

@EventListener
public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
String username = (String) headerAccessor.getSessionAttributes().get("username");

if (username != null) {
logger.info("User Disconnected: " + username);
Message chatMessage = new Message();
chatMessage.setType(MessageType.DISCONNECT);
chatMessage.setSender(username);
messagingTemplate.convertAndSend("/topic/public", chatMessage);
}
}
}

В этом классе:

  • Метод handleWebSocketConnectListener вызывается при открытии нового соединения WebSocket.
  • Метод handleWebSocketDisconnectListener вызывается при закрытии соединения WebSocket. Он отправляет сообщение ‘DISCONNECT’ всем остальным подключенным клиентам WebSocket.

Наконец, нужно использовать сообщения от Kafka. В классе Receiver имеется метод с аннотацией @KafkaListener. Он вызывается автоматически при получении сообщения из топика Kafka. Вот как выглядит этот класс `Receiver`:

@Service
public class Receiver {

private static final Logger logger = LoggerFactory.getLogger(Receiver.class);
private final SimpMessageSendingOperations messagingTemplate;
private final SimpUserRegistry userRegistry;

public Receiver(SimpMessageSendingOperations messagingTemplate, SimpUserRegistry userRegistry) {
this.messagingTemplate = messagingTemplate;
this.userRegistry = userRegistry;
}

@KafkaListener(topics = "messaging", groupId = "chat")
public void consume(Message chatMessage) {
logger.info("Received message from Kafka: " + chatMessage);
for (SimpUser user : userRegistry.getUsers()) {
for (SimpSession session : user.getSessions()) {
if (!session.getId().equals(chatMessage.getSessionId())) {
messagingTemplate.convertAndSendToUser(session.getId(), "/topic/public", chatMessage);
}
}
}
}
}

В этом классе метод `consume` вызывается при получении сообщения из раздела (топика) Kafka “messaging”. Он отправляет полученное `сообщение` в “/topic/public” (раздел WebSocket), транслируя его всем подключенным к WebSocket клиентам.


Фронтенд: обмен сообщениями реального времени через WebSocket с помощью React и STOMP

Теперь создадим с помощью React фронтенд-приложение для взаимодействия с бэкенд-сервисом. Для установки протокола STOMP поверх подключения WebSocket используем библиотеку @stomp/stompjs.

Сначала создадим проект, используя IDE WebStorm.

Затем устанавливаем необходимые библиотеки:npm install @stomp/stompjs sockjs-client @mui/material react-avatar

Здесь @stomp/stompjs используется для клиента STOMP, клиент sockjs используется для подключения к WebSocket, @mui/material используется для компонентов пользовательского интерфейса, а react-avatar нужен для отображения аватаров пользователей.

Создание Username Page

В `UsernamePage.jsx` обычно создают форму, где пользователь вводит свое имя. После отправки имени пользователя его можно сохранить в состоянии и передать в компонент `ChatPage` в качестве пропа. Затем это имя пользователя будет использоваться как отправитель сообщений.

import { useState } from 'react';
import { Button, TextField, Container, Box } from '@mui/material';
import PropTypes from 'prop-types';

function UsernamePage({ setUsername }) {
UsernamePage.propTypes = {
setUsername: PropTypes.func.isRequired,
};

const [inputUsername, setInputUsername] = useState('');

const handleUsernameSubmit = (event) => {
event.preventDefault();
if (inputUsername) {
setUsername(inputUsername);
}
};

return (
<Container>
<Box display="flex" flexDirection="column" justifyContent="center" alignItems="center" mt={2}>
<h1>Type your username</h1>
<form onSubmit={handleUsernameSubmit}>
<Box display="flex" alignItems="stretch">
<TextField
sx={{
color: 'white', '& .MuiOutlinedInput-notchedOutline': { borderColor: 'gray' },
width: '300px',
'& .MuiOutlinedInput-root': {
borderRadius: '36px',
'& fieldset': {
borderColor: 'gray',
},
'& input': {
height: '8px',
},
},
}}
inputProps={{ style: { color: 'white' } }}
variant="outlined"
placeholder="Username"
value={inputUsername}
onChange={(e) => setInputUsername(e.target.value)}
/>
<Box marginLeft={2}>
<Button
variant="contained"
sx={{
width: '94px',
height: '42px',
borderRadius: '36px',
}}
color="primary"
type="submit">
Enter
</Button>
</Box>
</Box>
</form>
</Box>
</Container>
);
}

export default UsernamePage;

В этом коде UsernamePage принимает проп setUsername, который является функцией для обновления имени пользователя в родительском компоненте (App.jsx). Он содержит форму с текстовым полем для имени пользователя и кнопку отправки. Когда форма отправлена, она вызывает функцию `setUsername` с введенным именем пользователя.

В `App.jsx` обычно сохраняют состояние имени пользователя для условного рендеринга либо `UsernamePage`, либо `ChatPage` в зависимости от того, было ли задано имя пользователя.

import { useState } from 'react';
import UsernamePage from './component/UsernamePage.jsx';
import ChatPage from './component/ChatPage.jsx';

function App() {
const [username, setUsername] = useState(null);

return (
<div>
{username ? <ChatPage username={username} /> : <UsernamePage setUsername={setUsername} />}
</div>
);
}

export default App;

В этом коде приложение хранит и поддерживает состояние имени пользователя. Если это имя равно null, оно отображается (рендерится) как `UsernamePage` и передает функцию `setUsername` в качестве пропа. Если имя пользователя не равно null, оно передается в качестве пропа и отображается `ChatPage`.

Создание Chat Page

Создадим в файле “ChatPage.jsx` компонент “ChatPage` . Он будет обрабатывать подключение к WebSocket, отправлять и получать сообщения и рендерить интерфейс чата.

import { useState, useEffect, useRef } from 'react';
import { Client } from '@stomp/stompjs';
import SockJS from 'sockjs-client/dist/sockjs';
import ChatMessage from "./ChatMessage.jsx";
import { Button, TextField, Container, Box } from '@mui/material';

function ChatPage({ username }) {
const [messages, setMessages] = useState([]);
const [client, setClient] = useState(null);
const messageInputRef = useRef();

useEffect(() => {
const newClient = new Client({
webSocketFactory: () => new SockJS('http://localhost:8080/ws'),
onConnect: () => {
const joinMessage = {
sender: username,
type: 'CONNECT',
};
newClient.publish({ destination: '/app/chat.addUser', body: JSON.stringify(joinMessage) });
newClient.subscribe('/topic/public', message => {
const newMessage = JSON.parse(message.body);
setMessages(prevMessages => [...prevMessages, newMessage]);
});
},
onDisconnect: () => {
if (newClient.connected) {
const leaveMessage = {
sender: username,
type: 'DISCONNECT',
};
newClient.publish({ destination: '/app/chat.addUser', body: JSON.stringify(leaveMessage) });
}
},
});

newClient.activate();
setClient(newClient);

return () => {
newClient.deactivate();
};
}, [username]);

const sendMessage = () => {
if (messageInputRef.current.value && client) {
const chatMessage = {
sender: username,
content: messageInputRef.current.value,
type: 'CHAT',
};
client.publish({ destination: '/app/chat.sendMessage', body: JSON.stringify(chatMessage) });
messageInputRef.current.value = '';
}
};

return (
<Container>
<Box>
{messages.map((message, index) => (
<ChatMessage key={index} message={message} username={username} />
))}
</Box>
<form onSubmit={sendMessage}>
<TextField inputRef={messageInputRef} placeholder="Type a message..." />
<Button type="submit">Send</Button>
</form>
</Container>
);
}

export default ChatPage;

В этом коде используются хуки useState и useEffect из React для управления состоянием и побочными эффектами компонента. Класс Client из @stomp/stompjs используется для создания клиента STOMP через подключение к WebSocket. Метод publish используется для отправки сообщений на сервер, а метод subscribe для получения сообщений с сервера.

Создание компонента Chat Message

В файле `ChatMessage.jsx` создадим компонент ChatMessage. Он будет отображать одно сообщение чата.

import Avatar from 'react-avatar';
import { Box } from '@mui/material';

function ChatMessage({ message, username }) {
return (
<Box sx={{ display: 'flex', flexDirection: 'column', alignItems: message.sender === username ? 'flex-end' : 'flex-start', margin: '10px 0' }}>
<Box sx={{ display: 'flex', flexDirection: message.sender === username ? 'row-reverse' : 'row', alignItems: 'center', gap: 1 }}>
<Avatar name={message.sender} size="35" round={true} />
<h4>{message.sender}</h4>
</Box>
<Box sx={{
backgroundColor: message.sender === username ? 'primary.main' : 'secondary.main',
color: 'white',
borderRadius: '12px',
padding: '10px',
maxWidth: '80%',
}}>
<p>{message.content}</p>
</Box>
</Box>
);
}

export default ChatMessage;

В этом коде используется компонент Box из @mui/material для создания гибкого макета и компонент Avatar из react-avatar для отображения аватара пользователя. Проп sx используется для применения стилей к компонентам.

В этом разделе мы создали с использованием React фронтенд-приложение, которое взаимодействует с бэкенд-сервисом через соединение WebSocket. При этом использована библиотека @stomp/stompjs для определения протокола STOMP через подключение к WebSocket и библиотека @mui/material для создания пользовательского интерфейса.


Попробуем сделать это с командной строкой, создав класс `CommandController`:

@RestController
public class CommandController {

private final KafkaTemplate<String, Message> kafkaTemplate;
private final SimpMessageSendingOperations messagingTemplate;

public CommandController(KafkaTemplate<String, Message> kafkaTemplate, SimpMessageSendingOperations messagingTemplate) {
this.kafkaTemplate = kafkaTemplate;
this.messagingTemplate = messagingTemplate;
}

@PostMapping("/send")
public void send(@RequestBody Message message) {
kafkaTemplate.send("messaging", message);
messagingTemplate.convertAndSend("/topic/public", message);
}
}

Выполняем эти команды в терминале:

curl -X POST -H "Content-Type: application/json" -d "{\"type\":\"CONNECT\",\"content\":\"Hello, World!\",\"sender\":\"Command Line\"}" http://localhost:8080/send

curl -X POST -H "Content-Type: application/json" -d "{\"type\":\"CHAT\",\"content\":\"Hello, World!\",\"sender\":\"Command Line\"}" http://localhost:8080/send

Получим визуальный результат:


Используя Spring Boot, Kafka и WebSocket мы создали масштабируемое и эффективное приложение для чата, работающее в реальном времени.

Репозитории на GitHub:

Читайте также:

Читайте нас в Telegram, VK и Дзен


Перевод статьи Muhammad Umar Al Fajar: Spring Boot, Kafka, and WebSocket: A Practical Approach to Real-Time Messaging

Предыдущая статьяPurePWA — радикальный поворот в веб-разработке
Следующая статьяСоздание computedAsync для вычисления значений сигналов в Angular