«Я не мечтатель. Я инженер. Я ничего не имею против людей, которые бредут, не разбирая дороги, засматриваясь на звезды, но я смотрю на землю и хочу заделать выбоину, пока в нее не свалился», — Линус Торвальдс

Я считаю, что мы, как специалисты по решению проблем, должны в первую очередь быть настроены на поиск решений поставленных задач, а не ходить вокруг да около в поисках новых проблем, а то и самим создавать их (это другая работа для другого времени).

В чем заключается стоящая перед нами задача?

Если коротко, у нас есть система HTTP-микросервисов, взаимодействующих друг с другом только через централизованный брокер (брокер RabbitMQ). В некоторых сценариях, например в том, который мы обсудим далее, одни сервисы должны ждать ответа от других сервисов, которым они отправили сообщения, и отвечать тому же HTTP-клиенту, который инициировал запрос в том же контексте выполнения.

Документация RabbitMQ по RPC (remote procedure call — вызов удаленных процедур) помогает только в том случае, если нужно, чтобы сервис, инициировавший запрос, получил ответное сообщение в другое время в другом контексте выполнения. К тому же не так уж много ресурсов, позволяющих соединить контекст выполнения потребителя и контекст выполнения, в котором обрабатывался HTTP-запрос.

Почему бы не организовать синхронное взаимодействие между сервисами с помощью HTTP-запросов?

Это было бы простым решением, если бы задача сводилась к тому, чтобы отправить запрос из точки A в точку B, дождаться HTTP-ответа в том же контексте выполнения, а затем ответить клиентам.

Однако это решение делает сервисы связанными между собой: им нужно будет знать адреса друг друга, и это создаст прямые зависимости между ними. Кроме того, сервисы должны знать о существовании друг друга.

Мы же работаем над асинхронным решением, используя тот же централизованный брокер, к которому уже подключены все сервисы.

Что такое шаблон «запрос-ответ»?

«Запрос-ответ» — шаблон обмена сообщениями, в котором запрашивающий посылает сообщение отвечающему и ожидает ответа. Он может быть синхронным и блокирующим, а может быть асинхронным, когда запрашивающий отправляет сообщение в одном потоке, а слушатель ответа ждет в другом потоке. Когда ответ приходит, вызывается слушатель в другом потоке и обрабатывает его, используя обратный вызов, переданный при создании слушателя. Очевидно, что в нашей реализации брокера сообщений мы реализовали асинхронный подход.

Как мы адаптировали стандартную реализацию шаблона «запрос-ответ» к своему сценарию использования

Взаимодействие между двумя сервисами:

Сервис A получает HTTP-запрос, затем создает сообщение в очереди, которую потребляет сервис B, чтобы запросить некоторые данные
  1. Клиент отправляет обычный HTTP-запрос на сервис A, который представляет собой сервер приложений, предоставляющий клиентам REST API для запроса ресурсов. Этот запрос имеет correlationId, который является уникальным идентификатором для различных сервисов.
  1. Чтобы сервис A получил все ресурсы, которые запросил клиент, ему нужно обратиться к сервису B (выступающему в роли производителя), выполнить там определенную логику и получить результат. Обратите внимание, что сервис A и сервис B подключены к брокеру сообщений с помощью TCP-соединения.
  2. Сервис B зарегистрирован в брокере, чтобы потреблять сообщения из RequestQueueB.
  3. Чтобы сервис A отправил сообщение сервису B, ему нужно послать сообщение брокеру через RequestQueueB.
// Сервис A

function getResourcesFromServiceB(messageContent, queueName, options) {
/**
* Предположим, что вы установили соединение с помощью amqplib с брокером rabbitmq,
* а затем сохранили его в переменной amqplibConnectionChannel, используемой ниже.
* Интерфейс канала открывает метод sendToQueue, о котором мы говорили.
**/
return amqplibConnectionChannel.sendToQueue(
queueName,
Buffer.from(messageContent),
options
);
}

app.on("/resources", async (request, response) => {
const resourcesFromServiceB = await getResourcesFromServiceB(
request.body,
"RequestQueueB",
{
correlationId: "clientRequestId1"
}
);

response.json(resourcesFromServiceB);
});

5. Брокер уведомляет сервис, потребляющий запрос из RequestQueueB (сервис B).

6. Сервис B получает сообщение от брокера, выполняет запрошенную процедуру и теперь готов ответить сервису A. Что теперь? Теперь это сервис B:

        // Сервис B

        /**
         * Интерфейс канала в amqplib предоставляет метод под названием consume,
         * который используется для установки слушателя, потребляющего сообщения из очереди.
         * Он также принимает обработчик обратного вызова для совершения вызовов при получении сообщений.
         * Пока что этот обработчик будет хранить строку и бездействовать, пока мы не продолжим.
         */
        amqplibConnectionChannel.consume("RequestQueueB", (receivedMessage) => {
            const resourcesThatServiceANeeds = "Username";
            // Что нужно делать сейчас?
        });

