Проект инженерии данных с DAG Airflow «от и до». Часть 2

Во второй части проверим наличие корректных данных в таблице Cassandra и коллекции MongoDB. Если они имеются, отправим письмо на электронную почту с информацией об одноразовом пароле OTP, а также сообщение в Slack с информацией об адресе почты и этом пароле в теле сообщения. Часть со Slack нужна для информирования людей на канале Slack, с электронной почтой  —  для указания соответственного адреса почты.

В конце создадим Airflow DAG со всеми необходимыми задачами.

Проверка данных Cassandra

Проверим наличие конкретного адреса почты в таблице Cassandra:

class CassandraConnector:
def __init__(self, contact_points):
self.cluster = Cluster(contact_points)
self.session = self.cluster.connect()
def select_data(self, email):
query = "SELECT * FROM email_namespace.email_table WHERE email = %s"
result = self.session.execute(query, (email,))

data_dict = {}

for row in result:
data_dict['email'] = row.email
data_dict['otp'] = row.otp
logger.info(f"Email: {row.email}, OTP: {row.otp}")

if len(data_dict) == 0:
data_dict['email'] = ''
data_dict['otp'] = ''

return data_dict
def close(self):
self.cluster.shutdown()

Подключаемся к Cassandra и выбираем данные из соответственной таблицы. Если они там имеются, создаем словарь с этими данными, если нет  —  возвращаем пустой словарь. Создание словаря, даже пустого, необходимо: если он пуст, задача EmailOperator не выполнится. Так проще определить наличие корректных данных, результат этого скрипта понадобится для EmailOperator.

Используем функцию ниже в качестве задачи Airflow. Определяем конкретный адрес почты и проверяем наличие данных:

def check_cassandra_main():
cassandra_connector = CassandraConnector(['cassandra'])
sample_email = 'sample_email@my_email.com'
data_dict = cassandra_connector.select_data(sample_email)
cassandra_connector.close()
logger.info(f"Data found for email: {data_dict['email']}")
logger.info(f"OTP: {data_dict['otp']}")

Проверка данных MongoDB

Проверим наличие конкретного адреса почты в коллекции MongoDB:

def check_mongodb_main():
mongodb_uri = 'mongodb://root:root@mongo:27017/'
database_name = 'email_database'
collection_name = 'email_collection'
client = MongoClient(mongodb_uri)
db = client[database_name]
collection = db[collection_name]
sample_email = 'sample_email@my_email.com'
result = collection.find_one({'email': sample_email})
data_dict = {}
if result:
logger.info(f"Data found for email: {result['email']}")
logger.info(f"OTP: {result['otp']}")

data_dict['email'] = result.get('email')
data_dict['otp'] = result.get('otp')

client.close()
else:
data_dict['email'] = ''
data_dict['otp'] = ''
client.close()
return data_dict

Подключаемся к MongoDB и выбираем данные из соответственной коллекции. Если они там имеются, создаем словарь с этими данными, если нет  —  возвращаем пустой словарь.

Создание словаря, даже пустого, снова необходимо: если он пуст, задача EmailOperator не выполнится. Опять же, так проще определить наличие корректных данных, а результат скрипта понадобится для EmailOperator.

Используем функцию выше в качестве задачи Airflow. Определяем конкретный адрес почты и проверяем наличие данных.

mongodb_uri определяется в зависимости от конфигурации в контейнере Docker.

EmailOperator

Теперь подготовимся к созданию задачи с EmailOperator, не разделяя на Cassandra и MongoDB: подготовка у них одинаковая.

Через Gmail SMTP-сервер отправим письмо. Вот добавленные в файл docker-compose в первой части параметры:

AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com'
AIRFLOW__SMTP__SMTP_MAIL_FROM: 'sample_email@my_email.com'
AIRFLOW__SMTP__SMTP_USER: 'sample_email@my_email.com'
AIRFLOW__SMTP__SMTP_PASSWORD: 'your_password'
AIRFLOW__SMTP__SMTP_PORT: '587'

Сначала заменяем sample_email@my_email.com на адрес, с которого будут отправляться письма. Перейдя по ссылке, создаем новый пароль для приложения отправки писем и меняем на него часть с your_password.

Теперь запускаем файл docker-compose, так в итоге будут отправляться письма. Задачу EmailOperator создадим вместе с Airflow DAG ниже.

