Redis  —  это хранилище структуры данных в памяти, в основном используемое в качестве базы данных, кэша и брокера сообщений. Система Redis чрезвычайно популярна среди разработчиков благодаря невероятной скорости и поддержке различных типов структур данных, таких как строки, хэши, списки, наборы, растровые изображения, потоки и т. д.

Более того, в Redis есть множество заложенных функций, таких как встроенная репликация, сценарии Lua, алгоритм удаления LRU, транзакции, сохраняемость на диске и т. д., поэтому эта система так востребована. Сегодня мы поговорим о потоках  —  одном из типов структуры данных в Redis. Мы можем использовать потоки для создания службы обмена сообщениями между потребителями или микросервисами.

Проще говоря, мы можем использовать этот тип структуры данных в качестве брокера сообщений, с помощью которого будут взаимодействовать микросервисы. Итак, начнем.

Что представляют собой потоки Redis?

Говоря простым языком, потоки Redis  —  это структура данных Redis, доступная только для добавления. Она имеет некоторое сходство с другими структурами данных, такими как списки, но более полезна и сложна. Когда мы добавляем данные/сообщения в потоковую переменную, они становятся доступными для потребителей. Благодаря БЛОКИРУЮЩЕМУ API, мы можем заставить потребителей ждать прибытия новых сообщений. Такая структура данных отличается высокой скоростью работы и простотой в реализации. С ее помощью можно также использовать группы потребителей, которые позволяют отправлять различное подмножество сообщений разным потребителям.

Пример использования

Предположим, у нас есть два микросервиса  —  MS-1 и MS-2. Они хотят взаимодействовать между собой или обмениваться данными. При таком сценарии на первый план выходят службы обмена сообщениями. Потоки Redis занимают позицию между MS-1 и MS-2 и выступают в качестве брокера (посредника). Они передают сообщения/данные от MS-1 к MS-2 и наоборот через определенные потоковые переменные в Redis.

Для большей ясности предположим, что MS-1 добавляет некоторые данные в поток, а MS-2 непрерывно его прослушивает. Как только MS-1 добавит данные в этот поток, MS-2 получит их.

Основные операции

Запись в потоке  —  это не просто строка, а строка, состоящая из одной или нескольких пар значений полей. Есть два основных типа операций: добавление данных в потоки и получение данных из потоков.

Добавление данных

Основной командой записи является XADD, которая добавляет новую запись в указанный поток.

> XADD profile-information * name alex age 27 hair black

Sample Output
-------------------------------------------
1627885225420-0

Указанная выше команда добавит новую запись в поток по ключу “Информация профиля” (profile information). Подробно разберем эту команду, чтобы лучше ее понимать.

  • XADD  —  это команда, которая указывает Redis на то, что нужно добавить новую запись по указанному ключу.
  • Информация о профиле (profile-information)  —  это ключевое имя потока.
  • Символ * указывает Redis на создание монотонно увеличивающегося ID для каждой записи. Мы также можем явно указать ID, но это довольно редкая практика. Автоматическая генерация ID сервером идеально подходит практически для всех случаев.
  • Остальная часть команды  —  это пары ключ-значение, составляющие потоковую запись.

Есть также полезная команда, которая может сообщить количество элементов внутри потока.

> XLEN profile-information

Sample Output
-------------------------------------------
(integer) 1
  • Информация о профиле (profile-information)  —  это ключевое имя потока.

Получение данных

Итак, мы научились добавлять данные в поток с помощью команды XADD. Теперь пришло время научиться считывать записи из потока. Существует три основных типа операций для получения данных. Обсудим их по порядку.

Запрос по диапазону: XRANGE и XREVRANGE.

Прежде чем узнать, как работает запрос по диапазону, нужно ближе познакомиться с идентификаторами (ID). Идентификаторы, возвращаемые командой XADD, выглядят следующим образом  —  1627796743670–0 и представлены в формате:

<millisecondsTime>-<sequenceNumber>
(<Время в миллисекундах>-<Порядковый номер>)

Время в миллисекундах  —  это местное время в локальном узле Redis, а порядковый номер используется для дифференциации записей в том случае, если они созданы за одни и те же миллисекунды. Redis обеспечивает расположение этих идентификаторов в монотонно возрастающем порядке.

Первая часть идентификаторов, как правило, представляет собой временную отметку, поэтому она также предлагает запрос по диапазону на основе времени. Чтобы сделать запрос по диапазону, нужно указать два идентификатора  —  начало и конец соответственно. Обратите внимание, что диапазон запроса является включающим. Это означает, что возвращаемый диапазон будет включать элемент, имеющий начало или конец в виде идентификатора.

> XRANGE profile-information - +

Sample Output
-------------------------------------------
1) 1) 1627885225420-0
   2) 1) "name"
      2) "alex"
      3) "age"
      4) "27"
      5) "hair"
      6) "black"
  • XRANGE  —  это команда для запроса на основе диапазона.
  • Информация о профиле (profile-information)  —  это ключевое имя потока.
  • Два специальных идентификатора (   и +) означают наименьший и наибольший идентификатор, доступные в указанном потоке.
  • Мы также можем указать любые допустимые идентификаторы вместо    и +, чтобы обозначить начальную и конечную точку диапазона.

Запрос по временному диапазону.

Мы можем опустить часть, в которой указан порядковый номер, для запроса, основанного на времени, и указать только временные метки для обозначения времени начала и конца. Когда мы опускаем часть с номером, начальная последовательность автоматически устанавливается равной 0, а конечная последовательность будет максимальным доступным порядковым номером.

> XRANGE profile-information 1627796743670 1627799297560

