Изучение программирования с помощью Pandas — это как начало работы с программой Hello World в мире науки о данных. Pandas — широко используемая, интуитивно понятная и простая в освоении библиотека Python. Она работает с датафреймами, в которых данные представлены в табличном формате со строками и столбцами (как электронные таблицы). Pandas загружает все данные в память машины (Single Node) для более быстрого выполнения.
Почему стоит попробовать Pyspark?
Хотя Pandas чаще всего используется для работы с данными в табличном формате, особенно в Data Science, она не в полной мере поддерживает распараллеливание. Pyspark — это “питонный” API для Spark. Эта система была выпущена для поддержки взаимодействия между средами Python и Spark.
Pyspark с ее “кластерными вычислениями” обрабатывает данные путем их распределения, запуская код на нескольких узлах. Это сокращает время выполнения. Поскольку данные создаются экспоненциально каждый день, специалистам в этой сфере приходится иметь дело с их огромными массивами.
Итак, если вам необходимо перейти от одного узла к нескольким и если вы хотите освоить распределенные кластерные вычисления, эта статья поможет вам в преобразовании кода Pandas в Pyspark. Здесь приведены примеры наиболее часто используемых преобразований датафреймов Pandas и некоторых других операций вместе с соответствующим синтаксисом Pyspark.
Начнем!
Выполним импорт и создадим несколько сессий:
# PANDAS
import pandas as pd
from datetime import timedelta
# PYSPARK
from pyspark.sql import SparkSession
from pyspark.sql import functions as F spark=SparkSession.builder.appName('spark_session').getOrCreate()
Создание датафреймов
# PANDAS
df = [['A1', 'B1', 2, '21-12-2021 10:30'],
['A2', 'B2', 4, '21-12-2021 10:40'],
['A3', 'B3', 5, '21-12-2021 11:00']] # Строки
df = pd.DataFrame(df, columns = ['A', 'B', 'Value', 'Date_Column'])
# PYSPARK
df = spark.createDataFrame(
[('A1', 'B1', 2, '21-12-2021 10:30'),
('A2', 'B2', 4, '21-12-2021 10:40'),
('A3', 'B3', 5, '21-12-2021 11:00')], # Rows
['A', 'B', 'Value', 'Date_Column'] # Columns
)
# ПРИМЕЧАНИЕ: есть несколько способов создания датафреймов в обеих библиотеках.
Создание новых столбцов
# PANDAS - новый столбец с постоянными значениями
df['C'] = 'New Constant'
# PYSPARK - новый столбец с постоянными значениями
df = df.withColumn("C", F.lit('New Constant'))
# PANDAS - новый столбец с использованием существующих столбцов
df['C'] = df['A']+df['B']
# PYSPARK - новый столбец с использованием существующих столбцов
df = df.withColumn("C", F.concat("A", "B"))
# ПРИМЕЧАНИЕ:
lit() -- используется для создания постоянных столбцов
concat() -- объединяет столбцы датафрейма
withColumn() -- создает новый столбец
Обновление данных существующего столбца
# PANDAS - обновление данных столбца
df['Value'] = df['Value']**2
# PYSPARK - обновление данных столбца
df = df.withColumn("Value", F.col("Value")**2)
Выборка, фильтрация данных
# PANDAS - выбор столбцов
new_df = df[['B', 'C']]
# PYSPARK - Выбор столбцов
new_df = df.select("B", "C")
# PANDAS - фильтрация строк на основе условия
new_df = df[df['Value']<5]
# PYSPARK - фильтрация строк на основе условия
new_df = df.filter(df.Value<5)
Преобразования типа столбца
# PANDAS - преобразование столбца из формата String в формат DateTime
df['Date_Column'] = pd.to_datetime(df['Date_Column'], format='%d-%m-%Y %H:%M')
# PYSPARK - преобразование столбца из формата String в формат Timestamp
df = df.withColumn("Date_Column", F.to_timestamp("Date_Column", "dd-MM-yyyy hh:mm"))
Переименование, удаление столбцов
# PANDAS - переименовать столбцы
df = df.rename(columns={'A': 'Col_A', 'B': 'Col_B'})
# PYSPARK - переименовать столбцы
df = df.withColumnRenamed("A", "Col_A").withColumnRenamed("B", "Col_B")
# PANDAS - удалить столбцы
df = df.drop(['Col_A', 'Col_B'], axis=1)
# PYSPARK - удалить столбцы
df = df.drop('A', 'B')
Разложение датафреймов
Разверните датафрейм из широкого формата в длинный.
# PANDAS
df = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
'B': {0: 1, 1: 3, 2: 5},
'C': {0: 2, 1: 4, 2: 6}})
pd.melt(df, id_vars=['A'], value_vars=['B', 'C'])
# Вывод
A | переменная| значение
a B 1
b B 3
c B 5
a C 2
b C 4
c C 6
В Pyspark (релиз 3.0) нет прямой версии pd.melt.
# Пользовательская функция разложения PYSPARK
def melt(df,
id_vars,
value_vars,
var_name="Variable",
value_name="Value"):
_vars_and_vals = F.array(*(F.struct(F.lit(c).alias(var_name),
F.col(c).alias(value_name)) for c in value_vars))
_tmp = df.withColumn("_vars_and_vals",
F.explode(_vars_and_vals))
cols = id_vars + [
F.col("_vars_and_vals")[x].alias(x) for x in [var_name,
value_name]]
return _tmp.select(*cols)
df = spark.createDataFrame(
[('a', 1, 2), ('b', 3, 4), ('c', 5, 6)], # Строки
['A', 'B', 'C'] # Столбцы
)
melt(df, ['A'], ['B', 'C']).display()
# Вывод
A | переменная| значение
a B 1
b B 3
c B 5
a C 2
b C 4
c C 6
Добавление интервала к столбцу Timestamp (Timedelta)
# PANDAS - добавление'Interval' к'Start_Time'
df = pd.DataFrame([['2021-01-10 10:10:00', '00:05'],
['2021-12-10, 05:30:00', '00:15'],
['2021-11-10 11:40:00', '00:20']],
columns = ['Start_Time','Interval'])
df['Start_Time'] = pd.to_datetime(df['Start_Time'])
df['End_Time'] = df['Start_Time'] + \
pd.to_timedelta(pd.to_datetime(df['Interval']).dt.\
strftime('%H:%M:%S'))
Сейчас в Pyspark нет встроенного метода, подобного to_timedelta
. Вот альтернативный способ сделать это в Pyspark.
# PYSPARK - добавление'Interval' к 'Start_Time'
df = spark.createDataFrame(
[['2021-01-10 10:10:00', '00:05'],
['2021-12-10 05:30:00', '00:15'],
['2021-11-10 11:40:00', '00:20']], # Строки
['Start_Time', 'Interval'] # Столбцы
)
df = df.withColumn("Start_Time",
F.to_timestamp("Start_Time",
"yyyy-MM-dd hh:mm:ss"))
df = df.withColumn("End_Time", (F.unix_timestamp("Start_Time") + F.unix_timestamp("Interval", "HH:mm")).cast('timestamp'))
Дополнительный синтаксис
# PANDAS df
df = pd.DataFrame({'A': {0: 'a', 1: 'a', 2: 'c'},
'B': {0: 1, 1: 1, 2: 5},
'C': {0: 2, 1: 4, 2: 6}})
# PYSPARK df
df = spark.createDataFrame(
[('a', 1, 2), ('a', 1, 4), ('c', 5, 6)], # Rows
['A', 'B', 'C'] # Columns
)
# PANDAS - форма датафрейма
print(df.shape)
# PYSPARK - форма датафрейма
print((df.count(), len(df.columns)))
# PANDAS - различные значения столбца
df['A'].unique()
# PYSPARK - различные значения столбца
df.select('A').distinct()
# PANDAS - группировка по столбцам - вычисление агрегирующих функций
df.groupby(['A', 'B']).sum()
# PYSPARK - группировка по столбцам - вычисление агрегирующих функций
df.groupBy("A", "B").agg(F.sum("C"))
Заключение
Pandas по-прежнему остается преимущественным выбором многих специалистов по обработке данных. Однако эту библиотеку удобно использовать для небольших наборов данных, которые не потребляют слишком много памяти. Если же вы имеете дело с большими наборами данных, Pyspark поможет наилучшим образом справиться с распределенными вычислениями.
Читайте также:
- 10 лайфхаков для работы с библиотекой Pandas
- Хватит использовать Pandas, пора переходить на Spark + Scala!
- Ускоряем работу с Pandas при помощи modin
Читайте нас в Telegram, VK и Яндекс.Дзен
Перевод статьи Bhanu Sree Balisetty: From Pandas to Pyspark