Создание оркестратора для событийно-ориентированного приложения с Golang и RabbitMQ

Чтобы сделать асинхронное приложение, брокера сообщений не достаточно. В правильной ситуации асинхронными сообщениями улучшаются общая пропускная способность, временна́я задержка и взаимодействие с пользователем. Но без полноценной документации и следования шаблону сложность значительно увеличивается. В этой статье создадим конвейер для бронирования гостиницы с помощью оркеструемых событий на Go и RabbitMQ, она также применима к другим языкам и брокерам сообщений.

Примечание: не рассматриваются архитектура ПО, тестирование, ограничение скорости и другие подобные аспекты. Если бы мы создавали приложение, пригодное для промышленной эксплуатации, на их освящение понадобились бы десятки статей.

Определение требований

Номера в гостинице бронируются по мере доступности. Создадим конвейер передачи запроса на бронирование номера по различным сервисам: резервирования, проверки, зачисления средств, бронирования.

Сервисам не нужно ждать ответа друг от друга  —  они даже не «знают», откуда запрос: у каждого сервиса только одна задача, и он хорошо с ней справляется. Это называется снижением связанности. Не нужно задумываться о причине запроса и ждать ответа других сервисов  —  используем все преимущества архитектуры микросервисов.

Сначала создадим блок-схему:

Как видите, всего четыре этапа:

  1. Проверка: в некоторых сценариях гостиницами обслуживаются не все желающие, например кому-то закрыт доступ или определенный номер резервируется только для конкретной группы. Это сложные правила, отделим их от веб-API.
  2. Резервирование: одновременное бронирование номера несколькими людьми предотвращается глобальной блокировкой, подобной Redis.
  3. Списание: зарезервировав номер, списываем средства.
  4. Бронирование: завершив процесс списания, удаляем резервирование и бронируем номер.

Но в любом сервисе случаются ошибки. Оркестрация  —  отличное подспорье для создания стабильного потока запросов, обработки ошибок и соответственных действий. Действия требуются при очевидных ошибках:

  • Недостаточно средств: удаляем резервирование.
  • Ошибка при бронировании: возвращаем средства и удаляем резервирование.

Настройка RabbitMQ

Не знакомы с RabbitMQ? Посмотрите руководство для начинающих, хотя основы мы разберем. RabbitMQ, как и Apache Kafka,  —  это приложение с отправителями и получателями сообщений. В приложении-чате отправителями сообщение отправляется, получателями  —  получается.

Как сообщению попасть к моему другу, а не случайному человеку в другой группе? Это сложная часть. В Apache Kafka для решения этой проблемы применяется простой, но мощный подход. В каждой теме имеется один или несколько физически обособленных разделов, а в каждой группе получателей каждый раздел считывается максимум одним получателем. RabbitMQ годится для такого рода приложений благодаря мощным типам exchange. Воспользуемся типом topic exchange.

Куда отправить новый запрос на бронирование? В RabbitMQ издателями публикуется новое сообщение с указанием в exchange имени, ключа и значения сообщения, им ничего не «известно» о получателях.

Но откуда типы exchange «знают», куда переадресовать сообщение? Здесь приходятся кстати очереди, считываемые получателями. Создадим очередь и определим, какие типы сообщений ими получаются. Например, с помощью ключа маршрутизации определим очередь, которой принимаются запросы на бронирование только в конкретном городе.

Допустим имеется три чата: A, B и C. Кто-то из чата A отправляет сообщение, ключ маршрутизации такой: room.A. Если всего 10 онлайн-пользователей, должно быть 10 разных очередей с одинаковым ключом room.A для каждого пользователя.

Имеется также панель администратора со всеми чатами. Ее ключ очереди: room.*. Нас интересует topic exchange. В RabbitMQ имеется четыре типа exchange.

Мы определили две очереди. Но каким exchange заполнить эти очереди? Узнаем с помощью привязок. Каждая очередь заполняется одним или несколькими exchange.

