От Pandas к Pyspark

Изучение программирования с помощью Pandas  —  это как начало работы с программой Hello World в мире науки о данных. Pandas  —  широко используемая, интуитивно понятная и простая в освоении библиотека Python. Она работает с датафреймами, в которых данные представлены в табличном формате со строками и столбцами (как электронные таблицы). Pandas загружает все данные в память машины (Single Node) для более быстрого выполнения.

Почему стоит попробовать Pyspark?

Хотя Pandas чаще всего используется для работы с данными в табличном формате, особенно в Data Science, она не в полной мере поддерживает распараллеливание. Pyspark  —  это “питонный” API для Spark. Эта система была выпущена для поддержки взаимодействия между средами Python и Spark.

Pyspark с ее “кластерными вычислениями” обрабатывает данные путем их распределения, запуская код на нескольких узлах. Это сокращает время выполнения. Поскольку данные создаются экспоненциально каждый день, специалистам в этой сфере приходится иметь дело с их огромными массивами.

Итак, если вам необходимо перейти от одного узла к нескольким и если вы хотите освоить распределенные кластерные вычисления, эта статья поможет вам в преобразовании кода Pandas в Pyspark. Здесь приведены примеры наиболее часто используемых преобразований датафреймов Pandas и некоторых других операций вместе с соответствующим синтаксисом Pyspark.

Начнем!

Выполним импорт и создадим несколько сессий:

# PANDAS
import pandas as pd
from datetime import timedelta
# PYSPARK
from pyspark.sql import SparkSession
from pyspark.sql import functions as F spark=SparkSession.builder.appName('spark_session').getOrCreate()

Создание датафреймов 

# PANDAS
df = [['A1', 'B1', 2, '21-12-2021 10:30'],
['A2', 'B2', 4, '21-12-2021 10:40'],
['A3', 'B3', 5, '21-12-2021 11:00']] # Строки
df = pd.DataFrame(df, columns = ['A', 'B', 'Value', 'Date_Column'])
# PYSPARK
df = spark.createDataFrame(
[('A1', 'B1', 2, '21-12-2021 10:30'),
('A2', 'B2', 4, '21-12-2021 10:40'),
('A3', 'B3', 5, '21-12-2021 11:00')], # Rows
['A', 'B', 'Value', 'Date_Column'] # Columns
)
# ПРИМЕЧАНИЕ: есть несколько способов создания датафреймов в обеих библиотеках.

Создание новых столбцов

# PANDAS - новый столбец с постоянными значениями
df['C'] = 'New Constant'
# PYSPARK - новый столбец с постоянными значениями
df = df.withColumn("C", F.lit('New Constant'))
# PANDAS - новый столбец с использованием существующих столбцов
df['C'] = df['A']+df['B']
# PYSPARK - новый столбец с использованием существующих столбцов
df = df.withColumn("C", F.concat("A", "B"))
# ПРИМЕЧАНИЕ:
lit() -- используется для создания постоянных столбцов
concat() -- объединяет столбцы датафрейма
withColumn() -- создает новый столбец

Обновление данных существующего столбца

# PANDAS - обновление данных столбца
df['Value'] = df['Value']**2
# PYSPARK - обновление данных столбца
df = df.withColumn("Value", F.col("Value")**2)

Выборка, фильтрация данных

# PANDAS - выбор столбцов
new_df = df[['B', 'C']]
# PYSPARK - Выбор столбцов
new_df = df.select("B", "C")
# PANDAS - фильтрация строк на основе условия
new_df = df[df['Value']<5]
# PYSPARK - фильтрация строк на основе условия
new_df = df.filter(df.Value<5)

Преобразования типа столбца

# PANDAS - преобразование столбца из формата String в формат DateTime
df['Date_Column'] = pd.to_datetime(df['Date_Column'], format='%d-%m-%Y %H:%M')
# PYSPARK - преобразование столбца из формата String в формат Timestamp
df = df.withColumn("Date_Column", F.to_timestamp("Date_Column", "dd-MM-yyyy hh:mm"))

Переименование, удаление столбцов

# PANDAS - переименовать столбцы
df = df.rename(columns={'A': 'Col_A', 'B': 'Col_B'})
# PYSPARK - переименовать столбцы
df = df.withColumnRenamed("A", "Col_A").withColumnRenamed("B", "Col_B")
# PANDAS - удалить столбцы
df = df.drop(['Col_A', 'Col_B'], axis=1)
# PYSPARK - удалить столбцы
df = df.drop('A', 'B')