7. Чтобы сервис A мог получить ответ от сервиса B, он должен выступать в роли потребителя. Другими словами, он должен потреблять из какой-то другой очереди, в которой может получать ответы от сервисов, с которыми взаимодействует.

 // Сервис A

amqplibConnectionChannel.consume("ReplyQueueA", (receivedReply) => {
/**
* Как получить доступ к этому receivedReply в другом контексте в обработчике метода HTTP (ниже),
* чтобы отправить его обратно клиенту?
*/
});

function getResourcesFromServiceB(messageContent, queueName, options = {}) {
/**
* Предположим, что установили соединение с помощью amqplib с брокером rabbitmq,
* а затем сохранили его в переменной amqplibConnectionChannel, используемой ниже.
* Интерфейс канала предоставляет метод sendToQueue, о котором мы говорили.
**/
return amqplibConnectionChannel.sendToQueue(
queueName,
Buffer.from(messageContent),
options
);
}

app.on("/resources", async (request, response) => {
const resourcesFromServiceB = await getResourcesFromServiceB(
request.body,
"RequestQueueB",
{
correlationId: "clientRequestId1"
}
);

response.json(resourcesFromServiceB);
});

8. Исходя из вышесказанного, сервис A теперь потребляет из ReplyQueueA. Как сервис B узнает, что он может ответить сервису A через ReplyQueueA? Сервис A в созданном им сообщении должен отправить свойство, которое включает имя очереди, от которой он ожидает ответа, а в случае сервиса A это ReplyQueueA. Это свойство в amqplib называется replyTo, и оно отправляется в объекте options, который сопровождает созданное сообщение.

        // Сервис A

const resourcesFromServiceB = await getResourcesFromServiceB(
request.body,
"RequestQueueB",
{
correlationId: "clientRequestId1",
replyTo: "ReplyQueueA"
}
);

9. Сервис B, завершив выполнение всей логики, которую ему нужно было обработать при получении первого сообщения из очереди (RequestQueueB), переходит к проверке поля replyTo в сообщении.

10. Сервис B начинает действовать как производитель и посылает брокеру сообщение о том, что ему нужно создать сообщение в ReplyQueueA (находится в свойстве replyTo сообщения).

11. Брокер уведомляет сервис, потребляющий сообщение из ReplyQueueA (сервис А).

        // Сервис B

        amqplibConnectionChannel.consume("RequestQueueB", (receivedMessage) => {
            const resourcesThatServiceANeeds = "Username";
            // Что делать сейчас?
            amqplibConnectionChannel.sendToQueue(
                receivedMessage.properties.replyTo,
                Buffer.from(resourcesThatServiceANeeds);
                {
                    correlationId: receivedMessage.properties.correlationId
                }
            );
        });

12. Сервис A получает сообщение от брокера и приступает к ответу клиенту. Но где же клиент?

Помните, как сервис A впервые отправил сообщение сервису B? Фактически это закончилось подтверждением от брокера, что сообщение было помещено в RequestQueueB, и это было все для данного контекста выполнения.

Затем, когда сервис A получил ответное сообщение от сервиса B, был совершенно другой контекст выполнения внутри метода потребления, используемого при настройке слушателя на эту очередь.

Важный вопрос заключается в следующем:

Как преодолеть разрыв между этими двумя различными контекстами, чтобы получить сообщение от сервиса B, сохранив при этом доступ к постоянно активному HTTP-соединению между сервисом A и клиентом, чтобы ответить клиенту из сервиса A контентом, полученным в качестве ответа от сервиса B?

В событийно-ориентированной среде, такой как Node.js, на помощь приходит модуль events вместе с промисами!

Возвращаясь к сервису A, нужно добавить три последние детали, чтобы устранить разрыв. Во-первых, при создании сообщения, приходящего от клиента, нужно настроить слушателя событий на запрос correlationId этого клиента, потому что это уникальный идентификатор.

Во-вторых, чтобы вернуть значение, полученное, когда этот слушатель поймает событие с тем же correlationId, нужно обернуть его в промис, который можно ожидать в контексте клиента.

В-третьих, на потребителе, получающем данные из ReplyQueueA, нужно вызывать событие каждый раз, когда в качестве ответа приходит сообщение с correlationId. Значением этого события будет контент полученного ответа. Таким образом, клиентский обработчик, ожидающий разрешения промиса, возвращенного методом getResourcesFromServiceB, получит доступ к данным из сервиса B и сможет отправить их обратно клиенту в своем контексте выполнения с помощью res.json.

// Сервис A

const EventEmitter = require("events");
cosnt eventHandler = new EventEmitter();

amqplibConnectionChannel.consume("ReplyQueueA", (receivedReplyMsg) => {
/**
* Теперь этот потребитель будет вызывать событие, названное по
* correlationId, который он получит
*/
eventHandler.emit(
receivedReplyMsg.properties.correlationId,
receivedReplyMsg.content
);
});