Покончив с основами маршрутизации RabbitMQ, разберем отличия двух систем: с хореографией и оркестрацией.

Система с хореографией

Система с хореографией похожа на дерево. Здесь нет супервизора для обработки аномалий, работа системы обеспечивается созданием различных очередей и ключей маршрутизации.

Система с оркестрацией полностью управляется оркестратором и остается асинхронной. Система A подключается к нескольким exchange и не «знает», кто отправил сообщение, ею лишь выполняется ее часть процесса. Например, служба обработки изображений, будучи частью нескольких оркестраций, не «знает», кто отправил сообщение, ею просто выполняется ее часть.

Вернемся к системе бронирования:

Спроектируем систему, где любой микросервис может использоваться другими оркестраторами в иных целях.

Создание приложения

Сначала создадим exchange и очереди, свяжем их вместе.

Объявление «exchange» и очередей

Любое получаемое в exchange сообщение удаляется. Поэтому, чтобы все настроить, создадим предварительно выполняемую задачу.

Сначала определим имена очередей и exchange:

var Exchange = "webapp.reservation"
var QueueOrchestrator = "orchestrator.reservation"
var QueueValidation = "validation"
var QueueReservation = "reservation"
var QueueCredit = "credit"
var QueueBooking = "booking"

Эти имена будут во всех пакетах, лучше определить их в конфигурационном файле:

var exchange = config.Exchange
var queueNames = []string{
config.QueueOrchestrator, config.QueueValidation, config.QueueReservation, config.QueueCredit, config.QueueValidation,
}

func main() {

conn, closeConnection := mq.NewRabbitMQ()
defer closeConnection()

channel, err := conn.Channel()
panicWithMessage(err, "couldn't create a channel")

// Объявляем основной «exchange», привязываемый к очередям
err = channel.ExchangeDeclare(exchange, "topic", true,
false, false, false, nil)
panicWithMessage(err, "couldn't declare validation exchange")

for _, name := range queueNames {
_, err = channel.QueueDeclare(name, true,
false, false, false, nil)
panicWithMessage(err, fmt.Sprintf("couldn't declare the %s queue", name))

err = channel.QueueBind(name, name+".*", exchange, false, nil)
panicWithMessage(err, fmt.Sprintf("couldn't bind the %s queue ", name))
}

log.Println("> declaration completed")

}

NewRabbitMQ  —  это функция для создания нового AMQP-подключения. Сначала мы объявили основной exchange темы. Из-за использованной библиотеки github.com/rabbitmq/amqp091-go в коде много true и false, но в целом для durability задается только true. От этого RabbitMQ не делается брокером длительно сохраняемых сообщений, как Apache Kafka, а только обеспечивается сохранение очередей и определения exchange.

Каждой очередью прослушиваются ключи, например очередью validation  —  ключи validation.*. Запомним это, так будет во всем проекте.

Запуск HTTP-сервера

Создадим HTTP-сервер, подключаемый к Redis и экземпляру RabbitMQ, с двумя маршрутами: одним для публикации нового запроса на бронирование, другим для отображения статуса запроса:

func main() {

rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
bookingRequestRepository := repository.NewRedis[types.BookingRequest](rdb)

connection, closeConnection := mq.NewRabbitMQ()
defer closeConnection()
channel, err := connection.Channel()
panicWithMessage(err, "error while establishing a channel")

e := echo.New()
e.GET("/book", func(c echo.Context) error {
request := types.BookingRequest{
UID: shortid.MustGenerate(),
UserID: c.QueryParam("user_id"),
RoomID: c.QueryParam("room_id"),
}
err := channel.PublishWithContext(context.TODO(), config.Exchange, config.QueueOrchestrator+".new",
false, false, amqp091.Publishing{
Body: types.Encode(request),
})
if err != nil {
return c.String(500, "something went wrong")
}
return c.String(201, fmt.Sprintf("request %s has been created!", request.UID))
})

e.GET("/book/:uid", func(c echo.Context) error {
key := "bookingRequest:" + c.Param("uid")
item, _ := bookingRequestRepository.Get(c.Request().Context(), key)
return c.String(201, fmt.Sprintf("status=%s\ninfo=%s", item.State, item.Additional))
})
go func() {
e.Logger.Fatal(e.Start(":1323"))
}()

mq.GracefullyExit()

}