По завершении получим такое письмо:

SlackWebhookOperator

Отправим сообщение в Slack с информацией об адресе почты и одноразовом пароле OTP в теле сообщения, не разделяя на Cassandra и MongoDB: они очень похожи.

Перейдя по ссылке, создаем токен вебхука Slack:

  • Сначала нажимаем кнопку Create App («Создать приложение»).
  • Перенаправляемся на страницу учетной записи, на которую отправится сообщение.
  • После авторизации создаем специальную рабочую область, а внутри нее еще и специальный канал  —  для просмотра входящих сообщений.
  • Вместе с вебхуком  —  для отправки сообщений на специальный канал  —  создаем бот Airflow Slack Webhook.
  • Вот как выглядит созданный токен: https://hooks.slack.com/services/T…

Чтобы отправлять сообщения в Slack, переходим в Airflow UI с именем пользователя airflow и паролем airflow. Затем в Admin -> Connections создаем соединение Slack, определяя параметры:

  • Идентификатор соединения с любым названием.
  • Тип соединения: входящий вебхук Slack.
  • Конечная точка вебхука Slack: https://hooks.slack.com/services/.
  • Токен вебхука: получается из токена вебхука, созданного выше. Берем его часть после конечной точки, обычно она начинается с «T»:

После того как все определили, создаем соединение. Далее в SlackWebhookOperator из соединений берется токен, с помощью которого сообщения отправляются в специальные рабочую область и канал.

По завершении получим такое сообщение Slack:

Airflow DAG

Объединим все части проекта. Сначала импортируем операторы:

  • DummyOperator;
  • PythonOperator;
  • BranchPythonOperator;
  • EmailOperator;
  • SlackWebhookOperator.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.email import EmailOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

Чтобы напрямую импортировать необходимые методы, помещаем все скрипты в каталог dags:

from check_mongodb import check_mongodb_main
from kafka_producer import kafka_producer_main
from check_cassandra import check_cassandra_main
from kafka_create_topic import kafka_create_topic_main
from kafka_consumer_mongodb import kafka_consumer_mongodb_main
from kafka_consumer_cassandra import kafka_consumer_cassandra_main

Затем определяем для dag аргументы по умолчанию:

start_date = datetime(2022, 10, 19, 12, 20)

default_args = {
'owner': 'airflow',
'start_date': start_date,
'retries': 1,
'retry_delay': timedelta(seconds=5)
}

Далее определяем данные почты и одноразового пароля OTP из скриптов проверки данных для EmailOperator и SlackWebhookoperator.

Создаем новую функцию decide_branch для PythonBranchOperator в зависимости от создаваемого результата и подключаем к двум отдельным фиктивным операторам:

email_cassandra = check_cassandra_main()['email']
otp_cassandra = check_cassandra_main()['otp']
email_mongodb = check_mongodb_main()['email']
otp_mongodb = check_mongodb_main()['otp']

def decide_branch():
create_topic = kafka_create_topic_main()
if create_topic == "Created":
return "topic_created"
else:
return "topic_already_exists"

Теперь создадим DAG из трех основных частей.

Первая часть:

  • Создание новой темы Kafka.
  • В зависимости от результата создания задачи Kafka добавляем два фиктивных оператора.
  • Независимо от результатов фиктивных операторов начинаем отправлять сообщения в тему Kafka.

Вторая часть:

  • Получаемые в теме Kafka сообщения вставляем в таблицу Cassandra.
  • Проверяем данные в таблице и соответственным образом создаем словарь. Если данные в таблице имеются, в словаре содержится информация. Если нет, он пустой.
  • В зависимости от результата задачи проверки данных на специальный адрес почты отправляется письмо.
  • После отправляется сообщение в Slack на специальный канал с информацией из таблицы Cassandra.

Третья часть:

  • Получаемые в теме Kafka сообщения вставляем в коллекцию MongoDB.
  • Проверяем данные в коллекции и соответственным образом создаем словарь. Если данные в коллекции имеются, в словаре содержится информация. Если нет, он пустой.
  • В зависимости от результата задачи проверки данных на специальный адрес почты отправляется письмо.
  • После отправляется сообщение в Slack на специальный канал с информацией из коллекции MongoDB.

