«Я не мечтатель. Я инженер. Я ничего не имею против людей, которые бредут, не разбирая дороги, засматриваясь на звезды, но я смотрю на землю и хочу заделать выбоину, пока в нее не свалился», — Линус Торвальдс
Я считаю, что мы, как специалисты по решению проблем, должны в первую очередь быть настроены на поиск решений поставленных задач, а не ходить вокруг да около в поисках новых проблем, а то и самим создавать их (это другая работа для другого времени).
В чем заключается стоящая перед нами задача?
Если коротко, у нас есть система HTTP-микросервисов, взаимодействующих друг с другом только через централизованный брокер (брокер RabbitMQ). В некоторых сценариях, например в том, который мы обсудим далее, одни сервисы должны ждать ответа от других сервисов, которым они отправили сообщения, и отвечать тому же HTTP-клиенту, который инициировал запрос в том же контексте выполнения.
Документация RabbitMQ по RPC (remote procedure call — вызов удаленных процедур) помогает только в том случае, если нужно, чтобы сервис, инициировавший запрос, получил ответное сообщение в другое время в другом контексте выполнения. К тому же не так уж много ресурсов, позволяющих соединить контекст выполнения потребителя и контекст выполнения, в котором обрабатывался HTTP-запрос.
Почему бы не организовать синхронное взаимодействие между сервисами с помощью HTTP-запросов?
Это было бы простым решением, если бы задача сводилась к тому, чтобы отправить запрос из точки A в точку B, дождаться HTTP-ответа в том же контексте выполнения, а затем ответить клиентам.
Однако это решение делает сервисы связанными между собой: им нужно будет знать адреса друг друга, и это создаст прямые зависимости между ними. Кроме того, сервисы должны знать о существовании друг друга.
Мы же работаем над асинхронным решением, используя тот же централизованный брокер, к которому уже подключены все сервисы.
Что такое шаблон «запрос-ответ»?
«Запрос-ответ» — шаблон обмена сообщениями, в котором запрашивающий посылает сообщение отвечающему и ожидает ответа. Он может быть синхронным и блокирующим, а может быть асинхронным, когда запрашивающий отправляет сообщение в одном потоке, а слушатель ответа ждет в другом потоке. Когда ответ приходит, вызывается слушатель в другом потоке и обрабатывает его, используя обратный вызов, переданный при создании слушателя. Очевидно, что в нашей реализации брокера сообщений мы реализовали асинхронный подход.
Как мы адаптировали стандартную реализацию шаблона «запрос-ответ» к своему сценарию использования
Взаимодействие между двумя сервисами:
- Клиент отправляет обычный HTTP-запрос на сервис A, который представляет собой сервер приложений, предоставляющий клиентам REST API для запроса ресурсов. Этот запрос имеет
correlationId
, который является уникальным идентификатором для различных сервисов.
- Чтобы сервис A получил все ресурсы, которые запросил клиент, ему нужно обратиться к сервису B (выступающему в роли производителя), выполнить там определенную логику и получить результат. Обратите внимание, что сервис A и сервис B подключены к брокеру сообщений с помощью TCP-соединения.
- Сервис B зарегистрирован в брокере, чтобы потреблять сообщения из
RequestQueueB
. - Чтобы сервис A отправил сообщение сервису B, ему нужно послать сообщение брокеру через
RequestQueueB
.
// Сервис A
function getResourcesFromServiceB(messageContent, queueName, options) {
/**
* Предположим, что вы установили соединение с помощью amqplib с брокером rabbitmq,
* а затем сохранили его в переменной amqplibConnectionChannel, используемой ниже.
* Интерфейс канала открывает метод sendToQueue, о котором мы говорили.
**/
return amqplibConnectionChannel.sendToQueue(
queueName,
Buffer.from(messageContent),
options
);
}
app.on("/resources", async (request, response) => {
const resourcesFromServiceB = await getResourcesFromServiceB(
request.body,
"RequestQueueB",
{
correlationId: "clientRequestId1"
}
);
response.json(resourcesFromServiceB);
});
5. Брокер уведомляет сервис, потребляющий запрос из RequestQueueB
(сервис B).
6. Сервис B получает сообщение от брокера, выполняет запрошенную процедуру и теперь готов ответить сервису A. Что теперь? Теперь это сервис B:
// Сервис B /** * Интерфейс канала в amqplib предоставляет метод под названием consume, * который используется для установки слушателя, потребляющего сообщения из очереди. * Он также принимает обработчик обратного вызова для совершения вызовов при получении сообщений. * Пока что этот обработчик будет хранить строку и бездействовать, пока мы не продолжим. */ amqplibConnectionChannel.consume("RequestQueueB", (receivedMessage) => { const resourcesThatServiceANeeds = "Username"; // Что нужно делать сейчас? });
7. Чтобы сервис A мог получить ответ от сервиса B, он должен выступать в роли потребителя. Другими словами, он должен потреблять из какой-то другой очереди, в которой может получать ответы от сервисов, с которыми взаимодействует.
// Сервис A
amqplibConnectionChannel.consume("ReplyQueueA", (receivedReply) => {
/**
* Как получить доступ к этому receivedReply в другом контексте в обработчике метода HTTP (ниже),
* чтобы отправить его обратно клиенту?
*/
});
function getResourcesFromServiceB(messageContent, queueName, options = {}) {
/**
* Предположим, что установили соединение с помощью amqplib с брокером rabbitmq,
* а затем сохранили его в переменной amqplibConnectionChannel, используемой ниже.
* Интерфейс канала предоставляет метод sendToQueue, о котором мы говорили.
**/
return amqplibConnectionChannel.sendToQueue(
queueName,
Buffer.from(messageContent),
options
);
}
app.on("/resources", async (request, response) => {
const resourcesFromServiceB = await getResourcesFromServiceB(
request.body,
"RequestQueueB",
{
correlationId: "clientRequestId1"
}
);
response.json(resourcesFromServiceB);
});
8. Исходя из вышесказанного, сервис A теперь потребляет из ReplyQueueA
. Как сервис B узнает, что он может ответить сервису A через ReplyQueueA
? Сервис A в созданном им сообщении должен отправить свойство, которое включает имя очереди, от которой он ожидает ответа, а в случае сервиса A это ReplyQueueA
. Это свойство в amqplib называется replyTo
, и оно отправляется в объекте options, который сопровождает созданное сообщение.
// Сервис A
const resourcesFromServiceB = await getResourcesFromServiceB(
request.body,
"RequestQueueB",
{
correlationId: "clientRequestId1",
replyTo: "ReplyQueueA"
}
);
9. Сервис B, завершив выполнение всей логики, которую ему нужно было обработать при получении первого сообщения из очереди (RequestQueueB
), переходит к проверке поля replyTo
в сообщении.
10. Сервис B начинает действовать как производитель и посылает брокеру сообщение о том, что ему нужно создать сообщение в ReplyQueueA
(находится в свойстве replyTo
сообщения).
11. Брокер уведомляет сервис, потребляющий сообщение из ReplyQueueA
(сервис А).
// Сервис B amqplibConnectionChannel.consume("RequestQueueB", (receivedMessage) => { const resourcesThatServiceANeeds = "Username"; // Что делать сейчас? amqplibConnectionChannel.sendToQueue( receivedMessage.properties.replyTo, Buffer.from(resourcesThatServiceANeeds); { correlationId: receivedMessage.properties.correlationId } ); });
12. Сервис A получает сообщение от брокера и приступает к ответу клиенту. Но где же клиент?
Помните, как сервис A впервые отправил сообщение сервису B? Фактически это закончилось подтверждением от брокера, что сообщение было помещено в RequestQueueB
, и это было все для данного контекста выполнения.
Затем, когда сервис A получил ответное сообщение от сервиса B, был совершенно другой контекст выполнения внутри метода потребления, используемого при настройке слушателя на эту очередь.
Важный вопрос заключается в следующем:
Как преодолеть разрыв между этими двумя различными контекстами, чтобы получить сообщение от сервиса B, сохранив при этом доступ к постоянно активному HTTP-соединению между сервисом A и клиентом, чтобы ответить клиенту из сервиса A контентом, полученным в качестве ответа от сервиса B?
В событийно-ориентированной среде, такой как Node.js, на помощь приходит модуль events
вместе с промисами!
Возвращаясь к сервису A, нужно добавить три последние детали, чтобы устранить разрыв. Во-первых, при создании сообщения, приходящего от клиента, нужно настроить слушателя событий на запрос correlationId
этого клиента, потому что это уникальный идентификатор.
Во-вторых, чтобы вернуть значение, полученное, когда этот слушатель поймает событие с тем же correlationId
, нужно обернуть его в промис, который можно ожидать в контексте клиента.
В-третьих, на потребителе, получающем данные из ReplyQueueA, нужно вызывать событие каждый раз, когда в качестве ответа приходит сообщение с correlationId
. Значением этого события будет контент полученного ответа. Таким образом, клиентский обработчик, ожидающий разрешения промиса, возвращенного методом getResourcesFromServiceB
, получит доступ к данным из сервиса B и сможет отправить их обратно клиенту в своем контексте выполнения с помощью res.json
.
// Сервис A
const EventEmitter = require("events");
cosnt eventHandler = new EventEmitter();
amqplibConnectionChannel.consume("ReplyQueueA", (receivedReplyMsg) => {
/**
* Теперь этот потребитель будет вызывать событие, названное по
* correlationId, который он получит
*/
eventHandler.emit(
receivedReplyMsg.properties.correlationId,
receivedReplyMsg.content
);
});
function getResourcesFromServiceB(messageContent, queueName, options = {}) {
amqplibConnectionChannel.sendToQueue(
queueName,
Buffer.from(messageContent),
options
);
/**
* Теперь мы вернем промис,
* который будет преобразован в значение, полученное от слушателя
*/
return new Promise((resolve, reject) => {
eventHandler.once(options.correlationId, (dataSentFromEmitter) => {
resolve(dataSentFromEmitter);
/**
* Разумеется, здесь вам придется обработать ошибки;
* можете использовать reject для возврата ошибок внутри
*/
});
})
}
app.on("/resources", async (request, response) => {
const resourcesFromServiceB = await getResourcesFromServiceB(
request.body,
"RequestQueueB",
{
correlationId: "clientRequestId1"
}
);
/*
* Теперь переменная resourcesFromServiceB содержит строку «Username»,
* которой была отправлена * из сервиса B
*/
response.json(resourcesFromServiceB);
});
Чего мы добились?
- Наши сервисы по-прежнему взаимодействуют через свою асинхронную коммуникационную среду (брокер RabbitMQ), при этом шаблон не нарушен.
- Код легко читается в обработчике HTTP-запросов, поскольку теперь нужно только ожидать промиса, возвращаемого методом produce, и получать ответ, как если бы это было возвращаемое значение метода produce.
- Всегда интересно поэкспериментировать с EventListener’ом Node.js.
Полезные ссылки
- Queue Types
- Streams
- AMQP 0–9–1 Model
- AMQP 0–9–1 Client Lib Docs
- Exchanges in RabbitMQ
- Asynchronous message-based communication
- Simple Request-Reply pattern explanation
- Event Sourcing
- Asynchronous Request-Reply pattern
- CQRS Pattern
- Message-based vs Event-based systems
Перечень терминов
Соединение (Connection)
Физическое TCP-соединение, которое необходимо установить между узлом и брокером. AMQP-соединение делится на согласованное количество независимых однонаправленных каналов.
Канал (Channel)
Однонаправленный канал и подразделение TCP-соединения, который используется узлами (производителями, потребителями, очередями) для связи.
Брокер сообщений (Message Broker)
Брокер — это главный сервер, реализующий ядро протокола AMQP и предоставляющий API AMQP, к которому узлы могут подключаться с помощью TCP-соединения. Именно он отвечает за управление очередями, обмен, используемый для маршрутизации сообщений по очередям, хранение и сохранение данных и т. д. Это основная часть протокола AMQP.
Потребитель (Consumer)
Узел, являющийся сервером приложения, который подключается к брокеру с помощью соединения для подписки (потребления сообщений) от узла в брокере (очередь) при использовании канала связи с этой очередью.
Производитель (Producer)
Узел, являющийся сервером приложения, который подключается к брокеру с помощью соединения для производства (публикации сообщений) от узла в брокере (очередь) при использовании канала связи с этой очередью.
Обменники (Exchanges)
Обменники — это сущности AMQP 0-9-1, на которые отправляются сообщения. Обменники принимают сообщение и направляют его в очередь нулевой длины или несколько очередей. Используемый алгоритм маршрутизации зависит от типа обмена и правил, называемых привязками.
Очередь запросов (Request Queue)
Основная очередь, из которой сервис ожидает получения запросов. Другими словами, если сервису нужно запросить какие-либо источники у сервиса изначально, он создает сообщение в своей очереди запросов. Сервис начинает потреблять.
Очередь ответов (Reply Queue)
Очередь, из которой сервис ожидает ответа.
Когда производитель публикует сообщение и если опция waitforResponse равна true, сервис ожидает ответное сообщение в своей очереди ответов.
CorrelationId
В распределенных системах каждое сообщение, циркулирующее между различными системами, обладает идемпотентным идентификатором, который является уникальным модификатором, создаваемым в момент инициирования связи с клиентом, и остается неизменным на протяжении всего жизненного цикла сообщения.
Шаблон «запрос-ответ» (Request-Reply Pattern)
Шаблон обмена сообщениями и протокол, в котором узел посылает сообщение другому узлу, потребляющему это сообщение, и ожидает ответа от этого узла асинхронным образом. Он либо ожидает неопределенное время, пока узел не ответит, либо пока не будет достигнут тайм-аут (при условии определения такового).
RPC (удаленный вызов процедур)
Модель, используемая в шаблоне «запрос-ответ», когда отправитель, или запросчик, посылает сообщение с намерением выполнить процедуру удаленно на ответчике, или получателе сообщения, и ожидает получения результата этой процедуры в качестве ответа.
functionName
Поле, специфичное для нашей реализации, обозначающее имя процедуры, которую производитель хочет выполнить удаленно на потребителе и получить результат в качестве ответа. Мы всегда отправляем его в параметре headers в методе produce().
Читайте также:
- Создание простого клиента RabbitMQ с помощью Go
- .NET Aspire — стремление к рациональному подходу в сфере облачной разработки
- Создание оркестратора для событийно-ориентированного приложения с Golang и RabbitMQ
Читайте нас в Telegram, VK и Дзен
Перевод статьи Basem Mostafa: The Request-reply Pattern in RabbitMQ: An EventListener-based approach in Node.js