Go & RabbitMQ

Мы создадим систему рабочих очередей, в которой будут появляться новые задания. Обработчики (workers) будут следить за очередью и выполнять работу по мере поступления. Система рабочих очередей идеально подходит для фоновых заданий, которые длятся дольше, чем обычный http-запрос. Один из примеров рабочих очередей: приложение обрабатывает загруженные пользователем фото, создает несколько версий и делится ими в различных социальных сетях. Изменение размера фото и загрузка их на другие сайты занимает время. Стоит ли выполнять эту работу внутри обработчика http? Скорее всего, нет. После загрузки фото сохраните его в удобном месте и передайте детали фоновым обработчикам.

Система сообщений

Для выполнения этой задачи мы создадим систему, работающую в качестве очереди сообщений. Для этого нужна распределенная система рабочих очередей, в которой обработчики и поставщики задач находятся на разных серверах. Для создания такой распределенной системы нужна централизованная очередь сообщений/брокер сообщений и система для передачи сообщений, которая будет доставлять эти сообщения к обработчикам.

Существует несколько инструментов для решения этой задачи — Redis, Kafka, RabbitMQ, ZeroMQ, IronMQ, AWS SQS и т.д. В данном примере мы будем использовать RabbitMQ и Go, чтобы создать простую систему рабочих очередей.

Концепции RabbitMQ

Поставщики (Producers) и подписчики (Consumers): поставщик создает новые сообщения/задачи, а подписчики их принимают. В данном примере после загрузки файла http-обработчик создает сообщение для обработчиков. http-обработчик/веб-приложение — это поставщик, а фоновые обработчики — подписчики.

Точка обмена (Exchange) и очереди (Queues): Точки обмена получают сообщения от поставщиков и доставляют их в очереди. Подписчики принимают сообщения из очередей. RabbitMQ предоставляет мощные функции маршрутизации сообщений. Доставку сообщений в очереди можно настроить разными способами.

Пример командной строки

В этой статье мы не будем создавать полноценное веб-приложение с загрузкой файлов, поэтому все действия будут выполняться в командной строке. Мы создадим инструмент для публикации командной строки, который будет публиковать/создавать сообщения, а также подписчика, который будет принимать сообщения. Затем мы запустим несколько экземпляров подписчиков параллельно, чтобы продемонстрировать возможности масштабирования этой системы с добавлением большего количества обработчиков.

Мы создадим калькулятор, который может принимать два числа и выводить их сумму, и сделаем его масштабируемым с помощью Go и RabbitMQ.

Подготовка

Прежде чем приступить к созданию калькулятора, нужно установить RabbitMQ. Для нас, разработчиков, идеальным вариантом является localhost. Устанавливаем RabbitMQ на локальный компьютер и запускаем его.

Установка RabbitMQ варьируется в зависимости от платформы. На MacBook его можно установить с помощью Homebrew. В дистрибутиве Linux он, вероятно, доступен из менеджера пакетов. Для Windows должны быть устанавливаемые пакеты.

Теперь нужно установить пакет Go AMQP. Для этого я использую модули Go. Можно воспользоваться go get или системой управления зависимостями.

go get github.com/streadway/amqp

Создание подписчика

Создаем каталог consumer, внутри которого мы разместим приложение consumer. Приложение consumer должно подключиться к RabbitMQ, объявить очередь, которую оно хочет прослушать, а затем начать принимать сообщения.

Для начала создадим функцию обработки ошибок:

Если ошибка не nil, то будет напечатано сообщение и детали ошибки, а затем работа будет прекращена. Так работает вышеуказанная функция.

Теперь переходим к установке соединения с RabbitMQ.

Попробуем подключиться к RabbitMQ и завершить работу в случае сбоя. URL соединения хранится в файле shared.go верхнего уровня. Значение установлено в: amqp://guest:[email protected]:5672/.

