Разветвление на различные очереди SQS с помощью фильтрации сообщений SNS

SNS и SQS  —  сервисы AWS, которые часто находят применение в событийно-управляемой архитектуре. Сочетание этих сервисов дает возможность отправлять сообщения по разным каналам. Часто возникает необходимость фильтровать сообщения, прежде чем реплицировать их в несколько очередей. Здесь я намерен объяснить, как достичь этой цели, воспользовавшись всеми преимуществами этих сервисов AWS без кастомного кода.

Начнем с того, что освежим свои знания по SNS-SQS. SNS  —  это сервис AWS, который координирует и управляет доставкой или отправкой сообщений конечным точкам подписки. Он позволяет создать “SNS-тему”, которая будет логической точкой доступа, действующей в качестве канала связи. С помощью клиента командной строки AWS, SDK или веб-консоли в этой теме можно начать публикацию сообщений. Затем тема будет пересылать сообщения своим подписчикам. Одним из возможных подписчиков является SQS  —  сервис массового обслуживания AWS. Вы можете подписать несколько очередей на одну и ту же тему, чтобы реплицировать сообщение по этим очередям. Это и называется разветвлением.

В сценарии, описанном выше, сообщение передается во все очереди, что позволяет выполнять параллельную асинхронную обработку. Но что делать, если нужно реплицировать сообщения только в некоторые очереди, в зависимости от атрибутов сообщений? SNS предлагает отличное решение.

В этом примере мы будем фильтровать по двум атрибутам:

  • Цвет (color).
  • Число (number).

ColorQueue будет получать все сообщения независимо от значения атрибута color. BlueYellowQueue будет получать все сообщения, для которых атрибут color установлен на синий или желтый. RedQueue будет получать только сообщения, у которых в атрибуте color будет значение “красный”. Наконец, есть две очереди, которые могут принимать сообщения, содержащие в качестве атрибута зеленый цвет. От второго атрибута (число) зависит, попадет ли сообщение в GreenHighQueue (number > 100) или GreenLowQueue (number ≤100).

Чтобы облегчить развертывание этой установки, я создал шаблон CloudFormation. Шаблон содержит создание SNS-темы, очередей SQS, также соответствующих очередей недоставленных сообщений. Это все довольно просто. Немного сложнее выглядит политика SQS. Она предоставляет нашей теме SNS разрешение SendMessage для пяти очередей.

QueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues:
        - !Ref ColorQueue
        - !Ref BlueYellowQueue
        - !Ref RedQueue
        - !Ref GreenHighQueue
        - !Ref GreenLowQueue
      PolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service: sns.amazonaws.com
            Action:
              - sqs:SendMessage
            Resource: "*"
            Condition:
              ArnEquals: 
                aws:SourceArn:
                  !Ref SNSTopic

Теперь осталось только создать подписку SQS на SNS. На этом месте мы также определяем политику фильтрации.

У ColorQueue нет политики фильтрации, и она просто подписывается на тему.

ColorQueueSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Endpoint: !GetAtt ColorQueue.Arn
      Protocol: sqs
      RawMessageDelivery: True
      TopicArn: !Ref SNSTopic

Другие подписки содержат политики фильтрации. Вот одна из них для BlueYellowQueue, которая может получать сообщение, где в атрибуте color указан синий или желтый.

BlueYellowQueueSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Endpoint: !GetAtt BlueYellowQueue.Arn
      Protocol: sqs
      FilterPolicy:
        color:
          - blue
          - yellow
      RawMessageDelivery: True
      TopicArn: !Ref SNSTopic

GreenHighQueue и GreenLowQueue используют две политики фильтрации. Ниже вы можете увидеть ту, которая применяется для GreenHighQueue.

GreenHighQueueSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Endpoint: !GetAtt GreenHighQueue.Arn
      Protocol: sqs
      FilterPolicy:
        color:
          - green
        number:
        - numeric:
          - ">"
          - 100

Для развертывания полного стека можно воспользоваться командой:

$ aws cloudformation create-stack --template-body file://template.yaml --stack-name sns-sqs-fanout-filter-example

Чтобы проверить, как всё работает, опубликуем сообщение в SNS. Начнем с публикации через веб-консоль AWS.

Сообщение доступно в ColorQueue и RedQueue, как мы и ожидали.

Мы применили RawMessageDelivery, поэтому должны получить точно такое же сообщение в нашей очереди SQS.

Мы также можем проверить атрибут(ы).

Теперь вы можете сделать что-то подобное с помощью AWS CLI:

$ aws sns publish \
--topic-arn "arn:aws:sns:eu-west-1:123456789012:Topic" \
--message "\"body\": \"{this is a test}\"" \
--message-attributes '{ "color":{ "DataType":"String","StringValue":"green" }, "number":{ "DataType":"Number","StringValue":"100"} }'

И снова получаем ожидаемый результат. Сообщение появляется в ColorQueue и GreenLowQueue, поскольку атрибут number меньше или равен 100.

Для последнего примера я воспользовался Python SDK.

import json
import boto3
client = boto3.client('sns')

mesg = json.dumps({
    "default": "default",
    "body": "this is a test"
})

response = client.publish(
    TopicArn='arn:aws:sns:eu-west-1:123456789012:Topic',
    Message=mesg,
    MessageStructure='json',
    MessageAttributes={
        'color': {
            'DataType': 'String',
            'StringValue': 'blue',
        }
    }
)

И снова видим ожидаемый результат:

Этот базовый пример должен был прояснить, как работает фильтрация SNS. Мы видели пример логики “И” (например, color == зеленый и number > 100) и логики “ИЛИ” (например, color == синий или color == желтый). Теперь мы можем перейти к более сложным примерам фильтрации.

Для строк вы можете настроить точное совпадение, как в примере ниже:

FilterPolicy:
  color:
    - green

Можно сделать прямо противоположное, что будет означать: ваша политика соответствует любому атрибуту сообщения, который не включает в себя ни одно из значений атрибута, указанного в политике.

FilterPolicy:
  color:
    - anything-but:
      - green

Вы также можете просто проверить, в наличии ли определенный атрибут, не зная его действительного значения.

FilterPolicy:
  color:
    - exists: true

Также можно проверить префикс значения атрибута.

FilterPolicy:
  color:
    - prefix: bas

Для числовых атрибутов можно выполнять фильтрацию по диапазону. Следующий пример соответствует всем числам от 0 до 100:

FilterPolicy:
  number:
    - numeric:
      - ">"
      - 0
      - "<="
      - 100

Если вы хотите обновить политики фильтрации в консоли AWS, нужно воспользоваться нотацией JSON. Также важно отметить, что обновления политики фильтрации не осуществляются сразу. Для полного вступления в силу дополнений или изменений требуется до 15 минут.

Используя политики фильтрации SNS, вы можете легко фильтровать сообщения по определенным атрибутам. Нет необходимости писать дополнительный код для настройки своих сценарий ветвления.

Мы рассмотрели базовый пример и настройку CloudFormation. После чего продемонстрировали несколько примеров более продвинутой фильтрации.

Надеюсь, вам понравилось. Спасибо за чтение!

Читайте также:

Читайте нас в Telegram, VK и Яндекс.Дзен


Перевод статьи: Lorenz Vanthillo, “How to Fan-Out to Different SQS Queues Using SNS Message Filtering”

Предыдущая статьяКак найти выход из лабиринта с помощью Python
Следующая статьяПочему вам стоит завести чек-листы для программирования