Это, конечно, самый продолжительный мой проект (и самый дорогой). Тем не менее он замечателен, поэтому не могу не поделиться им с вами.

Постараюсь упростить свой рассказ, чтобы даже люди, не связанные с технологиями, смогли во всем разобраться.

Почему?

У всего есть причина, и у этого проекта тоже.

Недавно я приобрел навыки системной инженерии, и решил применить их в сквозном проекте. Конечно, этот проект не самый лучший, но он помогает мне быстро выполнять итерации и избегать ошибок (а также отвечает реальности современной инженерии данных, повсюду заявляющей о себе впечатляющими иконками инструментов).

Конечная цель

Конечной целью данного проекта является создание полнофункциональной платформы/конвейера обработки и анализа данных, которая будет ежедневно обновлять аналитические таблицы/дэшборды.

Инфраструктура

Как видно на изображении, для запуска проекта нужно было принять несколько решений:

  • Откуда брать данные? RDS PostgreSQL + Python-скрипт в экземпляре EC2 для генерации новых данных каждые 2 часа.
  • Где будут храниться данные для аналитики? Snowflake.
  • Как связать эти два компонента? Airbyte (Cloud).
  • Как сделать данные пригодными для аналитики/дэшбординга? DBT + Airflow.

Рассмотрим каждое решение отдельно.

Откуда брать данные?

Я очень устал от плоских файлов, поэтому подумал, что было бы неплохо создать скрипт, который бы генерировал фиктивные данные по расписанию (каждые 2 часа) и хранил бы их в базе данных Postgres (RDS).

Не буду углубляться в эту часть, поскольку она не является нашей основной целью.

Итак, вот краткий обзор модели данных исходной системы:

Эта модель данных намеренно базовая и “грязная”. Я хотел получить что-то “сырое”, чтобы было над чем поработать на этапе трансформации.

Когда у нас были готовы модель данных и Python-скрипт, мы перешли к созданию реляционной базы данных PostgreSQL, в которой хранились бы необработанные таблицы.

Воспользовавшись Terraform, мы создали:

  • VPC (virtual private cloud  —  виртуальное приватное облако);
  • две приватные подсети для размещения RDS (сервиса реляционной базы данных), а также группу безопасности для разрешения трафика на порт 5432;
  • один экземпляр EC2 в публичной подсети для выполнения Python-скрипта (который также будет служить SSH-туннелем для подключения к базе данных с помощью Airbyte, поскольку база данных не является общедоступной по соображениям безопасности) и группу безопасности для разрешения SSH.

Вот как выглядит RDS в конечном итоге:

А вот как выглядит EC2, где будет размещен Python-скрипт:

Вам, наверное, интересно, что делает этот длинный код.

Он просто создает экземпляр EC2, указывая AMI, группу безопасности, подсеть и user_data для установки Docker.

Итак, у нас есть ресурсы, созданные на облачной платформе AWS. Как теперь развернуть код на этом экземпляре EC2?

Используем конвейер CI/CD с действиями на GitHub:

Что делает этот код?

  • Позволяет подключиться к экземпляру EC2 с помощью SSH.
  • Копирует контент каталога, содержащего скрипт Python, на этот EC2.
  • Запускает контейнер.

Где будут храниться данные для аналитики?

Сейчас у нас есть данные, которые обновляются каждые два часа на RDS. Что по целевой системе? Ответ однозначен: Snowflake.

В этом разделе не так много интересного. Вот несколько слов о том, как я структурирую базу данных.

  • RAW: база данных для хранения “сырых” данных, поступающих из Airbyte (схема: postgres_airbyte).
  • ANALYTICS: производственная база данных (схемы: staging, intermediate, marts(finance)).
  • DBT_DEV: база данных разработки (имеет те же схемы, что и производственная база данных).
  • DATA_ENGINEER: роль для использования базы данных RAW и владения базами данных ANALYTICS и DBT_DEV.
  • AIRBYTE_ROLE: используется Airbyte для записи в базу данных RAW (схема postgres_airbyte).

Как связать две системы? Как поглощать данные?

Изначально я предполагал написать для этой задачи собственный Python-скрипт на AWS Lambda. Однако его реализация заняла бы значительное количество времени, особенно учитывая необходимость включения функции CDC (Change Data Capture  —  отслеживание измененных данных) для захвата только новых данных из исходной системы.