При успешном соединении нужно установить канал. Не путайте его с каналом Go. У RabbitMQ есть собственная концепция каналов. Соединение — это соединение TCP от клиента к серверу, создание которого — дорогостоящая операция. Канал служит протоколом связи по соединению и не занимает много ресурсов. Следует стремиться к ограничению количества подключений.

Теперь можно приступить к общению с RabbitMQ. Нужно сообщить серверу об интересующей нас очереди:

Далее создаем очередь с названием add. Документацию с указанием аргументов функции можно найти здесь.

RabbitMQ начинает доставлять сообщения подписчикам в режиме круговой системы. Таким образом, он в равной степени распределяет работу для всех обработчиков. Если некоторые задания занимают больше времени, а другие завершаются в первую очередь, у одного обработчика будет много накопленных заданий, а другой останется без дела. Для устранения подобных сценариев просим RabbitMQ доставлять новые сообщения, только когда работник подтвердит предыдущее. В документации по функции Qos можно найти более подробную информацию.

Переходим к обработке сообщений:

Просмотреть аргументы можно здесь. На этот раз получаем канал Go. Используем функцию range для этого канала, чтобы получить сообщения.

Мы будем отправлять сообщения в формате JSON. Чтобы представить задачу для операции add, нужно снова определить тип в файле shared.go:

Используем функцию range для messageChannel, чтобы декодировать тело сообщения в экземпляры AddTask и затем суммировать Number1 и Number2 для получения результатов.

Запускаем горутину с вызовом go func(). Она работает в фоновом режиме, поэтому нам нужен способ, гарантирующий, что основной интерфейс командной строки (обработчик), работающий на переднем плане, не завершит работу. Можно использовать канал и прослушивать его, чтобы продлить ожидание.

Тем временем мы просматриваем сообщения в горутине, обрабатываем тело сообщения и ошибки, а затем подтверждаем их. Во время вызова метода Consume устанавливаем значение false для autoAck. Таким образом, подтверждение обработанного сообщения выполняется вручную. Если сообщение не подтверждено и обработчик теряет соединение, RabbitMQ передает это сообщение другим обработчикам. Благодаря этому можно повторять сообщения, даже когда обработчик выходит из строя.

Также не стоит забывать о подтверждении сообщений вручную при автоматическом отключении. В противном случае RabbitMQ не удалит сообщения (они не подтверждены — соответственно еще не выполнены), и они будут заполнять память.

Если мы выполним go build внутри директории подписчиков и запустим ./cosumer, то подписчик также должен запуститься (если все сделано правильно).

➜  consumer git:(master) ✗ ./consumer
2019/02/23 20:54:55 Consumer ready, PID: 36361

Подписчик выглядит следующим образом:

package main

import (
	"encoding/json"
	"log"
	"os"

	gopher_and_rabbit "github.com/masnun/gopher-and-rabbit"
	"github.com/streadway/amqp"
)

func handleError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}

}

func main() {
	conn, err := amqp.Dial(gopher_and_rabbit.Config.AMQPConnectionURL)
	handleError(err, "Can't connect to AMQP")
	defer conn.Close()

	amqpChannel, err := conn.Channel()
	handleError(err, "Can't create a amqpChannel")

	defer amqpChannel.Close()

	queue, err := amqpChannel.QueueDeclare("add", true, false, false, false, nil)
	handleError(err, "Could not declare `add` queue")

	err = amqpChannel.Qos(1, 0, false)
	handleError(err, "Could not configure QoS")

	messageChannel, err := amqpChannel.Consume(
		queue.Name,
		"",
		false,
		false,
		false,
		false,
		nil,
	)
	handleError(err, "Could not register consumer")

	stopChan := make(chan bool)

	go func() {
		log.Printf("Consumer ready, PID: %d", os.Getpid())
		for d := range messageChannel {
			log.Printf("Received a message: %s", d.Body)

			addTask := &gopher_and_rabbit.AddTask{}

			err := json.Unmarshal(d.Body, addTask)

			if err != nil {
				log.Printf("Error decoding JSON: %s", err)
			}

			log.Printf("Result of %d + %d is : %d", addTask.Number1, addTask.Number2, addTask.Number1+addTask.Number2)

			if err := d.Ack(false); err != nil {
				log.Printf("Error acknowledging message : %s", err)
			} else {
				log.Printf("Acknowledged message")
			}

		}
	}()

	// Остановка для завершения программы
	<-stopChan

}