Тип запроса на бронирование:

type BookingRequest struct {
UID string `json:"UID"`
UserID string `json:"user_id"`
RoomID string `json:"room_id"`

State string `json:"state"`
Additional string `json:"additional"`
}

Для каждого запроса создается новый BookingRequest с уникальным идентификатором, затем отправляется в exchange с ключом маршрутизации orchestrator.reservation.new. Очередью оркестратора будет получена копия сообщения, потому что она привязана к exchange для любого ключа, соответствующего регулярному выражению orchestrator.reservation.*.

Создание оркестратора

Сделаем простое решение маршрутизации для отправки в каждую задачу сообщений с разными целями, не забывая о предупреждении в начале статьи:

func main() {

rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
bookingRequestRepository := repository.NewRedis[types.BookingRequest](rdb)

connection, closeConnection := mq.NewRabbitMQ()
defer closeConnection()

consumeChannel, err := connection.Channel()
panicWithMessage(err, "couldn't establish a consume Channel")

produceChannel, err := connection.Channel()
panicWithMessage(err, "couldn't establish a produce Channel")

queue, err := consumeChannel.QueueDeclare(config.QueueOrchestrator, true,
false, false, false, nil)
panicWithMessage(err, "couldn't declare queue")

msgs, err := consumeChannel.Consume(queue.Name, "", true, false, false, false, nil)
panicWithMessage(err, "error while creating the consumer")

// обновление состояния запроса и публикация в очереди резервирования
handleNew := func(msg amqp091.Delivery) error {}

handleValidated := func(msg amqp091.Delivery) error {}

handleReserved := func(msg amqp091.Delivery) error {}

handleDeposit := func(msg amqp091.Delivery) error {}

handleBooked := func(msg amqp091.Delivery) error {}

handleRefund := func(msg amqp091.Delivery) error {}

actions := map[string]func(msg amqp091.Delivery) error{
"new": handleNew,
"validated": handleValidated,
"reserved": handleReserved,
"deposit": handleDeposit,
"booked": handleBooked,
"refund": handleRefund,
}

go func() {
// получение сообщений
for msg := range msgs {
if err := mq.Handler(msg, config.QueueOrchestrator, actions); err != nil {
// здесь следует использовать очереди недоставленных сообщений
log.Printf("key:%s\tunhandled error: %s\n", msg.RoutingKey, err.Error())
}
}
}()
mq.GracefullyExit()
  • Для отправки и получения нужно два разных виртуальных канала по TCP-подключению к RabbitMQ.
  • Если сущности уже имеются, функцией declare сущность не воссоздается, а используется имеющаяся.
  • Автоподтверждению задано значение true.

Что такое «очереди недоставленных сообщений»? Когда у exchange нет очереди для сообщения или сообщением вызывается неожиданная ошибка, нужно отправить их в очередь, обрабатываемую вручную, и использовать эти данные для повышения отказоустойчивости приложения.

Когда новый запрос отправляется в HTTP API, там публикуется новое сообщение с ключом orchestrator.reservation.new. Вот функция, которой из названия темы извлекается экшен:

handleNew := func(msg amqp091.Delivery) error {
request := types.Decode[types.BookingRequest](msg.Body)
log.Printf("recieved a new booking request #%s\n", request.UID)
request.State = "proceeding"
bookingRequestRepository.Save(context.TODO(), request)

return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueValidation+".validate", false,
false, amqp091.Publishing{
Body: types.Encode(types.ValidationRequest{
UID: request.UID,
UserID: request.UserID,
RoomID: request.RoomID,
}),
ReplyTo: config.QueueOrchestrator + ".validated",
})
}

