Планирование и оркестрация облачных задач dbt Cloud с помощью Prefect
Марвин оркестрирует задачи dbt Cloud

Настройка dbt Cloud

Сначала нужно извлечь ID аккаунта dbt Cloud и создать ключ API. Для этого зайдите в Account Settings:

Теперь в URL вы должны увидеть ID аккаунта. Скопируйте его и вставьте в строку DBT_CLOUD_ACCOUNT_ID файла .env. Далее с той же страницы Account Settings перейдите в раздел API Access:

Создайте и скопируйте ключ API:

Создание программного блока

Вставьте этот ключ в строку DBT_CLOUD_API_KEY файла .env. Теперь этот файл должен содержать следующие две переменных среды:

DBT_CLOUD_API_KEY=xxx
DBT_CLOUD_ACCOUNT_ID=12345

Файл .env нужен, только если вы хотите создать блок DbtCloudCredentials программно. Проще всего сделать это, следуя рекомендациям из серии постов How to Build a Modular Data Stack — Data Platform with Prefect, dbt and Snowflake и используя репозиторий prefect-dataplatform. Вы можете непосредственно использовать один из автоматически развертываемых скриптов, например настройку локального выполнения, показанную в скрипте deploy_locally.py.

Создание блока из UI

В качестве альтернативы можно использовать ключ API dbt Cloud и ID аккаунта для настройки блока из Prefect UI:

После указания учетных данных нужно настроить проект dbt Cloud. Если у вас такой уже есть, можете следующий раздел пропустить.

Настройка проекта dbt Cloud

Сначала перейдите в аккаунт dbt Cloud и создайте новый проект:

Следуйте приведенной инструкции и заполните учетные данные вашего хранилища:

В самом конце нужно будет выбрать ваш репозиторий Git:

Затем можно будет начать разрабатывать IDE:

Выполнение простой команды dbt compile поможет убедиться, что все работает нормально:

Создание среды dbt

Прежде чем мы сможем создавать задачи, необходимо настроить среду.

Создаем среду:

Можете назвать ее в соответствии со своим dev/prod хранилищем Snowflake или схемой. Также можете связать ее с ветвью Git, соответствующей вашей dev/prod среде:

После создания среды dbt Cloud предложит создать задачу:

Создание задачи dbt

Следуйте инструкциям “Create new Job” и не забудьте отключить планирование, чтобы вы могли оркестрировать эту задачу с помощью Prefect:

Копирование ID задачи из URL

URL содержит ID задачи, который потребуется для ее активации:

Выполнение задачи dbt Cloud с помощью Prefect

Теперь можно активировать задачу dbt Cloud с помощью Prefect. Вот простой поток, который загружает блок учетных данных (безопасно сохраняя ID аккаунта dbt Cloud и ключ API) и активирует выполнение для ID этой задачи (скорректируйте на ваш в строке 10):

from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion


@flow
def run_dbt_job_flow():
trigger_dbt_cloud_job_run_and_wait_for_completion(
dbt_cloud_credentials=DbtCloudCredentials.load("default"), job_id=154217
)


if __name__ == "__main__":
run_dbt_job_flow()

После выполнения этого скрипта Python вы сможете отслеживать логи из терминала:

Или из dbt Cloud:

Или из Prefect Cloud UI:

Логи в Prefect Cloud UI упрощают переход из Prefect в логи, находящиеся в информационной панели dbt Cloud:

Это было короткое ознакомление о том, как настраивать задачу dbt Cloud и оркестрировать ее с помощью блоков Prefect и коллекции prefect-dbt

Читайте также:

Читайте нас в TelegramVK и Дзен


Перевод статьи Anna Geller: Schedule & orchestrate dbt Cloud jobs with Prefect

Предыдущая статья11 шагов на пути к работе дата-сайентиста
Следующая статьяКак я создал расширение браузера и обучил ChatGPT обращаться к внешним сайтам за информацией о текущих событиях