Создание поставщика

Переходим к созданию поставщика, который генерирует случайные числа и отправляет их в очередь add. Для него также необходимо повторить объявление соединения, канала и очереди. Пропускаем эти действия и переходим к интересным частям.

Необходимо объявить очередь как для подписчика, так и для поставщика, поскольку неизвестно, кто из них приступит к выполнению первым. Поэтому нужно убедиться в постоянном наличии очереди до начала принятия/публикации.

Ранее мы видели тип AddTask, теперь сгенерируем два случайных числа и создадим экземпляр. Затем перекодируем его в JSON для публикации в точке обмена.

И опубликуем его:

На данный момент код выглядит так:

package main

import (
	"encoding/json"
	"github.com/masnun/gopher-and-rabbit"
	"github.com/streadway/amqp"
	"log"
	"math/rand"
	"time"
)

func handleError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}

}

func main() {
	conn, err := amqp.Dial(gopher_and_rabbit.Config.AMQPConnectionURL)
	handleError(err, "Can't connect to AMQP")
	defer conn.Close()

	amqpChannel, err := conn.Channel()
	handleError(err, "Can't create a amqpChannel")

	defer amqpChannel.Close()

	queue, err := amqpChannel.QueueDeclare("add", true, false, false, false, nil)
	handleError(err, "Could not declare `add` queue")

	rand.Seed(time.Now().UnixNano())

	addTask := gopher_and_rabbit.AddTask{Number1: rand.Intn(999), Number2: rand.Intn(999)}
	body, err := json.Marshal(addTask)
	if err != nil {
		handleError(err, "Error encoding JSON")
	}

	err = amqpChannel.Publish("", queue.Name, false, false, amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		ContentType:  "text/plain",
		Body:         body,
	})

	if err != nil {
		log.Fatalf("Error publishing message: %s", err)
	}

	log.Printf("AddTask: %d+%d", addTask.Number1, addTask.Number2)

}

После сборки этого кода и запуска ./publisher он ставит задачу в очередь. При работе одного или нескольких подписчиков можно увидеть результаты.

➜  publisher git:(master) ✗ ./publisher
2019/02/23 21:09:59 AddTask: 221+345

Окно подписчика:

➜  consumer git:(master) ✗ ./consumer
2019/02/23 20:54:55 Consumer ready, PID: 36361
2019/02/23 21:09:59 Received a message: {"Number1":221,"Number2":345}
2019/02/23 21:09:59 Result of 221 + 345 is : 566
2019/02/23 21:09:59 Acknowledged message

Пустое имя точки обмена и очереди

Сообщения публикуются в точки обмена и принимаются из очередей. В примере публикатора не указано название точки обмена. Если ее название является пустой строкой, RabbitMQ отправляет сообщение прямо в очередь, переданную как название очереди.

Продвинутое использование

В данном случае использовалась простая именованная очередь. Точки обмена RabbitMQ могут выполнять намного больше действий. Существует несколько типов точек обмена сообщениями, с помощью которых можно разветвлять сообщения (доставлять одно и то же сообщение через несколько очередей) или выполнять сопоставление на основе тем при доставке сообщений в очереди. Точки обмена и очереди могут выполнять интеллектуальную настраиваемую маршрутизацию сообщений для обслуживания сложных случаев использования и создания сложных распределенных систем.

Код из этой статьи доступен на Github.

Читайте также:


Перевод статьи Abu Ashraf Masnun: Work queue with Go and RabbitMQ