Потоки Kafka: как обрабатывать CSV-файлы для выполнения вычислений

Kafka Streams  —  популярная библиотека для создания потоковых приложений. Она предлагает надежное решение для приложений и микросервисов, которые должны очень быстро обрабатывать данные в режиме реального времени.

Из этого руководства вы узнаете, как обрабатывать большие CSV-файлы с историческими записями погоды. Цель  —  рассчитать среднесуточную температуру. Для этого мы используем топик Kafka, преобразуем данные и отправляем их в другой топик. Кроме того, вы познакомитесь с часто используемыми потоковыми операциями, такими как агрегации.

Начнем же!

Установка и запуск Kafka

Есть несколько способов запустить Kafka. Например, можно использовать образ Docker. Но сейчас установим Kafka локально.

  1. Загрузите последнюю версию Kafka здесь.
  2. Извлеките пакет и перейдите в папку Kafka.
$ tar -xzf kafka_2.13-2.8.0.tgz
$ cd kafka_2.13-2.8.0

3. Запустите Zookeeper из терминала, воспользовавшись этой командой:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

4. Запустите брокер-службу Kafka с другого терминала с помощью этой команды:

$ bin/kafka-server-start.sh config/server.properties

Создание топиков Kafka

На другой вкладке терминала создайте топики для ввода и вывода, как показано ниже:

$ bin/kafka-topics.sh — create — topic weather-tmp-input — bootstrap-server localhost:9092
bin/kafka-topics.sh — create — topic weather-tmp-output — bootstrap-server localhost:9092

Получение набора данных

Загрузите CSV-файл, содержащий данные о погоде. Этот набор данных взят с Kaggle.com. Его также можно скачать из репозитория GitHub. Записи в нем выглядят так:

Написание Java-кода

  1. Создайте новый проект Maven в любимой IDE и поместите следующие зависимости в файл pom.xml:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.4</version>
</dependency>

2. Создайте класс Weather.java, как показано ниже:

package org.demo;

public class Weather {

private final String date;
public final String day;
public final Double temperature;

public Weather(String date, Double temperature) {
this.date = date;
this.temperature = temperature;
this.day = this.date.split("\\s")[0];
}
}
  • Из CSV-записи понадобятся только дата и температура.
  • Для извлечения даты из данных воспользуемся регулярным выражением “\\s”.

3. Создайте новый Java-файл KafkaStreamsDemo.java и скопируйте туда код:

package org.demo;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import static org.apache.kafka.streams.kstream.Grouped.with;

public class KafkaStreamsDemo {

public static final String INPUT_TOPIC = "weather-tmp-input";

public static final String OUTPUT_TOPIC = "weather-tmp-output";

static void createAvgTempCalcStream(final StreamsBuilder builder) {
KStream<String, String> inputStream = builder.stream(INPUT_TOPIC);

KStream<String, Double> dayTempStream = inputStream
.map((s, line) -> {
try {
Weather w = mapLineToWeather(line);
return new KeyValue<>(w.day, w.temperature);
} catch (Exception e) {
System.err.println("Mapping error:" + e.getMessage());
throw e;
}
});

dayTempStream
.groupByKey(with(Serdes.String(), Serdes.Double()))
.aggregate(TempCalculator::new, (day, temperature, tempCalculator) -> {
try {
tempCalculator.incrementCounter();
tempCalculator.addSum(temperature);
return tempCalculator;
} catch (Exception e) {
System.err.println(e.getMessage());
throw e;
}
}, Materialized.with(Serdes.String(), CustomSerdes.instance()))
.toStream()
.mapValues((day, tempCalculator) -> tempCalculator.getAvg())
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Double()));
}

public static Weather mapLineToWeather(String csvLine) {
String[] csvFields = csvLine.split(",");
String date = csvFields[0];
Double temp = Double.parseDouble(csvFields[3]);
return new Weather(date, temp);
}

public static void main(final String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();

createAvgTempCalcStream(builder);

KafkaStreams streams = new KafkaStreams(builder.build(), props);

final CountDownLatch latch = new CountDownLatch(1);

// Ловим событие ctrl + c
Runtime.getRuntime().addShutdownHook(new Thread("shutdown-hook") {
@Override
public void run() {
System.out.println("Shutdown streams...");
streams.close();
latch.countDown();
}
});

try {
System.out.println("Streams started...");
streams.start();
System.out.println("Waiting for events...");
latch.await();
} catch (final Throwable e) {
System.err.println(e.getMessage());
System.exit(1);
}
System.exit(0);
}

static class TempCalculator {
Integer count;
Double sum;

public TempCalculator() {
count = 0;
sum = 0.0;
}

public Double getSum() {
return sum;
}
public Integer getCount() {
return count;
}
@JsonIgnore
public double getAvg() {
if (count != 0) {
return sum / count;
} else {
System.err.println("No day records found");
return sum;
}
}

@JsonIgnore
public void incrementCounter() {
++ this.count;
}

@JsonIgnore
public void addSum(Double sum) {
this.sum += sum;
}

}
}