Когда оркестратором получается новый запрос на бронирование, публикуется сообщение о проверке. Самая важная часть  —  ReplyTo, которым определяем, в какой очереди публиковать ответ. Это нужно для независимой работы средства проверки и с другими оркестраторами.

Создание задачи для средства проверки

handleValidation := func(msg amqp091.Delivery) error {
validationRequest := types.Decode[types.ValidationRequest](msg.Body)
validationRequest.Validated = true
if validationRequest.UserID == "some_guy" && validationRequest.RoomID == "some_hotel" {
validationRequest.Validated = false
validationRequest.Errors = append(validationRequest.Errors, "you have been blocked by this hotel")
}
return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo, false, false,
amqp091.Publishing{
Body: types.Encode(validationRequest),
})
}

actions := map[string]func(msg amqp091.Delivery) error{
"validate": handleValidation,
}

Создан другой файл  —  точно такой же, как оркестратор,  —  всего с одним экшеном. Обычно, чтобы проверить валидность запроса, в файле заранее собираются правила и используются базы данных или кеши. Но ради упрощения захардкодим правило: проверим в будущем. Кроме того, результат опубликован в данной очереди с помощью msg.ReplyTo, для указания exchange используется msg.Exchange.

Создание других частей приложения

Резервирование и отмена брони:

handleReserve := func(msg amqp091.Delivery) error {
request := types.Decode[types.ReservationRequest](msg.Body)
// использование глобальной блокировки
request.Reserved = true
if request.RoomID == "reserved" {
request.Reserved = false
request.Errors = append(request.Errors, "the room is already reserved by someone else")
}
return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo,
false, false, amqp091.Publishing{
Body: types.Encode(request),
})
}

handleCancel := func(msg amqp091.Delivery) error {
request := types.Decode[types.ReservationRequest](msg.Body)
return rdb.Del(context.TODO(), "room:"+request.RoomID).Err()
}

Внесение и возврат средств:

handleDeposit := func(msg amqp091.Delivery) error {
deposit := types.Decode[types.DepositRequest](msg.Body)
deposit.Done = true
if deposit.UserID == "poor_guy" {
deposit.Done = false
deposit.Errors = append(deposit.Errors, "insufficient credit")
}
return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo, false, false,
amqp091.Publishing{
Body: types.Encode(deposit),
})
}

handleRefund := func(msg amqp091.Delivery) error {
deposit := types.Decode[types.DepositRequest](msg.Body)
// бизнес-логика...
deposit.Done = true
if deposit.UserID == "very_unlucky_guy" {
deposit.Done = false
deposit.Errors = append(deposit.Errors, "unexpected error")
}
return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo, false, false,
amqp091.Publishing{
Body: types.Encode(deposit),
})
}

И, наконец, система бронирования:

handleBooking := func(msg amqp091.Delivery) error {
book := types.Decode[types.BookRequest](msg.Body)
book.Done = true
if book.UserID == "unlucky_guy" || book.UserID == "very_unlucky_guy" {
book.Done = false
book.Errors = append(book.Errors, "something unexpected happened")
}
return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo, false, false,
amqp091.Publishing{
Body: types.Encode(book),
})
}

Чреватая ошибками логика добавлена для упрощения тестирования. Теперь завершаем логику в оркестраторе:

handleValidated := func(msg amqp091.Delivery) error {
validation := types.Decode[types.ValidationRequest](msg.Body)
request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+validation.UID)
log.Printf("validation #%s completed: %t", validation.UID, validation.Validated)
if validation.Validated {
return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueReservation+".reserve",
false, false, amqp091.Publishing{
ReplyTo: config.QueueOrchestrator + ".reserved",
Body: types.Encode(types.ReservationRequest{
UID: request.UID,
RoomID: request.RoomID,
UserID: request.UserID,
}),
})
} else {
request.Additional = strings.Join(validation.Errors, ", ")
request.State = "finished"
return bookingRequestRepository.Save(context.TODO(), request)
}
}