Все части запускаются параллельно:

with DAG('airflow_kafka_cassandra_mongodb', default_args=default_args, schedule_interval='@daily', catchup=False) as dag:

create_new_topic = BranchPythonOperator(task_id='create_new_topic', python_callable=decide_branch)

kafka_consumer_cassandra = PythonOperator(task_id='kafka_consumer_cassandra', python_callable=kafka_consumer_cassandra_main,
retries=2, retry_delay=timedelta(seconds=10),
execution_timeout=timedelta(seconds=45))

kafka_consumer_mongodb = PythonOperator(task_id='kafka_consumer_mongodb', python_callable=kafka_consumer_mongodb_main,
retries=2, retry_delay=timedelta(seconds=10),
execution_timeout=timedelta(seconds=45))

kafka_producer = PythonOperator(task_id='kafka_producer', python_callable=kafka_producer_main,
retries=2, retry_delay=timedelta(seconds=10),
execution_timeout=timedelta(seconds=45))

check_cassandra = PythonOperator(task_id='check_cassandra', python_callable=check_cassandra_main,
retries=2, retry_delay=timedelta(seconds=10),
execution_timeout=timedelta(seconds=45))

check_mongodb = PythonOperator(task_id='check_mongodb', python_callable=check_mongodb_main,
retries=2, retry_delay=timedelta(seconds=10),
execution_timeout=timedelta(seconds=45))

topic_created = DummyOperator(task_id="topic_created")

topic_already_exists = DummyOperator(task_id="topic_already_exists")

send_email_cassandra = EmailOperator(
task_id='send_email_cassandra',
to=email_cassandra,
subject='One-Time Password',
html_content=f"""
<html>
<body>
<h1>Your OTP</h1>
<p>{otp_cassandra}</p>
</body>
</html>
"""
)

send_email_mongodb = EmailOperator(
task_id='send_email_mongodb',
to=email_mongodb,
subject='One-Time Password',
html_content=f"""
<html>
<body>
<h1>Your OTP</h1>
<p>{otp_mongodb}</p>
</body>
</html>
"""
)

send_slack_cassandra = SlackWebhookOperator(
task_id='send_slack_cassandra',
slack_webhook_conn_id = 'slack_webhook',
message=f"""
:red_circle: New e-mail and OTP arrival
:email: -> {email_cassandra}
:ninja: -> {otp_cassandra}
""",
channel='#data-engineering',
username='airflow'
)

send_slack_mongodb = SlackWebhookOperator(
task_id='send_slack_mongodb',
slack_webhook_conn_id = 'slack_webhook',
message=f"""
:red_circle: New e-mail and OTP arrival
:email: -> {email_mongodb}
:ninja: -> {otp_mongodb}
""",
channel='#data-engineering',
username='airflow'
)

create_new_topic >> [topic_created, topic_already_exists] >> kafka_producer
kafka_consumer_cassandra >> check_cassandra >> send_email_cassandra >> send_slack_cassandra
kafka_consumer_mongodb >> check_mongodb >> send_email_mongodb >> send_slack_mongodb

DAG запускается ежедневно, но schedule_interval настраивается под конкретный вариант применения.

EmailOperator:

  • to: адрес почты, получаемый из нереляционной БД. На него отправляется письмо с адреса, определенного в файле docker-compose.
  • subject: тема письма.
  • html_content: тело сообщения, включаемого в письмо.

SlackWebhookOperator:

  • slack_webhook_conn_id: идентификатор соединения, определенный при создании нового соединения Slack.
  • message: отправляемое в Slack сообщение. В него включаются эмодзи или сообщение определяется как тело с отдельными разделами в самом сообщении.
  • channel: определяется канал, на который сообщение отправляется.
  • username: сообщение отправляется с именем созданного бота. Без бота этот параметр тоже используется.

Вот как выглядит готовый DAG в Airflow UI:

Читайте также:

Читайте нас в Telegram, VK и Дзен


Перевод статьи Dogukan Ulu: Data Engineering End-to-End Project — Part 2 — Airflow, Kafka, Cassandra, MongoDB, Docker, EmailOperator, SlackWebhookOperator

Предыдущая статьяИнженерия данных: руководство для начинающих, вдохновленное Формулой-1
Следующая статьяСостояние гонки в Node.js: практическое руководство