Как известно, Go  —  это язык программирования с открытым исходным кодом, позволяющий легко создавать простое, надёжное и эффективное программное обеспечение.

RabbitMQ представляет собой общедоступный программный брокер сообщений, который изначально реализовывал расширенный протокол очереди сообщений (Advanced Message Queuing Protocol). Впоследствии он был дополнен ещё и расширяемой архитектурой для поддержки протокола потоковой передачи текстовых сообщений (Streaming Text Oriented Messaging Protocol), протокола передачи телеметрии очереди сообщений (MQ Telemetry Transport) и других.

В этом руководстве мы создадим минимально необходимый клиент RabbitMQ, позволяющий другим пакетам публиковать сообщения или подписываться на сообщения, отправляемые в RabbitMQ или поступающие из него. Исходный код находится в GitHub.

Необходимые условия

  • Предполагается, что Go у вас уже установлен. Если нет, загляните сюда.
  • Необходимо хотя бы немного знать о модулях Go. Они понадобятся в качестве решения для управления зависимостями.
  • Нужно иметь общее представление о том, как работает RabbitMQ. Все аспекты и типы приёма и отправки сообщений подробно рассматриваться не будут.

Первые шаги

Сначала нужно создать каталог с именем модуля. Назовём его rmq. Затем инициализируем модули Go с помощью команды go mod init. Теперь нужно получить основную зависимость. Для этого задействуем команду go get github.com/streadway/amqp..

Затем в корневом каталоге модуля создаём файл main.go. С этого файла всё начнётся (и, возможно, в нём закончится). Основной файл должен быть таким:

package rmq

Теперь создадим пользовательский тип RabbitClient и добавим в этом клиенте отдельно каналы подписчика и издателя, а также соединение.

type RabbitClient struct {
   sendConn *amqp.Connection
   recConn  *amqp.Connection
   sendChan *amqp.Channel
   recChan  *amqp.Channel
}

Подключение и создание каналов

В этой части добавим два закрытых метода для пользовательского типа, который пытается подключиться к RabbitMQ, а затем создаёт канал на основе типа соединения (подписчик|издатель), и подключаемся повторно (если он уже существует).

Метод connect получает два аргумента с булевым значением, позволяя узнать о типе соединения и режиме повторного подключения. Предполагается, что у вас уже есть информация о сервисе RabbitMQ (имя пользователя, пароль, хост и порт) в пользовательском типе config.

// Создаём подключение к rabbitmq
func (rcl *RabbitClient) connect(isRec, reconnect bool) (*amqp.Connection, error) {
 if reconnect {
  if isRec {
   rcl.recConn = nil
  } else {
   rcl.sendConn = nil
  }
 }
 if isRec && rcl.recConn != nil {
  return rcl.recConn, nil
 } else if !isRec && rcl.sendConn != nil {
  return rcl.sendConn, nil
 }
 var c string
 if config.Username == "" {
  c = fmt.Sprintf("amqp://%s:%s/", config.Host, config.Port)
 } else {
  c = fmt.Sprintf("amqp://%s:%s@%s:%s/", config.Username, config.Password, config.Host, config.Port)
 }
 conn, err := amqp.Dial(c)
 if err != nil {
  log.Printf("\r\n--- could not create a conection ---\r\n")
  time.Sleep(1 * time.Second)
  return nil, err
 }
 if isRec {
  rcl.recConn = conn
  return rcl.recConn, nil
 } else {
  rcl.sendConn = conn
  return rcl.sendConn, nil
 }
}

Как и connect, метод channel получает два аргумента с булевым значением, позволяя узнать о типе соединения и режиме повторного подключения. Этот метод пытается навсегда подключиться к сервису RabbitMQ, а затем создать канал на основе типа соединения.

func (rcl *RabbitClient) channel(isRec, recreate bool) (*amqp.Channel, error) {
   if recreate {
      if isRec {
         rcl.recChan = nil
      } else {
         rcl.sendChan = nil
      }
   }
   if isRec && rcl.recConn == nil {
      rcl.recChan = nil
   }
   if !isRec && rcl.sendConn == nil {
      rcl.recChan = nil
   }
   if isRec && rcl.recChan != nil {
      return rcl.recChan, nil
   } else if !isRec && rcl.sendChan != nil {
      return rcl.sendChan, nil
   }
   for {
      _, err := rcl.connect(isRec, recreate)
      if err == nil {
         break
      }
   }
   var err error
   if isRec {
      rcl.recChan, err = rcl.recConn.Channel()
   } else {
      rcl.sendChan, err = rcl.sendConn.Channel()
   }
   if err != nil {
      log.Println("--- could not create channel ---")
      time.Sleep(1 * time.Second)
      return nil, err
   }
   if isRec {
      return rcl.recChan, err
   } else {
      return rcl.sendChan, err
   }
}