handleReserved := func(msg amqp091.Delivery) error {
reserved := types.Decode[types.ReservationRequest](msg.Body)
request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+reserved.UID)
log.Printf("reservation #%s completed: %t", reserved.UID, reserved.Reserved)
if reserved.Reserved {
return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueCredit+".deposit",
false, false, amqp091.Publishing{
ReplyTo: config.QueueOrchestrator + ".deposit",
Body: types.Encode(types.DepositRequest{
UID: request.UID,
UserID: request.UserID,
Delta: -2500, // в продакшене для этой цели нужна другая служба
}),
})
} else {
request.Additional = strings.Join(reserved.Errors, ", ")
request.State = "finished"
return bookingRequestRepository.Save(context.TODO(), request)
}
}

handleDeposit := func(msg amqp091.Delivery) error {
deposit := types.Decode[types.DepositRequest](msg.Body)
request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+deposit.UID)
log.Printf("deposit #%s completed: %t", deposit.UID, deposit.Done)
if deposit.Done {
return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueBooking+".book",
false, false, amqp091.Publishing{
ReplyTo: config.QueueOrchestrator + ".booked",
Body: types.Encode(types.BookRequest{
UID: request.UID,
UserID: request.UserID,
RoomID: request.RoomID,
}),
})
} else {
request.Additional = strings.Join(deposit.Errors, ", ")
request.State = "finished"
return bookingRequestRepository.Save(context.TODO(), request)
}
}

handleBooked := func(msg amqp091.Delivery) error {
book := types.Decode[types.BookRequest](msg.Body)
request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+book.UID)
log.Printf("book #%s completed: %t", book.UID, book.Done)
if book.Done {
request.Additional = "booked successfully"
request.State = "booked"
} else {
request.Additional = strings.Join(book.Errors, ", ")
request.State = "finished"
if err := produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueCredit+".refund",
false, false, amqp091.Publishing{
ReplyTo: config.QueueOrchestrator + ".refund",
Body: types.Encode(types.DepositRequest{UID: request.UID, Delta: 2500, UserID: request.UserID}),
}); err != nil {
return err
}
}
if err := bookingRequestRepository.Save(context.TODO(), request); err != nil {
return err
}
return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueReservation+".cancel",
false, false, amqp091.Publishing{
Body: types.Encode(types.ReservationRequest{UID: request.UID}),
})

}

handleRefund := func(msg amqp091.Delivery) error {
deposit := types.Decode[types.DepositRequest](msg.Body)
request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+deposit.UID)
log.Printf("refund #%s completed: %t", deposit.UID, deposit.Done)
if deposit.Done {
request.Additional += ", refund completed."
request.State = "refunded"
} else {
request.Additional += ", problem while refunding."
request.State = "finished"
}
return bookingRequestRepository.Save(context.TODO(), request)
}

Обрабатываем ошибки бронирования, возвращая средства и отменяя бронь. Опробуем систему:

Номер кем-то зарезервирован.

Вот запрос на бронирование с ошибкой и возвратом средств:

А это запрос на бронирование с ошибкой и без возврата:

Вот почему важна очередь недоставленных сообщений. Нет другого способа обработать такие ошибки, систему нужно исследовать вручную.

Заключение

Событийно-ориентированная архитектура  —  это интересно, с ней приложения создаются в масштабе, котором могли создаваться раньше.

Читайте также:

Читайте нас в TelegramVK и Дзен


Перевод статьи Mohammad Hoseini Rad: Building an Orchestrator for an Event-Driven Application using Golang and RabbitMQ

Предыдущая статьяКак объединить мобильные сервисы Google и Huawei в одной кодовой базе
Следующая статьяРост производительности машинного обучения с Rust