class JsonSerializer<T> implements Serializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public void configure(Map<String, ?> props, boolean isKey) {
// ничего не делать
}

@Override
public byte[] serialize(String topic, T data) {
if (data == null)
return null;

try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON message", e);
}
}

@Override
public void close() {
// ничего не делать
}

}

class JsonDeserializer<T> implements Deserializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();

private Class<T> tClass;

public JsonDeserializer() {
}

public JsonDeserializer(Class<T> tClass) {
this.tClass = tClass;
}

@Override
public void configure(Map<String, ?> props, boolean isKey) {
// ничего не делать
}

@Override
public T deserialize(String topic, byte[] bytes) {
if (bytes == null)
return null;

T data;
try {
data = objectMapper.readValue(bytes, tClass);
} catch (Exception e) {
throw new SerializationException(e);
}

return data;
}

@Override
public void close() {
// ничего не делать
}
}

final class CustomSerdes {

static public final class MySerde
extends Serdes.WrapperSerde<KafkaStreamsDemo.TempCalculator> {
public MySerde() {
super(new JsonSerializer<>(),
new JsonDeserializer<>(KafkaStreamsDemo.TempCalculator.class));
}
}

public static Serde<KafkaStreamsDemo.TempCalculator> instance() {
return new CustomSerdes.MySerde();
}

}

Пошаговое объяснение:

  • Метод createAvgTempCalcStream() определяет поток, который вычисляет среднюю дневную температуру на основе записей в CSV-файле.
  • Сначала мы используем inputStream, чтобы сопоставить записи в файле с объектом Weather. Операция маппинга Kafka преобразует запись на входе в новую запись в потоке вывода. Это также дает возможность изменять типы ключей/значений.
  • Метод mapLineToWeather() создает новый объект Weather. Мы разделяем строки, ориентируясь на запятую. Первое и четвертое значения будут использованы для извлечения даты и температуры.
  • Оператор groupByKey группирует записи в потоке по существующему ключу. Эта операция необходима, прежде чем мы сможем выполнить агрегацию. Это приводит к KGroupStream. Обратите внимание, что записи, содержащие null, не включаются в результат.
  • Поскольку дефолтный сериализатор/десериализатор не соответствуют типам ключей/значений, нужно явно указать новый. В данном случае  —  (with(Serdes.String(), Serdes.Double()). Дата представляет собой строку, а средняя температура  —  число с дробной частью (double).
  • Операция агрегирования aggregate позволяет выходному значению иметь тип, отличный от типа входного значения. Она объединяет значения по сгруппированному ключу.
  • Класс TempCalculator помогает рассчитать среднесуточную температуру. Мы подсчитываем все записи за один и тот же день и суммируем температуру. Затем делим сумму sum на количество count, чтобы получить среднее значение.
  • Поток материализуется в KTable. Для объекта TempCalculator нужен пользовательский сериализатор/десериализатор. Вот почему надо создать JsonSerializer и JsonDeserializer  —  они используются для создания класса CustomSerdes.
  • Наконец, воспользуемся операцией mapValues для преобразования записей в новую пару ключ-значение. В нем сохраняется ключ исходной записи. День  —  это String, а средняя температура  —  Double. Мы отправляем результаты в топик вывода Kafka, который создали ранее.
  • Поскольку нужно, чтобы приложение работало и прослушивало топики ввода, мы используем объект CountDownLatch. Это позволяет большему количеству потоков ожидать завершения набора операций, выполняемых в других потоках.

Запуск приложения

  1. Запустите Kafka из среды разработки.

2. С терминала отправьте CSV-файл в топик ввода.

cat /home/{user}/kafka-playground/weatherHistory.csv | bin/kafka-console-producer.sh — broker-list localhost:9092 — topic weather-tmp-input

3. В другом окне вызовите потребителя, как показано ниже:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic weather-tmp-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer

Это приведет к запуску Java-кода. Вы должны увидеть результат, подобный приведенному ниже. Обратите внимание, что результат обрезан для краткости.

......
2016–09–30 17.090740740740742
2016–09–04 22.15300925925926
2016–09–05 16.838657407407407
2016–09–06 17.289583333333336
2016–09–07 21.448379629629628
2016–09–08 22.427314814814817
2016–09–09 22.702546296296294
......

Отлично! Программа Kafka обработала топик ввода и рассчитала среднесуточную температуру, как и ожидалось.

Заключение

В этом руководстве вы узнали, как пользоваться операциями потоков Kafka, такими как aggregate и groupByKey. Кроме того, теперь вам известно, как создать пользовательский сериализатор/десериализатор для преобразования данных в топиках вывода.

Читайте также:

Читайте нас в TelegramVK и Яндекс.Дзен


Перевод статьи Kirshi Yin: Kafka Streams: How To Process a CSV File To Perform Calculations

Предыдущая статьяСоздание модели машинного обучения с помощью Google Colab без дополнительных настроек
Следующая статьяЗапуск Puppeteer в Akka.js