Обычно дата-инженерам приходится получать данные из нескольких источников, а затем очищать их и агрегировать. Часто эти процессы необходимо применять на больших объемах данных.
Сегодня мы рассмотрим одно из самых фундаментальных понятий в области вычислительных технологий и в частности дата-инженерии — параллельные вычисления. С их помощью современные приложения могут обрабатывать огромные объемы данных за относительно небольшие промежутки времени.
Обсудим преимущества параллельных вычислений в целом, а также их недостатки. Изучим несколько программных пакетов и фреймворков, использующих возможности современных многоядерных систем и кластеров компьютеров для распределения и параллелизации рабочих нагрузок.
Определение параллельного программирования
Параллельные вычисления лежат в основе большого количества современных инструментов обработки данных. Эти фреймворки используют вычислительную мощность и память. Современные машины предлагают их таким образом, что основная задача может быть разбита на подзадачи, которые могут выполняться параллельно на нескольких компьютерах.
Обратите внимание, что параллельные вычисления могут работать и на нескольких ядрах в одной машине.
Преимущества параллельного программирования
Первая отличительная черта параллельного программирования связана с применением многоядерных систем, которые позволяют решать задачи за меньшее время.
В эпоху больших данных датасеты могут достигать невероятных размеров, и “поместить” их в одну машину — порой невыполнимая задача. Благодаря параллельным вычислениям такие наборы данных могут загружаться распределенным методом с помощью нескольких вычислительных машин.
Параллельное замедление
Помимо очевидных преимуществ, концепция параллельных вычислений имеет недостатки. Распределение задач по кластеру компьютеров сопровождается некоторыми издержками, связанными с тем, как узлы взаимодействуют друг с другом.
Поэтому в некоторых случаях распределение довольно простых задач может не только не ускорить выполнение, но и замедлить его, спровоцировав параллельное замедление.
Другими словами, решать небольшие и легкие задачи эффективнее (и, возможно, легче) на одной машине, чем распределять их по кластеру узлов.
Еще один фактор, который нужно принимать во внимание при распараллеливании задач, — это возможность их распределения по кластеру вычислительных узлов.
Параллельное программирование на Python
Пакет multiprocessing
входит в стандартную библиотеку Python. Он предлагает интуитивно простой API, позволяющий запускать несколько процессов как для локального, так и для удаленного параллелизма, что является хорошим способом обходить глобальную блокировку интерпретатора (Global Interpreter Lock).
Говоря более конкретно, объект Pool
можно использовать, чтобы распараллеливать выполнение функции по нескольким входным значениям таким образом, что входные данные будут распределяться по нескольким процессам, которые могут быть запущены на разных ядрах CPU даже на одной машине.
Предположим, что у нас есть датафрейм Pandas, содержащий персональную информацию о студентах, в том числе их возраст и год обучения:
import pandas as pd df = pd.DataFrame( [ (1, 2021, 15), (2, 2020, 14), (3, 2021, 17), (4, 2019, 13), (5, 2019, 14), (6, 2020, 15), (7, 2020, 14), (8, 2021, 14), (9, 2021, 13), (10, 2020, 14), (11, 2019, 12), (12, 2018, 10), (13, 2019, 15), (14, 2019, 16), ], columns=['student_id', 'class_year', 'age'] ) print(df) student_id class_year age 0 1 2021 15 1 2 2020 14 2 3 2021 17 3 4 2019 13 4 5 2019 14 5 6 2020 15 6 7 2020 14 7 8 2021 14 8 9 2021 13 9 10 2020 14 10 11 2019 12 11 12 2018 10 12 13 2019 15 13 14 2019 16
Теперь представьте, что нам нужно вычислить средний возраст студентов в каждом учебном году. В нижеприведенном примере мы используем библиотеку multiprocessing
, чтобы распараллелить это вычисление по четырем ядрам локальной машины.
from multiprocessing import Pool def compute_mean_age(groupby_year): year, group = groupby_year return pd.DataFrame({'age': group['age'].mean()}, index=[year]) with Pool(4) as p: mean_age = p.map(compute_mean_age, df.groupby('class_year')) df_mean_age_per_year = pd.concat(mean_age)
И вот результат:
print(df_mean_age_per_year)
age
2018 10.00
2019 14.00
2020 14.25
2021 14.75
Обратите внимание, что в этом примере используется очень маленький набор данных, который при иных обстоятельствах вызвал бы эффект параллельного замедления, о котором говорилось ранее. Приведенный выше подход пригодится только при работе с большим объемом данных.
Для маленьких датасетов можно применять следующий способ:
>>> df.groupby('class_year')['age'].mean() class_year 2018 10.00 2019 14.00 2020 14.25 2021 14.75 Name: age, dtype: float64
Современные фреймворки для параллельных вычислений
С течением времени появляются новые технологии, способные распределять рабочие нагрузки и параллельно выполнять несколько задач с помощью многоядерных систем и кластеров компьютеров.
Одним из таких инструментов является Dask, который предлагает функциональность для расширенного параллелизма, обеспечивающего масштабируемость для таких инструментов, как numpy
, pandas
и scikit-learn
.
Вместо “низкоуровневого” пакета multiprocessing
, можно использовать Dask для вычисления среднего возраста студентов за каждый учебный год из предыдущего примера. Это позволяет добиться такого же результата, но с более чистым и интуитивно понятным кодом, который представлен ниже.
import dask.dataframe as dd dd_students = dd.from_pandas(df, npartitions=4) df_mean_age_per_year = \ dd_sudents.groupby('class_year').age.mean().compute()
Если вы занимались исследованием больших данных, то, вероятно, сталкивались с Hadoop, который поддерживается Apache Software Foundation. Hadoop представляет собой набор проектов с открытым исходным кодом, в том числе MapReduce и Hadoop Distributed File System (HDFS).
В двух словах, MapReduce — это одна из первых парадигм распределенного программирования, которая используется для доступа к большим объемам данным, хранящимся на HDFS. Другим популярным инструментом экосистемы Hadoop является Hive, который представляет собой слой, работающий поверх Hadoop и позволяющий получать доступ к данным с помощью Hive SQL — структурированного языка запросов.
Например, с помощью Hive можно написать запрос. Он, в свою очередь, будет преобразован в задание, которое может быть выполнено на кластере компьютеров.
Еще один популярный инструмент — Apache Spark. Он представляет собой движок, который применяется для масштабируемой обработки больших данных и машинного обучения. Этот фреймворк поддерживает множество языков, в том числе Scala и Python, и пригодится для SQL-аналитики, науки о данных и машинного обучения, а также пакетной и потоковой обработки данных.
Выводы
В сегодняшней статье мы рассмотрели один из важнейших столпов в инженерии данных — параллельные вычисления. Обсудили, зачем они нужны и какую пользу приносят, особенно в эпоху больших данных. Кроме того, мы изучили, как выполнять такие вычисления с помощью пакета Python multiprocessing
поверх датафрейма Pandas, а также как добиться аналогичного результата с помощью dask
.
В конце мы познакомились с наиболее часто используемыми в параллельном вычислении фреймворками (MapReduce, HDFS, Hive и Apache Spark), о существовании которых должен знать каждый дата-инженер. Они помогают проектировать и внедрять эффективные и масштабируемые потоки данных в организации.
Читайте также:
- Уникальный пример использования SocketCluster для распределенных вычислений
- Краткий обзор 10 популярных архитектурных шаблонов приложений
- Потоковые и многопроцессорные модули на Python
Читайте нас в Telegram, VK и Яндекс.Дзен
Перевод статьи Giorgos Myrianthous: What is Parallel Computing?