Чтобы сделать асинхронное приложение, брокера сообщений не достаточно. В правильной ситуации асинхронными сообщениями улучшаются общая пропускная способность, временна́я задержка и взаимодействие с пользователем. Но без полноценной документации и следования шаблону сложность значительно увеличивается. В этой статье создадим конвейер для бронирования гостиницы с помощью оркеструемых событий на Go и RabbitMQ, она также применима к другим языкам и брокерам сообщений.
Примечание: не рассматриваются архитектура ПО, тестирование, ограничение скорости и другие подобные аспекты. Если бы мы создавали приложение, пригодное для промышленной эксплуатации, на их освящение понадобились бы десятки статей.
Определение требований
Номера в гостинице бронируются по мере доступности. Создадим конвейер передачи запроса на бронирование номера по различным сервисам: резервирования, проверки, зачисления средств, бронирования.
Сервисам не нужно ждать ответа друг от друга — они даже не «знают», откуда запрос: у каждого сервиса только одна задача, и он хорошо с ней справляется. Это называется снижением связанности. Не нужно задумываться о причине запроса и ждать ответа других сервисов — используем все преимущества архитектуры микросервисов.
Сначала создадим блок-схему:
Как видите, всего четыре этапа:
- Проверка: в некоторых сценариях гостиницами обслуживаются не все желающие, например кому-то закрыт доступ или определенный номер резервируется только для конкретной группы. Это сложные правила, отделим их от веб-API.
- Резервирование: одновременное бронирование номера несколькими людьми предотвращается глобальной блокировкой, подобной Redis.
- Списание: зарезервировав номер, списываем средства.
- Бронирование: завершив процесс списания, удаляем резервирование и бронируем номер.
Но в любом сервисе случаются ошибки. Оркестрация — отличное подспорье для создания стабильного потока запросов, обработки ошибок и соответственных действий. Действия требуются при очевидных ошибках:
- Недостаточно средств: удаляем резервирование.
- Ошибка при бронировании: возвращаем средства и удаляем резервирование.
Настройка RabbitMQ
Не знакомы с RabbitMQ? Посмотрите руководство для начинающих, хотя основы мы разберем. RabbitMQ, как и Apache Kafka, — это приложение с отправителями и получателями сообщений. В приложении-чате отправителями сообщение отправляется, получателями — получается.
Как сообщению попасть к моему другу, а не случайному человеку в другой группе? Это сложная часть. В Apache Kafka для решения этой проблемы применяется простой, но мощный подход. В каждой теме имеется один или несколько физически обособленных разделов, а в каждой группе получателей каждый раздел считывается максимум одним получателем. RabbitMQ годится для такого рода приложений благодаря мощным типам exchange. Воспользуемся типом topic exchange.
Куда отправить новый запрос на бронирование? В RabbitMQ издателями публикуется новое сообщение с указанием в exchange имени, ключа и значения сообщения, им ничего не «известно» о получателях.
Но откуда типы exchange «знают», куда переадресовать сообщение? Здесь приходятся кстати очереди, считываемые получателями. Создадим очередь и определим, какие типы сообщений ими получаются. Например, с помощью ключа маршрутизации определим очередь, которой принимаются запросы на бронирование только в конкретном городе.
Допустим имеется три чата: A, B и C. Кто-то из чата A отправляет сообщение, ключ маршрутизации такой: room.A. Если всего 10 онлайн-пользователей, должно быть 10 разных очередей с одинаковым ключом room.A для каждого пользователя.
Имеется также панель администратора со всеми чатами. Ее ключ очереди: room.*. Нас интересует topic exchange. В RabbitMQ имеется четыре типа exchange.
Мы определили две очереди. Но каким exchange заполнить эти очереди? Узнаем с помощью привязок. Каждая очередь заполняется одним или несколькими exchange.
Покончив с основами маршрутизации RabbitMQ, разберем отличия двух систем: с хореографией и оркестрацией.
Система с хореографией похожа на дерево. Здесь нет супервизора для обработки аномалий, работа системы обеспечивается созданием различных очередей и ключей маршрутизации.
Система с оркестрацией полностью управляется оркестратором и остается асинхронной. Система A подключается к нескольким exchange и не «знает», кто отправил сообщение, ею лишь выполняется ее часть процесса. Например, служба обработки изображений, будучи частью нескольких оркестраций, не «знает», кто отправил сообщение, ею просто выполняется ее часть.
Вернемся к системе бронирования:
Спроектируем систему, где любой микросервис может использоваться другими оркестраторами в иных целях.
Создание приложения
Сначала создадим exchange и очереди, свяжем их вместе.
Объявление «exchange» и очередей
Любое получаемое в exchange сообщение удаляется. Поэтому, чтобы все настроить, создадим предварительно выполняемую задачу.
Сначала определим имена очередей и exchange:
var Exchange = "webapp.reservation"
var QueueOrchestrator = "orchestrator.reservation"
var QueueValidation = "validation"
var QueueReservation = "reservation"
var QueueCredit = "credit"
var QueueBooking = "booking"
Эти имена будут во всех пакетах, лучше определить их в конфигурационном файле:
var exchange = config.Exchange
var queueNames = []string{
config.QueueOrchestrator, config.QueueValidation, config.QueueReservation, config.QueueCredit, config.QueueValidation,
}
func main() {
conn, closeConnection := mq.NewRabbitMQ()
defer closeConnection()
channel, err := conn.Channel()
panicWithMessage(err, "couldn't create a channel")
// Объявляем основной «exchange», привязываемый к очередям
err = channel.ExchangeDeclare(exchange, "topic", true,
false, false, false, nil)
panicWithMessage(err, "couldn't declare validation exchange")
for _, name := range queueNames {
_, err = channel.QueueDeclare(name, true,
false, false, false, nil)
panicWithMessage(err, fmt.Sprintf("couldn't declare the %s queue", name))
err = channel.QueueBind(name, name+".*", exchange, false, nil)
panicWithMessage(err, fmt.Sprintf("couldn't bind the %s queue ", name))
}
log.Println("> declaration completed")
}
NewRabbitMQ — это функция для создания нового AMQP-подключения. Сначала мы объявили основной exchange темы. Из-за использованной библиотеки github.com/rabbitmq/amqp091-go в коде много true
и false
, но в целом для durability задается только true
. От этого RabbitMQ не делается брокером длительно сохраняемых сообщений, как Apache Kafka, а только обеспечивается сохранение очередей и определения exchange.
Каждой очередью прослушиваются ключи, например очередью validation — ключи validation.*. Запомним это, так будет во всем проекте.
Запуск HTTP-сервера
Создадим HTTP-сервер, подключаемый к Redis и экземпляру RabbitMQ, с двумя маршрутами: одним для публикации нового запроса на бронирование, другим для отображения статуса запроса:
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
bookingRequestRepository := repository.NewRedis[types.BookingRequest](rdb)
connection, closeConnection := mq.NewRabbitMQ()
defer closeConnection()
channel, err := connection.Channel()
panicWithMessage(err, "error while establishing a channel")
e := echo.New()
e.GET("/book", func(c echo.Context) error {
request := types.BookingRequest{
UID: shortid.MustGenerate(),
UserID: c.QueryParam("user_id"),
RoomID: c.QueryParam("room_id"),
}
err := channel.PublishWithContext(context.TODO(), config.Exchange, config.QueueOrchestrator+".new",
false, false, amqp091.Publishing{
Body: types.Encode(request),
})
if err != nil {
return c.String(500, "something went wrong")
}
return c.String(201, fmt.Sprintf("request %s has been created!", request.UID))
})
e.GET("/book/:uid", func(c echo.Context) error {
key := "bookingRequest:" + c.Param("uid")
item, _ := bookingRequestRepository.Get(c.Request().Context(), key)
return c.String(201, fmt.Sprintf("status=%s\ninfo=%s", item.State, item.Additional))
})
go func() {
e.Logger.Fatal(e.Start(":1323"))
}()
mq.GracefullyExit()
}
Тип запроса на бронирование:
type BookingRequest struct {
UID string `json:"UID"`
UserID string `json:"user_id"`
RoomID string `json:"room_id"`
State string `json:"state"`
Additional string `json:"additional"`
}
Для каждого запроса создается новый BookingRequest
с уникальным идентификатором, затем отправляется в exchange с ключом маршрутизации orchestrator.reservation.new. Очередью оркестратора будет получена копия сообщения, потому что она привязана к exchange для любого ключа, соответствующего регулярному выражению orchestrator.reservation.*.
Создание оркестратора
Сделаем простое решение маршрутизации для отправки в каждую задачу сообщений с разными целями, не забывая о предупреждении в начале статьи:
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
bookingRequestRepository := repository.NewRedis[types.BookingRequest](rdb)
connection, closeConnection := mq.NewRabbitMQ()
defer closeConnection()
consumeChannel, err := connection.Channel()
panicWithMessage(err, "couldn't establish a consume Channel")
produceChannel, err := connection.Channel()
panicWithMessage(err, "couldn't establish a produce Channel")
queue, err := consumeChannel.QueueDeclare(config.QueueOrchestrator, true,
false, false, false, nil)
panicWithMessage(err, "couldn't declare queue")
msgs, err := consumeChannel.Consume(queue.Name, "", true, false, false, false, nil)
panicWithMessage(err, "error while creating the consumer")
// обновление состояния запроса и публикация в очереди резервирования
handleNew := func(msg amqp091.Delivery) error {}
handleValidated := func(msg amqp091.Delivery) error {}
handleReserved := func(msg amqp091.Delivery) error {}
handleDeposit := func(msg amqp091.Delivery) error {}
handleBooked := func(msg amqp091.Delivery) error {}
handleRefund := func(msg amqp091.Delivery) error {}
actions := map[string]func(msg amqp091.Delivery) error{
"new": handleNew,
"validated": handleValidated,
"reserved": handleReserved,
"deposit": handleDeposit,
"booked": handleBooked,
"refund": handleRefund,
}
go func() {
// получение сообщений
for msg := range msgs {
if err := mq.Handler(msg, config.QueueOrchestrator, actions); err != nil {
// здесь следует использовать очереди недоставленных сообщений
log.Printf("key:%s\tunhandled error: %s\n", msg.RoutingKey, err.Error())
}
}
}()
mq.GracefullyExit()
- Для отправки и получения нужно два разных виртуальных канала по TCP-подключению к RabbitMQ.
- Если сущности уже имеются, функцией
declare
сущность не воссоздается, а используется имеющаяся. - Автоподтверждению задано значение
true
.
Что такое «очереди недоставленных сообщений»? Когда у exchange нет очереди для сообщения или сообщением вызывается неожиданная ошибка, нужно отправить их в очередь, обрабатываемую вручную, и использовать эти данные для повышения отказоустойчивости приложения.
Когда новый запрос отправляется в HTTP API, там публикуется новое сообщение с ключом orchestrator.reservation.new. Вот функция, которой из названия темы извлекается экшен:
handleNew := func(msg amqp091.Delivery) error {
request := types.Decode[types.BookingRequest](msg.Body)
log.Printf("recieved a new booking request #%s\n", request.UID)
request.State = "proceeding"
bookingRequestRepository.Save(context.TODO(), request)
return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueValidation+".validate", false,
false, amqp091.Publishing{
Body: types.Encode(types.ValidationRequest{
UID: request.UID,
UserID: request.UserID,
RoomID: request.RoomID,
}),
ReplyTo: config.QueueOrchestrator + ".validated",
})
}
Когда оркестратором получается новый запрос на бронирование, публикуется сообщение о проверке. Самая важная часть — ReplyTo
, которым определяем, в какой очереди публиковать ответ. Это нужно для независимой работы средства проверки и с другими оркестраторами.
Создание задачи для средства проверки
handleValidation := func(msg amqp091.Delivery) error {
validationRequest := types.Decode[types.ValidationRequest](msg.Body)
validationRequest.Validated = true
if validationRequest.UserID == "some_guy" && validationRequest.RoomID == "some_hotel" {
validationRequest.Validated = false
validationRequest.Errors = append(validationRequest.Errors, "you have been blocked by this hotel")
}
return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo, false, false,
amqp091.Publishing{
Body: types.Encode(validationRequest),
})
}
actions := map[string]func(msg amqp091.Delivery) error{
"validate": handleValidation,
}
Создан другой файл — точно такой же, как оркестратор, — всего с одним экшеном. Обычно, чтобы проверить валидность запроса, в файле заранее собираются правила и используются базы данных или кеши. Но ради упрощения захардкодим правило: проверим в будущем. Кроме того, результат опубликован в данной очереди с помощью msg.ReplyTo, для указания exchange используется msg.Exchange.
Создание других частей приложения
Резервирование и отмена брони:
handleReserve := func(msg amqp091.Delivery) error {
request := types.Decode[types.ReservationRequest](msg.Body)
// использование глобальной блокировки
request.Reserved = true
if request.RoomID == "reserved" {
request.Reserved = false
request.Errors = append(request.Errors, "the room is already reserved by someone else")
}
return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo,
false, false, amqp091.Publishing{
Body: types.Encode(request),
})
}
handleCancel := func(msg amqp091.Delivery) error {
request := types.Decode[types.ReservationRequest](msg.Body)
return rdb.Del(context.TODO(), "room:"+request.RoomID).Err()
}
Внесение и возврат средств:
handleDeposit := func(msg amqp091.Delivery) error {
deposit := types.Decode[types.DepositRequest](msg.Body)
deposit.Done = true
if deposit.UserID == "poor_guy" {
deposit.Done = false
deposit.Errors = append(deposit.Errors, "insufficient credit")
}
return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo, false, false,
amqp091.Publishing{
Body: types.Encode(deposit),
})
}
handleRefund := func(msg amqp091.Delivery) error {
deposit := types.Decode[types.DepositRequest](msg.Body)
// бизнес-логика...
deposit.Done = true
if deposit.UserID == "very_unlucky_guy" {
deposit.Done = false
deposit.Errors = append(deposit.Errors, "unexpected error")
}
return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo, false, false,
amqp091.Publishing{
Body: types.Encode(deposit),
})
}
И, наконец, система бронирования:
handleBooking := func(msg amqp091.Delivery) error {
book := types.Decode[types.BookRequest](msg.Body)
book.Done = true
if book.UserID == "unlucky_guy" || book.UserID == "very_unlucky_guy" {
book.Done = false
book.Errors = append(book.Errors, "something unexpected happened")
}
return produceChannel.PublishWithContext(context.TODO(), msg.Exchange, msg.ReplyTo, false, false,
amqp091.Publishing{
Body: types.Encode(book),
})
}
Чреватая ошибками логика добавлена для упрощения тестирования. Теперь завершаем логику в оркестраторе:
handleValidated := func(msg amqp091.Delivery) error {
validation := types.Decode[types.ValidationRequest](msg.Body)
request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+validation.UID)
log.Printf("validation #%s completed: %t", validation.UID, validation.Validated)
if validation.Validated {
return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueReservation+".reserve",
false, false, amqp091.Publishing{
ReplyTo: config.QueueOrchestrator + ".reserved",
Body: types.Encode(types.ReservationRequest{
UID: request.UID,
RoomID: request.RoomID,
UserID: request.UserID,
}),
})
} else {
request.Additional = strings.Join(validation.Errors, ", ")
request.State = "finished"
return bookingRequestRepository.Save(context.TODO(), request)
}
}
handleReserved := func(msg amqp091.Delivery) error {
reserved := types.Decode[types.ReservationRequest](msg.Body)
request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+reserved.UID)
log.Printf("reservation #%s completed: %t", reserved.UID, reserved.Reserved)
if reserved.Reserved {
return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueCredit+".deposit",
false, false, amqp091.Publishing{
ReplyTo: config.QueueOrchestrator + ".deposit",
Body: types.Encode(types.DepositRequest{
UID: request.UID,
UserID: request.UserID,
Delta: -2500, // в продакшене для этой цели нужна другая служба
}),
})
} else {
request.Additional = strings.Join(reserved.Errors, ", ")
request.State = "finished"
return bookingRequestRepository.Save(context.TODO(), request)
}
}
handleDeposit := func(msg amqp091.Delivery) error {
deposit := types.Decode[types.DepositRequest](msg.Body)
request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+deposit.UID)
log.Printf("deposit #%s completed: %t", deposit.UID, deposit.Done)
if deposit.Done {
return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueBooking+".book",
false, false, amqp091.Publishing{
ReplyTo: config.QueueOrchestrator + ".booked",
Body: types.Encode(types.BookRequest{
UID: request.UID,
UserID: request.UserID,
RoomID: request.RoomID,
}),
})
} else {
request.Additional = strings.Join(deposit.Errors, ", ")
request.State = "finished"
return bookingRequestRepository.Save(context.TODO(), request)
}
}
handleBooked := func(msg amqp091.Delivery) error {
book := types.Decode[types.BookRequest](msg.Body)
request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+book.UID)
log.Printf("book #%s completed: %t", book.UID, book.Done)
if book.Done {
request.Additional = "booked successfully"
request.State = "booked"
} else {
request.Additional = strings.Join(book.Errors, ", ")
request.State = "finished"
if err := produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueCredit+".refund",
false, false, amqp091.Publishing{
ReplyTo: config.QueueOrchestrator + ".refund",
Body: types.Encode(types.DepositRequest{UID: request.UID, Delta: 2500, UserID: request.UserID}),
}); err != nil {
return err
}
}
if err := bookingRequestRepository.Save(context.TODO(), request); err != nil {
return err
}
return produceChannel.PublishWithContext(context.TODO(), config.Exchange, config.QueueReservation+".cancel",
false, false, amqp091.Publishing{
Body: types.Encode(types.ReservationRequest{UID: request.UID}),
})
}
handleRefund := func(msg amqp091.Delivery) error {
deposit := types.Decode[types.DepositRequest](msg.Body)
request, _ := bookingRequestRepository.Get(context.TODO(), "bookingRequest:"+deposit.UID)
log.Printf("refund #%s completed: %t", deposit.UID, deposit.Done)
if deposit.Done {
request.Additional += ", refund completed."
request.State = "refunded"
} else {
request.Additional += ", problem while refunding."
request.State = "finished"
}
return bookingRequestRepository.Save(context.TODO(), request)
}
Обрабатываем ошибки бронирования, возвращая средства и отменяя бронь. Опробуем систему:
Вот запрос на бронирование с ошибкой и возвратом средств:
А это запрос на бронирование с ошибкой и без возврата:
Вот почему важна очередь недоставленных сообщений. Нет другого способа обработать такие ошибки, систему нужно исследовать вручную.
Заключение
Событийно-ориентированная архитектура — это интересно, с ней приложения создаются в масштабе, котором могли создаваться раньше.
Читайте также:
- 5 причин грядущего господства Go в мире программирования
- Не заблудитесь при работе с кластерами Kafka — возьмите компас
- Рабочая очередь в Go с RabbitMQ
Читайте нас в Telegram, VK и Дзен
Перевод статьи Mohammad Hoseini Rad: Building an Orchestrator for an Event-Driven Application using Golang and RabbitMQ