Как известно, Go — это язык программирования с открытым исходным кодом, позволяющий легко создавать простое, надёжное и эффективное программное обеспечение.
RabbitMQ представляет собой общедоступный программный брокер сообщений, который изначально реализовывал расширенный протокол очереди сообщений (Advanced Message Queuing Protocol). Впоследствии он был дополнен ещё и расширяемой архитектурой для поддержки протокола потоковой передачи текстовых сообщений (Streaming Text Oriented Messaging Protocol), протокола передачи телеметрии очереди сообщений (MQ Telemetry Transport) и других.
В этом руководстве мы создадим минимально необходимый клиент RabbitMQ, позволяющий другим пакетам публиковать сообщения или подписываться на сообщения, отправляемые в RabbitMQ или поступающие из него. Исходный код находится в GitHub.
Необходимые условия
- Предполагается, что
Go
у вас уже установлен. Если нет, загляните сюда. - Необходимо хотя бы немного знать о модулях Go. Они понадобятся в качестве решения для управления зависимостями.
- Нужно иметь общее представление о том, как работает RabbitMQ. Все аспекты и типы приёма и отправки сообщений подробно рассматриваться не будут.
Первые шаги
Сначала нужно создать каталог с именем модуля. Назовём его rmq
. Затем инициализируем модули Go с помощью команды go mod init
. Теперь нужно получить основную зависимость. Для этого задействуем команду go get github.com/streadway/amqp.
.
Затем в корневом каталоге модуля создаём файл main.go
. С этого файла всё начнётся (и, возможно, в нём закончится). Основной файл должен быть таким:
package rmq
Теперь создадим пользовательский тип RabbitClient
и добавим в этом клиенте отдельно каналы подписчика и издателя, а также соединение.
type RabbitClient struct {
sendConn *amqp.Connection
recConn *amqp.Connection
sendChan *amqp.Channel
recChan *amqp.Channel
}
Подключение и создание каналов
В этой части добавим два закрытых метода для пользовательского типа, который пытается подключиться к RabbitMQ, а затем создаёт канал на основе типа соединения (подписчик|издатель), и подключаемся повторно (если он уже существует).
Метод connect
получает два аргумента с булевым значением, позволяя узнать о типе соединения и режиме повторного подключения. Предполагается, что у вас уже есть информация о сервисе RabbitMQ (имя пользователя, пароль, хост и порт) в пользовательском типе config
.
// Создаём подключение к rabbitmq
func (rcl *RabbitClient) connect(isRec, reconnect bool) (*amqp.Connection, error) {
if reconnect {
if isRec {
rcl.recConn = nil
} else {
rcl.sendConn = nil
}
}
if isRec && rcl.recConn != nil {
return rcl.recConn, nil
} else if !isRec && rcl.sendConn != nil {
return rcl.sendConn, nil
}
var c string
if config.Username == "" {
c = fmt.Sprintf("amqp://%s:%s/", config.Host, config.Port)
} else {
c = fmt.Sprintf("amqp://%s:%s@%s:%s/", config.Username, config.Password, config.Host, config.Port)
}
conn, err := amqp.Dial(c)
if err != nil {
log.Printf("\r\n--- could not create a conection ---\r\n")
time.Sleep(1 * time.Second)
return nil, err
}
if isRec {
rcl.recConn = conn
return rcl.recConn, nil
} else {
rcl.sendConn = conn
return rcl.sendConn, nil
}
}
Как и connect
, метод channel
получает два аргумента с булевым значением, позволяя узнать о типе соединения и режиме повторного подключения. Этот метод пытается навсегда подключиться к сервису RabbitMQ, а затем создать канал на основе типа соединения.
func (rcl *RabbitClient) channel(isRec, recreate bool) (*amqp.Channel, error) {
if recreate {
if isRec {
rcl.recChan = nil
} else {
rcl.sendChan = nil
}
}
if isRec && rcl.recConn == nil {
rcl.recChan = nil
}
if !isRec && rcl.sendConn == nil {
rcl.recChan = nil
}
if isRec && rcl.recChan != nil {
return rcl.recChan, nil
} else if !isRec && rcl.sendChan != nil {
return rcl.sendChan, nil
}
for {
_, err := rcl.connect(isRec, recreate)
if err == nil {
break
}
}
var err error
if isRec {
rcl.recChan, err = rcl.recConn.Channel()
} else {
rcl.sendChan, err = rcl.sendConn.Channel()
}
if err != nil {
log.Println("--- could not create channel ---")
time.Sleep(1 * time.Second)
return nil, err
}
if isRec {
return rcl.recChan, err
} else {
return rcl.sendChan, err
}
}
Теперь, когда есть возможность подключаться и создавать каналы, начнём публиковать сообщения и подписываться на них. Объявим ленивые очереди. Это постоянные очереди, которые сохраняют своё состояние и восстанавливаются после перезапуска сервера/брокера как в режиме подписки, так и в режиме публикации. Здесь можно вносить изменения в соответствии с конкретной задачей.
Подпишемся на что-нибудь
Метод Consume
получает два аргумента. Один — это имя очереди, а другой — функция, обрабатывающая тело получаемого сообщения. В зависимости от результата этой функции переходим к ack|nack
(подтверждение|отрицательное подтверждение).
// Подписываемся исходя из имени очереди
func (rcl *RabbitClient) Consume(n string, f func(interface{}) error) {
for {
for {
_, err := rcl.channel(true, true)
if err == nil {
break
}
}
log.Printf("--- connected to consume '%s' ---\r\n", n)
q, err := rcl.recChan.QueueDeclare(
n,
true,
false,
false,
false,
amqp.Table{"x-queue-mode": "lazy"},
)
if err != nil {
log.Println("--- failed to declare a queue, trying to reconnect ---")
continue
}
connClose := rcl.recConn.NotifyClose(make(chan *amqp.Error))
connBlocked := rcl.recConn.NotifyBlocked(make(chan amqp.Blocking))
chClose := rcl.recChan.NotifyClose(make(chan *amqp.Error))
m, err := rcl.recChan.Consume(
q.Name,
uuid.NewV4().String(),
false,
false,
false,
false,
nil,
)
if err != nil {
log.Println("--- failed to consume from queue, trying again ---")
continue
}
shouldBreak := false
for {
if shouldBreak {
break
}
select {
case _ = <-connBlocked:
log.Println("--- connection blocked ---")
shouldBreak = true
break
case err = <-connClose:
log.Println("--- connection closed ---")
shouldBreak = true
break
case err = <-chClose:
log.Println("--- channel closed ---")
shouldBreak = true
break
case d := <-m:
err := f(d.Body)
if err != nil {
_ = d.Ack(false)
break
}
_ = d.Ack(true)
}
}
}
}
Метод Consume
обрабатывает NotifyClose
, NotifyBlocked
и NotifyClose
из соединения и канала и пытается повторно подключиться или воссоздать их при необходимости.
Опубликуем что-нибудь
Метод Publish
получает три аргумента, один из которых — имя очереди, а другой — байтовый массив, и содержит тело сообщения.
// Публикуем байтовый массив в очередь
func (rcl *RabbitClient) Publish(n string, b []byte) { r := false
for {
for {
_, err := rcl.channel(false, r)
if err == nil {
break
}
}
q, err := rcl.sendChan.QueueDeclare(
n,
true,
false,
false,
false,
amqp.Table{"x-queue-mode": "lazy"},
)
if err != nil {
log.Println("--- failed to declare a queue, trying to resend ---")
r = true
continue
}
err = rcl.sendChan.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
MessageId: uuid.NewV4().String(),
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: b,
})
if err != nil {
log.Println("--- failed to publish to queue, trying to resend ---")
r = true
continue
}
break
}
}
Этот метод выполняет повторное подключение или при необходимости воссоздание канала.
Использование
Создаём экземпляр типа RabbitClient
и используем метод Consume
или метод Publish
.
var rc rmq.RabbitClient
rc.Consume("test-queue", funcName)
rc.Publish("test-queue", mBody)
Что не было рассмотрено в статье:
- различные типы объявления очередей;
- использование точек обмена;
- модульные тесты.
Читайте также:
- Дирижируйте горутинами с помощью каналов
- Опыт работы с Golang: путь проб и ошибок
- Go на пороге третьего десятилетия 21 века: язык программирования для искусственного…
Читайте нас в Telegram, VK и Яндекс.Дзен
Перевод статьи Mehrdad Esmaeilpour: Creating a minimal RabbitMQ client using Go