SNS и SQS — сервисы AWS, которые часто находят применение в событийно-управляемой архитектуре. Сочетание этих сервисов дает возможность отправлять сообщения по разным каналам. Часто возникает необходимость фильтровать сообщения, прежде чем реплицировать их в несколько очередей. Здесь я намерен объяснить, как достичь этой цели, воспользовавшись всеми преимуществами этих сервисов AWS без кастомного кода.
Начнем с того, что освежим свои знания по SNS-SQS. SNS — это сервис AWS, который координирует и управляет доставкой или отправкой сообщений конечным точкам подписки. Он позволяет создать “SNS-тему”, которая будет логической точкой доступа, действующей в качестве канала связи. С помощью клиента командной строки AWS, SDK или веб-консоли в этой теме можно начать публикацию сообщений. Затем тема будет пересылать сообщения своим подписчикам. Одним из возможных подписчиков является SQS — сервис массового обслуживания AWS. Вы можете подписать несколько очередей на одну и ту же тему, чтобы реплицировать сообщение по этим очередям. Это и называется разветвлением.
В сценарии, описанном выше, сообщение передается во все очереди, что позволяет выполнять параллельную асинхронную обработку. Но что делать, если нужно реплицировать сообщения только в некоторые очереди, в зависимости от атрибутов сообщений? SNS предлагает отличное решение.
В этом примере мы будем фильтровать по двум атрибутам:
- Цвет (color).
- Число (number).
ColorQueue
будет получать все сообщения независимо от значения атрибута color
. BlueYellowQueue
будет получать все сообщения, для которых атрибут color
установлен на синий или желтый. RedQueue
будет получать только сообщения, у которых в атрибуте color
будет значение “красный”. Наконец, есть две очереди, которые могут принимать сообщения, содержащие в качестве атрибута зеленый цвет. От второго атрибута (число) зависит, попадет ли сообщение в GreenHighQueue
(number
> 100) или GreenLowQueue
(number
≤100).

Чтобы облегчить развертывание этой установки, я создал шаблон CloudFormation
. Шаблон содержит создание SNS-темы, очередей SQS, также соответствующих очередей недоставленных сообщений. Это все довольно просто. Немного сложнее выглядит политика SQS. Она предоставляет нашей теме SNS разрешение SendMessage
для пяти очередей.
QueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
Queues:
- !Ref ColorQueue
- !Ref BlueYellowQueue
- !Ref RedQueue
- !Ref GreenHighQueue
- !Ref GreenLowQueue
PolicyDocument:
Statement:
- Effect: Allow
Principal:
Service: sns.amazonaws.com
Action:
- sqs:SendMessage
Resource: "*"
Condition:
ArnEquals:
aws:SourceArn:
!Ref SNSTopic
Теперь осталось только создать подписку SQS на SNS. На этом месте мы также определяем политику фильтрации.
У ColorQueue
нет политики фильтрации, и она просто подписывается на тему.
ColorQueueSubscription:
Type: AWS::SNS::Subscription
Properties:
Endpoint: !GetAtt ColorQueue.Arn
Protocol: sqs
RawMessageDelivery: True
TopicArn: !Ref SNSTopic
Другие подписки содержат политики фильтрации. Вот одна из них для BlueYellowQueue
, которая может получать сообщение, где в атрибуте color
указан синий или желтый.
BlueYellowQueueSubscription:
Type: AWS::SNS::Subscription
Properties:
Endpoint: !GetAtt BlueYellowQueue.Arn
Protocol: sqs
FilterPolicy:
color:
- blue
- yellow
RawMessageDelivery: True
TopicArn: !Ref SNSTopic
GreenHighQueue
и GreenLowQueue
используют две политики фильтрации. Ниже вы можете увидеть ту, которая применяется для GreenHighQueue
.
GreenHighQueueSubscription:
Type: AWS::SNS::Subscription
Properties:
Endpoint: !GetAtt GreenHighQueue.Arn
Protocol: sqs
FilterPolicy:
color:
- green
number:
- numeric:
- ">"
- 100
Для развертывания полного стека можно воспользоваться командой:
$ aws cloudformation create-stack --template-body file://template.yaml --stack-name sns-sqs-fanout-filter-example
Чтобы проверить, как всё работает, опубликуем сообщение в SNS. Начнем с публикации через веб-консоль AWS.

Сообщение доступно в ColorQueue
и RedQueue
, как мы и ожидали.

Мы применили RawMessageDelivery
, поэтому должны получить точно такое же сообщение в нашей очереди SQS.

Мы также можем проверить атрибут(ы).

Теперь вы можете сделать что-то подобное с помощью AWS CLI:
$ aws sns publish \
--topic-arn "arn:aws:sns:eu-west-1:123456789012:Topic" \
--message "\"body\": \"{this is a test}\"" \
--message-attributes '{ "color":{ "DataType":"String","StringValue":"green" }, "number":{ "DataType":"Number","StringValue":"100"} }'
И снова получаем ожидаемый результат. Сообщение появляется в ColorQueue
и GreenLowQueue
, поскольку атрибут number
меньше или равен 100.

Для последнего примера я воспользовался Python SDK.
import json
import boto3
client = boto3.client('sns')
mesg = json.dumps({
"default": "default",
"body": "this is a test"
})
response = client.publish(
TopicArn='arn:aws:sns:eu-west-1:123456789012:Topic',
Message=mesg,
MessageStructure='json',
MessageAttributes={
'color': {
'DataType': 'String',
'StringValue': 'blue',
}
}
)
И снова видим ожидаемый результат:

Этот базовый пример должен был прояснить, как работает фильтрация SNS. Мы видели пример логики “И” (например, color
== зеленый и number
> 100) и логики “ИЛИ” (например, color
== синий или color
== желтый). Теперь мы можем перейти к более сложным примерам фильтрации.
Для строк вы можете настроить точное совпадение, как в примере ниже:
FilterPolicy:
color:
- green
Можно сделать прямо противоположное, что будет означать: ваша политика соответствует любому атрибуту сообщения, который не включает в себя ни одно из значений атрибута, указанного в политике.
FilterPolicy:
color:
- anything-but:
- green
Вы также можете просто проверить, в наличии ли определенный атрибут, не зная его действительного значения.
FilterPolicy:
color:
- exists: true
Также можно проверить префикс значения атрибута.
FilterPolicy:
color:
- prefix: bas
Для числовых атрибутов можно выполнять фильтрацию по диапазону. Следующий пример соответствует всем числам от 0 до 100:
FilterPolicy:
number:
- numeric:
- ">"
- 0
- "<="
- 100
Если вы хотите обновить политики фильтрации в консоли AWS, нужно воспользоваться нотацией JSON. Также важно отметить, что обновления политики фильтрации не осуществляются сразу. Для полного вступления в силу дополнений или изменений требуется до 15 минут.

Используя политики фильтрации SNS, вы можете легко фильтровать сообщения по определенным атрибутам. Нет необходимости писать дополнительный код для настройки своих сценарий ветвления.
Мы рассмотрели базовый пример и настройку CloudFormation
. После чего продемонстрировали несколько примеров более продвинутой фильтрации.
Надеюсь, вам понравилось. Спасибо за чтение!
Читайте также:
- Как создать бессерверное приложение с помощью AWS Chalice
- LocalStack: запуск AWS на локальном компьютере
- Архитектура ПО: создайте свое приложение с AWS
Читайте нас в Telegram, VK и Яндекс.Дзен
Перевод статьи: Lorenz Vanthillo, “How to Fan-Out to Different SQS Queues Using SNS Message Filtering”