Теперь, когда есть возможность подключаться и создавать каналы, начнём публиковать сообщения и подписываться на них. Объявим ленивые очереди. Это постоянные очереди, которые сохраняют своё состояние и восстанавливаются после перезапуска сервера/брокера как в режиме подписки, так и в режиме публикации. Здесь можно вносить изменения в соответствии с конкретной задачей.

Подпишемся на что-нибудь

Метод Consume получает два аргумента. Один  —  это имя очереди, а другой  —  функция, обрабатывающая тело получаемого сообщения. В зависимости от результата этой функции переходим к ack|nack (подтверждение|отрицательное подтверждение).

// Подписываемся исходя из имени очереди
func (rcl *RabbitClient) Consume(n string, f func(interface{}) error) {
 for {
  for {
   _, err := rcl.channel(true, true)
   if err == nil {
    break
   }
  }
  log.Printf("--- connected to consume '%s' ---\r\n", n)
  q, err := rcl.recChan.QueueDeclare(
   n,
   true,
   false,
   false,
   false,
   amqp.Table{"x-queue-mode": "lazy"},
  )
  if err != nil {
   log.Println("--- failed to declare a queue, trying to reconnect ---")
   continue
  }
  connClose := rcl.recConn.NotifyClose(make(chan *amqp.Error))
  connBlocked := rcl.recConn.NotifyBlocked(make(chan amqp.Blocking))
  chClose := rcl.recChan.NotifyClose(make(chan *amqp.Error))
  m, err := rcl.recChan.Consume(
   q.Name,
   uuid.NewV4().String(),
   false,
   false,
   false,
   false,
   nil,
  )
  if err != nil {
   log.Println("--- failed to consume from queue, trying again ---")
   continue
  }
  shouldBreak := false
  for {
   if shouldBreak {
    break
   }
   select {
   case _ = <-connBlocked:
    log.Println("--- connection blocked ---")
    shouldBreak = true
    break
   case err = <-connClose:
    log.Println("--- connection closed ---")
    shouldBreak = true
    break
   case err = <-chClose:
    log.Println("--- channel closed ---")
    shouldBreak = true
    break
   case d := <-m:
    err := f(d.Body)
    if err != nil {
     _ = d.Ack(false)
     break
    }
    _ = d.Ack(true)
   }
  }
 }
}

Метод Consume обрабатывает NotifyClose, NotifyBlocked и NotifyClose из соединения и канала и пытается повторно подключиться или воссоздать их при необходимости.

Опубликуем что-нибудь

Метод Publish получает три аргумента, один из которых  —  имя очереди, а другой  —  байтовый массив, и содержит тело сообщения.

// Публикуем байтовый массив в очередь
func (rcl *RabbitClient) Publish(n string, b []byte) { r := false
 for {
  for {
   _, err := rcl.channel(false, r)
   if err == nil {
    break
   }
  }
  q, err := rcl.sendChan.QueueDeclare(
   n,
   true,
   false,
   false,
   false,
   amqp.Table{"x-queue-mode": "lazy"},
  )
  if err != nil {
   log.Println("--- failed to declare a queue, trying to resend ---")
   r = true
   continue
  }
  err = rcl.sendChan.Publish(
   "",
   q.Name,
   false,
   false,
   amqp.Publishing{
    MessageId:    uuid.NewV4().String(),
    DeliveryMode: amqp.Persistent,
    ContentType:  "text/plain",
    Body:         b,
   })
  if err != nil {
   log.Println("--- failed to publish to queue, trying to resend ---")
   r = true
   continue
  }
  break
 }
}

Этот метод выполняет повторное подключение или при необходимости воссоздание канала.

Использование

Создаём экземпляр типа RabbitClient и используем метод Consume или метод Publish.

var rc rmq.RabbitClient
rc.Consume("test-queue", funcName)
rc.Publish("test-queue", mBody)

Что не было рассмотрено в статье:

  • различные типы объявления очередей;
  • использование точек обмена;
  • модульные тесты.

Читайте также:

Читайте нас в Telegram, VK и Яндекс.Дзен


Перевод статьи Mehrdad Esmaeilpour: Creating a minimal RabbitMQ client using Go

Предыдущая статья10 источников вдохновения для дизайнера
Следующая статьяСоздание динамического кластера ECS с помощью Terraform