Поэтому решил воспользоваться no-code/современными инструментами, составляющими основу современного стека данных (MDS).

Я выбрал Airbyte, точнее Airbyte Cloud. Хотя можно было выбрать версию с открытым исходным кодом и установить ее с помощью Docker на экземпляр EC2, я предпочел облачную версию, предлагающую 14-дневную бесплатный пробный период.

Синхронизация данных из исходной системы в целевую с помощью Airbyte очень проста. Тут практически нечего объяснять.

Что касается режима синхронизации, я выбрал “Incremental” (“Инкрементный”), при котором Airbyte извлекает только новые данные из исходной системы, и “Append + Deduped” (“Добавление данных + Удаление повторов”), который не требует пояснений: Airbyte обеспечивает уникальность каждой строки на основе указанного столбца (обычно это первичный ключ).

Благодаря современному стеку данных, мы настроили и запустили конвейер поглощения данных менее чем за 10 минут!

P. S. Airbyte синхронизирует данные каждый день в 5 часов вечера (запомните это, пригодится позже).

Как сделать данные пригодными для аналитики/дэшбординга?

Как можно судить по модели данных исходной системы, мы не могли использовать эти таблицы для аналитики и дэшбординга. Вот тут-то и пригодился подход Кимбалла  —  методология проектирования и управления хранилищами данных, разработанная Ральфом Кимбаллом.

Взгляните на эту красивую и простую схему:

P. S. Наша конечная цель  —  анализ метрик плана подписки.

Теперь у нас есть желаемая модель данных, но как создать таблицы?

Именно здесь на помощь приходит DBT (data build tool  —  инструмент преобразования данных).

Вот как мы структурировали DBT-проект.

  • models/staging: это модели, которые хранятся в схеме staging и подвергаются простому приведению типов и быстрым преобразованиям.

Пример созданной нами модели (stg_bank):

  • models/intermediate: здесь мы создаем таблицы фактов и измерений.

Вот пример (int_date):

Я также создал несколько модульных тестов, просто чтобы протестировать.

  • models/marts: здесь мы рассчитываем различные показатели, например общий чистый доход по плану подписки.

Мы закончили с DBT (пока что). Теперь перейдем к Airflow.

Airflow + Cosmos + DBT = история любви

Когда с DBT было покончено, нам нужно было перейти к следующему шагу  —  запланировать ежедневный запуск DBT-моделей и обновлять таблицы, чтобы дата-аналитики/сайентисты могли проводить анализ.

Вот тут-то и пригодился Airflow, особенно библиотека Cosmos, которая позволила легко запускать DBT-модели с помощью Airflow.

Как я уже говорил, Airbyte получал данные каждый день в 17:00, поэтому необходимо было запланировать запуск группы Airflow после 17:00 (каждое утро нам нужны свежие таблицы).

Вот как мы определили DAG с помощью Cosmos:

Не волнуйтесь, ссылка на репозиторий GitHub размещена в конце статьи.

Следующим шагом было развертывание кода Airflow на EC2 (который мы создали с помощью Terraform).

P. S. Часть SCRIPT_AFTER бесполезна.

Можете нас поздравить: DAG теперь успешно работает (полюбуйтесь на этот красивый пользовательский интерфейс Airflow со всеми моделями), и таблицы будут обновляться ежедневно.

Дэшборд

Дэшборд я создал, руководствуясь знаменитым правилом KISS (“Кeep it simple stupid”  —  “Упрощай до примитива”).

Тут все по делу. Никаких вычурных визуальных эффектов или диаграмм.


Заключение

Я намеренно сделал эту статью очень простой и понятной.

Избегая слишком глубокого погружения в технические детали, я хотел, чтобы вы получили общее представление о типичном рабочем процессе инженера данных.

Ссылка на GitHub.

Читайте также:

Читайте нас в Telegram, VK и Дзен


Перевод статьи Dorian Teffo: How I Built This Data Platform in One Week

Предыдущая статьяВозможности Docker, о которых вы не знали. Часть 1
Следующая статьяНовая эра Angular: беззоновое обнаружение изменений