Разложение датафреймов

Разверните датафрейм из широкого формата в длинный.

# PANDAS
df = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
'B': {0: 1, 1: 3, 2: 5},
'C': {0: 2, 1: 4, 2: 6}})
pd.melt(df, id_vars=['A'], value_vars=['B', 'C'])
# Вывод
A | переменная| значение
a B 1
b B 3
c B 5
a C 2
b C 4
c C 6

В Pyspark (релиз 3.0) нет прямой версии pd.melt.

# Пользовательская функция разложения PYSPARK
def melt(df, 
         id_vars, 
         value_vars, 
         var_name="Variable", 
         value_name="Value"):
    _vars_and_vals = F.array(*(F.struct(F.lit(c).alias(var_name),
                   F.col(c).alias(value_name)) for c in value_vars))
    _tmp = df.withColumn("_vars_and_vals",
                         F.explode(_vars_and_vals))
    cols = id_vars + [
            F.col("_vars_and_vals")[x].alias(x) for x in [var_name,
            value_name]]
    return _tmp.select(*cols)
df = spark.createDataFrame(
    [('a', 1, 2), ('b', 3, 4), ('c', 5, 6)], # Строки
    ['A', 'B', 'C'] # Столбцы
)

melt(df, ['A'], ['B', 'C']).display()
# Вывод
A | переменная| значение
a B 1
b B 3
c B 5
a C 2
b C 4
c C 6

Добавление интервала к столбцу Timestamp (Timedelta)

# PANDAS - добавление'Interval' к'Start_Time'
df = pd.DataFrame([['2021-01-10 10:10:00', '00:05'],
['2021-12-10, 05:30:00', '00:15'],
['2021-11-10 11:40:00', '00:20']],
columns = ['Start_Time','Interval'])
df['Start_Time'] = pd.to_datetime(df['Start_Time'])
df['End_Time'] = df['Start_Time'] + \

pd.to_timedelta(pd.to_datetime(df['Interval']).dt.\
strftime('%H:%M:%S'))

Сейчас в Pyspark нет встроенного метода, подобного to_timedelta. Вот альтернативный способ сделать это в Pyspark.

# PYSPARK - добавление'Interval' к 'Start_Time'
df = spark.createDataFrame(
[['2021-01-10 10:10:00', '00:05'],
['2021-12-10 05:30:00', '00:15'],
['2021-11-10 11:40:00', '00:20']], # Строки
['Start_Time', 'Interval'] # Столбцы
)
df = df.withColumn("Start_Time",
F.to_timestamp("Start_Time",
"yyyy-MM-dd hh:mm:ss"))
df = df.withColumn("End_Time", (F.unix_timestamp("Start_Time") + F.unix_timestamp("Interval", "HH:mm")).cast('timestamp'))

Дополнительный синтаксис

# PANDAS df
df = pd.DataFrame({'A': {0: 'a', 1: 'a', 2: 'c'},
'B': {0: 1, 1: 1, 2: 5},
'C': {0: 2, 1: 4, 2: 6}})
# PYSPARK df
df = spark.createDataFrame(
[('a', 1, 2), ('a', 1, 4), ('c', 5, 6)], # Rows
['A', 'B', 'C'] # Columns
)
# PANDAS - форма датафрейма
print(df.shape)
# PYSPARK - форма датафрейма
print((df.count(), len(df.columns)))
# PANDAS - различные значения столбца
df['A'].unique()
# PYSPARK - различные значения столбца
df.select('A').distinct()
# PANDAS - группировка по столбцам - вычисление агрегирующих функций
df.groupby(['A', 'B']).sum()
# PYSPARK - группировка по столбцам - вычисление агрегирующих функций
df.groupBy("A", "B").agg(F.sum("C"))

Заключение

Pandas по-прежнему остается преимущественным выбором многих специалистов по обработке данных. Однако эту библиотеку удобно использовать для небольших наборов данных, которые не потребляют слишком много памяти. Если же вы имеете дело с большими наборами данных, Pyspark поможет наилучшим образом справиться с распределенными вычислениями.

Читайте также:

Читайте нас в TelegramVK и Яндекс.Дзен


Перевод статьи Bhanu Sree Balisetty: From Pandas to Pyspark

Предыдущая статьяЧто делать, когда ваши сотрудники конфликтуют?
Следующая статьяКак повысить SEO-рейтинг сайта с помощью Next.JS