Во второй части проверим наличие корректных данных в таблице Cassandra и коллекции MongoDB. Если они имеются, отправим письмо на электронную почту с информацией об одноразовом пароле OTP, а также сообщение в Slack с информацией об адресе почты и этом пароле в теле сообщения. Часть со Slack нужна для информирования людей на канале Slack, с электронной почтой — для указания соответственного адреса почты.
В конце создадим Airflow DAG со всеми необходимыми задачами.
Проверка данных Cassandra
Проверим наличие конкретного адреса почты в таблице Cassandra:
class CassandraConnector:
def __init__(self, contact_points):
self.cluster = Cluster(contact_points)
self.session = self.cluster.connect()
def select_data(self, email):
query = "SELECT * FROM email_namespace.email_table WHERE email = %s"
result = self.session.execute(query, (email,))
data_dict = {}
for row in result:
data_dict['email'] = row.email
data_dict['otp'] = row.otp
logger.info(f"Email: {row.email}, OTP: {row.otp}")
if len(data_dict) == 0:
data_dict['email'] = ''
data_dict['otp'] = ''
return data_dict
def close(self):
self.cluster.shutdown()
Подключаемся к Cassandra и выбираем данные из соответственной таблицы. Если они там имеются, создаем словарь с этими данными, если нет — возвращаем пустой словарь. Создание словаря, даже пустого, необходимо: если он пуст, задача EmailOperator не выполнится. Так проще определить наличие корректных данных, результат этого скрипта понадобится для EmailOperator.
Используем функцию ниже в качестве задачи Airflow. Определяем конкретный адрес почты и проверяем наличие данных:
def check_cassandra_main():
cassandra_connector = CassandraConnector(['cassandra'])
sample_email = 'sample_email@my_email.com'
data_dict = cassandra_connector.select_data(sample_email)
cassandra_connector.close()
logger.info(f"Data found for email: {data_dict['email']}")
logger.info(f"OTP: {data_dict['otp']}")
Проверка данных MongoDB
Проверим наличие конкретного адреса почты в коллекции MongoDB:
def check_mongodb_main():
mongodb_uri = 'mongodb://root:root@mongo:27017/'
database_name = 'email_database'
collection_name = 'email_collection'
client = MongoClient(mongodb_uri)
db = client[database_name]
collection = db[collection_name]
sample_email = 'sample_email@my_email.com'
result = collection.find_one({'email': sample_email})
data_dict = {}
if result:
logger.info(f"Data found for email: {result['email']}")
logger.info(f"OTP: {result['otp']}")
data_dict['email'] = result.get('email')
data_dict['otp'] = result.get('otp')
client.close()
else:
data_dict['email'] = ''
data_dict['otp'] = ''
client.close()
return data_dict
Подключаемся к MongoDB и выбираем данные из соответственной коллекции. Если они там имеются, создаем словарь с этими данными, если нет — возвращаем пустой словарь.
Создание словаря, даже пустого, снова необходимо: если он пуст, задача EmailOperator не выполнится. Опять же, так проще определить наличие корректных данных, а результат скрипта понадобится для EmailOperator.
Используем функцию выше в качестве задачи Airflow. Определяем конкретный адрес почты и проверяем наличие данных.
mongodb_uri определяется в зависимости от конфигурации в контейнере Docker.
EmailOperator
Теперь подготовимся к созданию задачи с EmailOperator, не разделяя на Cassandra и MongoDB: подготовка у них одинаковая.
Через Gmail SMTP-сервер отправим письмо. Вот добавленные в файл docker-compose в первой части параметры:
AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com'
AIRFLOW__SMTP__SMTP_MAIL_FROM: 'sample_email@my_email.com'
AIRFLOW__SMTP__SMTP_USER: 'sample_email@my_email.com'
AIRFLOW__SMTP__SMTP_PASSWORD: 'your_password'
AIRFLOW__SMTP__SMTP_PORT: '587'
Сначала заменяем sample_email@my_email.com на адрес, с которого будут отправляться письма. Перейдя по ссылке, создаем новый пароль для приложения отправки писем и меняем на него часть с your_password.
Теперь запускаем файл docker-compose, так в итоге будут отправляться письма. Задачу EmailOperator создадим вместе с Airflow DAG ниже.
По завершении получим такое письмо:
SlackWebhookOperator
Отправим сообщение в Slack с информацией об адресе почты и одноразовом пароле OTP в теле сообщения, не разделяя на Cassandra и MongoDB: они очень похожи.
Перейдя по ссылке, создаем токен вебхука Slack:
- Сначала нажимаем кнопку Create App («Создать приложение»).
- Перенаправляемся на страницу учетной записи, на которую отправится сообщение.
- После авторизации создаем специальную рабочую область, а внутри нее еще и специальный канал — для просмотра входящих сообщений.
- Вместе с вебхуком — для отправки сообщений на специальный канал — создаем бот Airflow Slack Webhook.
- Вот как выглядит созданный токен: https://hooks.slack.com/services/T…
Чтобы отправлять сообщения в Slack, переходим в Airflow UI с именем пользователя airflow и паролем airflow. Затем в Admin -> Connections создаем соединение Slack, определяя параметры:
- Идентификатор соединения с любым названием.
- Тип соединения: входящий вебхук Slack.
- Конечная точка вебхука Slack: https://hooks.slack.com/services/.
- Токен вебхука: получается из токена вебхука, созданного выше. Берем его часть после конечной точки, обычно она начинается с «T»:
После того как все определили, создаем соединение. Далее в SlackWebhookOperator из соединений берется токен, с помощью которого сообщения отправляются в специальные рабочую область и канал.
По завершении получим такое сообщение Slack:
Airflow DAG
Объединим все части проекта. Сначала импортируем операторы:
- DummyOperator;
- PythonOperator;
- BranchPythonOperator;
- EmailOperator;
- SlackWebhookOperator.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.email import EmailOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
Чтобы напрямую импортировать необходимые методы, помещаем все скрипты в каталог dags:
from check_mongodb import check_mongodb_main
from kafka_producer import kafka_producer_main
from check_cassandra import check_cassandra_main
from kafka_create_topic import kafka_create_topic_main
from kafka_consumer_mongodb import kafka_consumer_mongodb_main
from kafka_consumer_cassandra import kafka_consumer_cassandra_main
Затем определяем для dag аргументы по умолчанию:
start_date = datetime(2022, 10, 19, 12, 20)
default_args = {
'owner': 'airflow',
'start_date': start_date,
'retries': 1,
'retry_delay': timedelta(seconds=5)
}
Далее определяем данные почты и одноразового пароля OTP из скриптов проверки данных для EmailOperator и SlackWebhookoperator.
Создаем новую функцию decide_branch
для PythonBranchOperator в зависимости от создаваемого результата и подключаем к двум отдельным фиктивным операторам:
email_cassandra = check_cassandra_main()['email']
otp_cassandra = check_cassandra_main()['otp']
email_mongodb = check_mongodb_main()['email']
otp_mongodb = check_mongodb_main()['otp']
def decide_branch():
create_topic = kafka_create_topic_main()
if create_topic == "Created":
return "topic_created"
else:
return "topic_already_exists"
Теперь создадим DAG из трех основных частей.
Первая часть:
- Создание новой темы Kafka.
- В зависимости от результата создания задачи Kafka добавляем два фиктивных оператора.
- Независимо от результатов фиктивных операторов начинаем отправлять сообщения в тему Kafka.
Вторая часть:
- Получаемые в теме Kafka сообщения вставляем в таблицу Cassandra.
- Проверяем данные в таблице и соответственным образом создаем словарь. Если данные в таблице имеются, в словаре содержится информация. Если нет, он пустой.
- В зависимости от результата задачи проверки данных на специальный адрес почты отправляется письмо.
- После отправляется сообщение в Slack на специальный канал с информацией из таблицы Cassandra.
Третья часть:
- Получаемые в теме Kafka сообщения вставляем в коллекцию MongoDB.
- Проверяем данные в коллекции и соответственным образом создаем словарь. Если данные в коллекции имеются, в словаре содержится информация. Если нет, он пустой.
- В зависимости от результата задачи проверки данных на специальный адрес почты отправляется письмо.
- После отправляется сообщение в Slack на специальный канал с информацией из коллекции MongoDB.
Все части запускаются параллельно:
with DAG('airflow_kafka_cassandra_mongodb', default_args=default_args, schedule_interval='@daily', catchup=False) as dag:
create_new_topic = BranchPythonOperator(task_id='create_new_topic', python_callable=decide_branch)
kafka_consumer_cassandra = PythonOperator(task_id='kafka_consumer_cassandra', python_callable=kafka_consumer_cassandra_main,
retries=2, retry_delay=timedelta(seconds=10),
execution_timeout=timedelta(seconds=45))
kafka_consumer_mongodb = PythonOperator(task_id='kafka_consumer_mongodb', python_callable=kafka_consumer_mongodb_main,
retries=2, retry_delay=timedelta(seconds=10),
execution_timeout=timedelta(seconds=45))
kafka_producer = PythonOperator(task_id='kafka_producer', python_callable=kafka_producer_main,
retries=2, retry_delay=timedelta(seconds=10),
execution_timeout=timedelta(seconds=45))
check_cassandra = PythonOperator(task_id='check_cassandra', python_callable=check_cassandra_main,
retries=2, retry_delay=timedelta(seconds=10),
execution_timeout=timedelta(seconds=45))
check_mongodb = PythonOperator(task_id='check_mongodb', python_callable=check_mongodb_main,
retries=2, retry_delay=timedelta(seconds=10),
execution_timeout=timedelta(seconds=45))
topic_created = DummyOperator(task_id="topic_created")
topic_already_exists = DummyOperator(task_id="topic_already_exists")
send_email_cassandra = EmailOperator(
task_id='send_email_cassandra',
to=email_cassandra,
subject='One-Time Password',
html_content=f"""
<html>
<body>
<h1>Your OTP</h1>
<p>{otp_cassandra}</p>
</body>
</html>
"""
)
send_email_mongodb = EmailOperator(
task_id='send_email_mongodb',
to=email_mongodb,
subject='One-Time Password',
html_content=f"""
<html>
<body>
<h1>Your OTP</h1>
<p>{otp_mongodb}</p>
</body>
</html>
"""
)
send_slack_cassandra = SlackWebhookOperator(
task_id='send_slack_cassandra',
slack_webhook_conn_id = 'slack_webhook',
message=f"""
:red_circle: New e-mail and OTP arrival
:email: -> {email_cassandra}
:ninja: -> {otp_cassandra}
""",
channel='#data-engineering',
username='airflow'
)
send_slack_mongodb = SlackWebhookOperator(
task_id='send_slack_mongodb',
slack_webhook_conn_id = 'slack_webhook',
message=f"""
:red_circle: New e-mail and OTP arrival
:email: -> {email_mongodb}
:ninja: -> {otp_mongodb}
""",
channel='#data-engineering',
username='airflow'
)
create_new_topic >> [topic_created, topic_already_exists] >> kafka_producer
kafka_consumer_cassandra >> check_cassandra >> send_email_cassandra >> send_slack_cassandra
kafka_consumer_mongodb >> check_mongodb >> send_email_mongodb >> send_slack_mongodb
DAG запускается ежедневно, но schedule_interval настраивается под конкретный вариант применения.
- to: адрес почты, получаемый из нереляционной БД. На него отправляется письмо с адреса, определенного в файле docker-compose.
- subject: тема письма.
- html_content: тело сообщения, включаемого в письмо.
- slack_webhook_conn_id: идентификатор соединения, определенный при создании нового соединения Slack.
- message: отправляемое в Slack сообщение. В него включаются эмодзи или сообщение определяется как тело с отдельными разделами в самом сообщении.
- channel: определяется канал, на который сообщение отправляется.
- username: сообщение отправляется с именем созданного бота. Без бота этот параметр тоже используется.
Вот как выглядит готовый DAG в Airflow UI:
Читайте также:
- Проект инженерии данных «от и до» с Apache Airflow, Postgres и GCP
- Инженерия данных: руководство для начинающих, вдохновленное Формулой-1
- Через Apache Brooklyn к автономным вычислениям
Читайте нас в Telegram, VK и Дзен
Перевод статьи Dogukan Ulu: Data Engineering End-to-End Project — Part 2 — Airflow, Kafka, Cassandra, MongoDB, Docker, EmailOperator, SlackWebhookOperator