function getResourcesFromServiceB(messageContent, queueName, options = {}) {
amqplibConnectionChannel.sendToQueue(
queueName,
Buffer.from(messageContent),
options
);
/**
* Теперь мы вернем промис,
* который будет преобразован в значение, полученное от слушателя
*/
return new Promise((resolve, reject) => {
eventHandler.once(options.correlationId, (dataSentFromEmitter) => {
resolve(dataSentFromEmitter);
/**
* Разумеется, здесь вам придется обработать ошибки;
* можете использовать reject для возврата ошибок внутри
*/
});
})
}

app.on("/resources", async (request, response) => {
const resourcesFromServiceB = await getResourcesFromServiceB(
request.body,
"RequestQueueB",
{
correlationId: "clientRequestId1"
}
);
/*
* Теперь переменная resourcesFromServiceB содержит строку «Username»,
* которой была отправлена * из сервиса B
*/
response.json(resourcesFromServiceB);
});

Чего мы добились?

  1. Наши сервисы по-прежнему взаимодействуют через свою асинхронную коммуникационную среду (брокер RabbitMQ), при этом шаблон не нарушен.
  1. Код легко читается в обработчике HTTP-запросов, поскольку теперь нужно только ожидать промиса, возвращаемого методом produce, и получать ответ, как если бы это было возвращаемое значение метода produce.
  1. Всегда интересно поэкспериментировать с EventListener’ом Node.js.

Полезные ссылки

  1. Queue Types
  2. Streams
  3. AMQP 0–9–1 Model
  4. AMQP 0–9–1 Client Lib Docs
  5. Exchanges in RabbitMQ
  6. Asynchronous message-based communication
  7. Simple Request-Reply pattern explanation
  8. Event Sourcing
  9. Asynchronous Request-Reply pattern
  10. CQRS Pattern
  11. Message-based vs Event-based systems

Перечень терминов

Соединение (Connection)
Физическое TCP-соединение, которое необходимо установить между узлом и брокером. AMQP-соединение делится на согласованное количество независимых однонаправленных каналов.

Канал (Channel)
Однонаправленный канал и подразделение TCP-соединения, который используется узлами (производителями, потребителями, очередями) для связи.

Брокер сообщений (Message Broker)
Брокер — это главный сервер, реализующий ядро протокола AMQP и предоставляющий API AMQP, к которому узлы могут подключаться с помощью TCP-соединения. Именно он отвечает за управление очередями, обмен, используемый для маршрутизации сообщений по очередям, хранение и сохранение данных и т. д. Это основная часть протокола AMQP.

Потребитель (Consumer)
Узел, являющийся сервером приложения, который подключается к брокеру с помощью соединения для подписки (потребления сообщений) от узла в брокере (очередь) при использовании канала связи с этой очередью.

Производитель (Producer)
Узел, являющийся сервером приложения, который подключается к брокеру с помощью соединения для производства (публикации сообщений) от узла в брокере (очередь) при использовании канала связи с этой очередью.

Обменники (Exchanges)
Обменники — это сущности AMQP 0-9-1, на которые отправляются сообщения. Обменники принимают сообщение и направляют его в очередь нулевой длины или несколько очередей. Используемый алгоритм маршрутизации зависит от типа обмена и правил, называемых привязками.

Очередь запросов (Request Queue)
Основная очередь, из которой сервис ожидает получения запросов. Другими словами, если сервису нужно запросить какие-либо источники у сервиса изначально, он создает сообщение в своей очереди запросов. Сервис начинает потреблять.

Очередь ответов (Reply Queue)
Очередь, из которой сервис ожидает ответа.
Когда производитель публикует сообщение и если опция waitforResponse равна true, сервис ожидает ответное сообщение в своей очереди ответов.

CorrelationId
В распределенных системах каждое сообщение, циркулирующее между различными системами, обладает идемпотентным идентификатором, который является уникальным модификатором, создаваемым в момент инициирования связи с клиентом, и остается неизменным на протяжении всего жизненного цикла сообщения.

Шаблон «запрос-ответ» (Request-Reply Pattern)
Шаблон обмена сообщениями и протокол, в котором узел посылает сообщение другому узлу, потребляющему это сообщение, и ожидает ответа от этого узла асинхронным образом. Он либо ожидает неопределенное время, пока узел не ответит, либо пока не будет достигнут тайм-аут (при условии определения такового).

RPC (удаленный вызов процедур)
Модель, используемая в шаблоне «запрос-ответ», когда отправитель, или запросчик, посылает сообщение с намерением выполнить процедуру удаленно на ответчике, или получателе сообщения, и ожидает получения результата этой процедуры в качестве ответа.

functionName
Поле, специфичное для нашей реализации, обозначающее имя процедуры, которую производитель хочет выполнить удаленно на потребителе и получить результат в качестве ответа. Мы всегда отправляем его в параметре headers в методе produce().

Читайте также:

Читайте нас в Telegram, VK и Дзен


Перевод статьи Basem Mostafa: The Request-reply Pattern in RabbitMQ: An EventListener-based approach in Node.js

Предыдущая статьяОптимизация начальной загрузки сервера с RocksDB