Мы можем указать опцию COUNT в конце запроса. Так мы получим первые N объектов из диапазона.

> XRANGE profile-information - + COUNT 2

XREVRANGE работает точно так же, как XRANGE, но в обратном порядке.

> XREVRANGE profile-information + - COUNT 2

Функция вернет N элементов в обратном порядке диапазона.

Получение новых элементов с помощью XREAD.

Мы можем подписаться на новые элементы, поступающие в поток. Эта концепция похожа на принцип работы Pub/Sub, но тут есть существенные различия. Основное сходство заключается в том, что у потока может быть несколько потребителей, и новый элемент по умолчанию будет доставлен каждому из них, ожидающему сообщений. Это в значительной степени похоже на технологию разветвления, какой и является Pub/Sub.

Основное же отличие заключается в том, что Pub/Sub работает по принципу “нажал и забыл” (после разветвления сообщений они удаляются из Redis), в то время как потоки функционируют совсем по-другому. Они сохраняют сообщения со своими идентификаторами. Помимо обеспечения потоков, группы потребителей предоставляют такой уровень контроля, который не доступен Pub/Sub или списку блокировок.

XREAD  —  это команда для прослушивания новых сообщений, поступающих в поток.

> XREAD COUNT 1 STREAMS profile-information 0
  • COUNT  —  это опциональная команда для ограничения выходных записей. Значение после COUNT показывает, сколько записей мы хотим прочитать из потока.
  • STREAMS  —  обязательное указание, после которого должно идти ключевое имя потока.
  • Последнее значение (0) является идентификатором. Он показывает: максимальный идентификатор уже сохранен в потоке profile information, и команда вернет только те записи, идентификатор которых больше этого. В нашем случае 0 означает следующее: мы хотим, чтобы сообщения из начала потока имели идентификатор больше, чем 0–0.

Мы можем легко превратить XREAD в блокирующую команду.

> XREAD BLOCK 0 STREAMS profile-information $
  • Опция BLOCK со значением таймаута 0 означает, что эта команда проверит, сможет ли она немедленно обработать запрос. Если она сможет обработать запрос, то сделает это. В противном случае она заблокирует его до поступления данных. Таймаут в 0 миллисекунд означает, что в отношении этого запроса никогда не наступит таймаут.
  • $  —  это специальный ID, который представляет максимальный идентификатор, уже сохраненный в потоках, так что мы будем получать только новые сообщения.
  • Мы также указываем опцию COUNT, чтобы ограничить выходные записи. На самом деле, у XREAD есть только две опции  —  BLOCK и COUNT.

Группы потребителей

Мы подошли к самой интересной части статьи: XREAD предлагает преимущества технологии разветвления. Теперь рассмотрим другой случай. Допустим, у нас есть несколько потребителей, и мы хотим распределить подмножество сообщений между ними. Мы не будем передавать одно и то же сообщение разным потребителям. Предположим, у нас есть два потребителя  —  C1 и C2. Допустим, у нас имеется 7 сообщений, идентификаторы которых равны 1, 2, 3, 4, 5, 6, 7. Мы хотим распределить эти сообщения между C1 и C2 следующим образом:

1 -> C1
2 -> C2
3 -> C1
4 -> C2
5 -> C1
6 -> C2
7 -> C1

Возможно, вы заметили, что мы не отправляем одни и те же сообщения нескольким клиентам. Мы посылаем разные сообщения разным клиентам. В этом случае нам помогают группы потребителей.

Создание группы потребителей.

Предположим, у нас уже есть ключ profile-information типа stream. В таком случае выполнение следующей команды приведет к созданию группы потребителей.

> XGROUP CREATE profile-information mygroup $
  • Вы уже знаете, что $  —  это специальный идентификатор, представляющий последний максимальный ID, доступный в потоках. Группа потребителей должна знать, откуда ей следует отслеживать сообщения для своего потребителя. При передаче $ будут отслеживаться только новые сообщения после создания этой группы. Если мы передадим 0 вместо $, то эта группа будет использовать все сообщения с начала потока. Мы можем указать любой действительный идентификатор, обозначающий начальную точку, с которой группа будет получать сообщение.
  • mygroup  —  это название группы.
  • profile-information  —  ключ существующего потока.
  • Если потока нет, XGROUP поможет его создать путем передачи подкоманды в последнем MKSTREAM.
> XGROUP CREATE newstream mygroup $ MKSTREAM

Процесс потребления из группы.

Redis предоставляет почти аналогичную команду под названием XREADGROUP и ту же опцию BLOCK. В противном случае она будет синхронной. Существует обязательная опция GROUP с двумя аргументами  —  именем группы потребителей и именем потребителя, пытающегося считывать информацию.

> XREADGROUP BLOCK 0 GROUP mygroup Alice COUNT 1 STREAMS profile-information >
  • Команда в значительной степени носит описательный характер. Она говорит о том, что нужно прочитать данные из потока profile information, используя группу потребителей mygroup, и я являюсь потребителем по имени Алиса. Каждый потребитель должен указать свое имя, которое будет идентифицировать его внутри группы.
  • >  —  еще один специальный идентификатор, действующий только в контексте групп потребителей. Он означает, что сообщения на данный момент еще не доставлялись другим потребителям.

Заключение 

Это почти вся основная информация о потоках Redis. Концепции и информация взяты из этой статьи, опубликованной на официальном сайте. Вы можете узнать из нее больше информации.

Читайте также:

Читайте нас в TelegramVK и Яндекс.Дзен


Перевод статьи Isabela Imrose: An Introduction to Redis Streams

Предыдущая статьяНовый API форматировщика дат в Swift
Следующая статья9 странностей Python для C++ программистов