Kafka Streams — популярная библиотека для создания потоковых приложений. Она предлагает надежное решение для приложений и микросервисов, которые должны очень быстро обрабатывать данные в режиме реального времени.
Из этого руководства вы узнаете, как обрабатывать большие CSV-файлы с историческими записями погоды. Цель — рассчитать среднесуточную температуру. Для этого мы используем топик Kafka, преобразуем данные и отправляем их в другой топик. Кроме того, вы познакомитесь с часто используемыми потоковыми операциями, такими как агрегации.
Начнем же!
Установка и запуск Kafka
Есть несколько способов запустить Kafka. Например, можно использовать образ Docker. Но сейчас установим Kafka локально.
- Загрузите последнюю версию Kafka здесь.
- Извлеките пакет и перейдите в папку Kafka.
$ tar -xzf kafka_2.13-2.8.0.tgz
$ cd kafka_2.13-2.8.0
3. Запустите Zookeeper из терминала, воспользовавшись этой командой:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
4. Запустите брокер-службу Kafka с другого терминала с помощью этой команды:
$ bin/kafka-server-start.sh config/server.properties
Создание топиков Kafka
На другой вкладке терминала создайте топики для ввода и вывода, как показано ниже:
$ bin/kafka-topics.sh — create — topic weather-tmp-input — bootstrap-server localhost:9092
bin/kafka-topics.sh — create — topic weather-tmp-output — bootstrap-server localhost:9092
Получение набора данных
Загрузите CSV-файл, содержащий данные о погоде. Этот набор данных взят с Kaggle.com. Его также можно скачать из репозитория GitHub. Записи в нем выглядят так:
Написание Java-кода
- Создайте новый проект Maven в любимой IDE и поместите следующие зависимости в файл
pom.xml
:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.4</version>
</dependency>
2. Создайте класс Weather.java
, как показано ниже:
package org.demo;
public class Weather {
private final String date;
public final String day;
public final Double temperature;
public Weather(String date, Double temperature) {
this.date = date;
this.temperature = temperature;
this.day = this.date.split("\\s")[0];
}
}
- Из CSV-записи понадобятся только дата и температура.
- Для извлечения даты из данных воспользуемся регулярным выражением
“\\s”
.
3. Создайте новый Java-файл KafkaStreamsDemo.java
и скопируйте туда код:
package org.demo;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import static org.apache.kafka.streams.kstream.Grouped.with;
public class KafkaStreamsDemo {
public static final String INPUT_TOPIC = "weather-tmp-input";
public static final String OUTPUT_TOPIC = "weather-tmp-output";
static void createAvgTempCalcStream(final StreamsBuilder builder) {
KStream<String, String> inputStream = builder.stream(INPUT_TOPIC);
KStream<String, Double> dayTempStream = inputStream
.map((s, line) -> {
try {
Weather w = mapLineToWeather(line);
return new KeyValue<>(w.day, w.temperature);
} catch (Exception e) {
System.err.println("Mapping error:" + e.getMessage());
throw e;
}
});
dayTempStream
.groupByKey(with(Serdes.String(), Serdes.Double()))
.aggregate(TempCalculator::new, (day, temperature, tempCalculator) -> {
try {
tempCalculator.incrementCounter();
tempCalculator.addSum(temperature);
return tempCalculator;
} catch (Exception e) {
System.err.println(e.getMessage());
throw e;
}
}, Materialized.with(Serdes.String(), CustomSerdes.instance()))
.toStream()
.mapValues((day, tempCalculator) -> tempCalculator.getAvg())
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Double()));
}
public static Weather mapLineToWeather(String csvLine) {
String[] csvFields = csvLine.split(",");
String date = csvFields[0];
Double temp = Double.parseDouble(csvFields[3]);
return new Weather(date, temp);
}
public static void main(final String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
createAvgTempCalcStream(builder);
KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
// Ловим событие ctrl + c
Runtime.getRuntime().addShutdownHook(new Thread("shutdown-hook") {
@Override
public void run() {
System.out.println("Shutdown streams...");
streams.close();
latch.countDown();
}
});
try {
System.out.println("Streams started...");
streams.start();
System.out.println("Waiting for events...");
latch.await();
} catch (final Throwable e) {
System.err.println(e.getMessage());
System.exit(1);
}
System.exit(0);
}
static class TempCalculator {
Integer count;
Double sum;
public TempCalculator() {
count = 0;
sum = 0.0;
}
public Double getSum() {
return sum;
}
public Integer getCount() {
return count;
}
@JsonIgnore
public double getAvg() {
if (count != 0) {
return sum / count;
} else {
System.err.println("No day records found");
return sum;
}
}
@JsonIgnore
public void incrementCounter() {
++ this.count;
}
@JsonIgnore
public void addSum(Double sum) {
this.sum += sum;
}
}
}
class JsonSerializer<T> implements Serializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> props, boolean isKey) {
// ничего не делать
}
@Override
public byte[] serialize(String topic, T data) {
if (data == null)
return null;
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON message", e);
}
}
@Override
public void close() {
// ничего не делать
}
}
class JsonDeserializer<T> implements Deserializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
private Class<T> tClass;
public JsonDeserializer() {
}
public JsonDeserializer(Class<T> tClass) {
this.tClass = tClass;
}
@Override
public void configure(Map<String, ?> props, boolean isKey) {
// ничего не делать
}
@Override
public T deserialize(String topic, byte[] bytes) {
if (bytes == null)
return null;
T data;
try {
data = objectMapper.readValue(bytes, tClass);
} catch (Exception e) {
throw new SerializationException(e);
}
return data;
}
@Override
public void close() {
// ничего не делать
}
}
final class CustomSerdes {
static public final class MySerde
extends Serdes.WrapperSerde<KafkaStreamsDemo.TempCalculator> {
public MySerde() {
super(new JsonSerializer<>(),
new JsonDeserializer<>(KafkaStreamsDemo.TempCalculator.class));
}
}
public static Serde<KafkaStreamsDemo.TempCalculator> instance() {
return new CustomSerdes.MySerde();
}
}
Пошаговое объяснение:
- Метод
createAvgTempCalcStream()
определяет поток, который вычисляет среднюю дневную температуру на основе записей в CSV-файле. - Сначала мы используем
inputStream
, чтобы сопоставить записи в файле с объектомWeather
. Операция маппинга Kafka преобразует запись на входе в новую запись в потоке вывода. Это также дает возможность изменять типы ключей/значений. - Метод
mapLineToWeather()
создает новый объектWeather
. Мы разделяем строки, ориентируясь на запятую. Первое и четвертое значения будут использованы для извлечения даты и температуры. - Оператор
groupByKey
группирует записи в потоке по существующему ключу. Эта операция необходима, прежде чем мы сможем выполнить агрегацию. Это приводит кKGroupStream
. Обратите внимание, что записи, содержащиеnull
, не включаются в результат. - Поскольку дефолтный сериализатор/десериализатор не соответствуют типам ключей/значений, нужно явно указать новый. В данном случае —
(with(Serdes.String(), Serdes.Double())
. Дата представляет собой строку, а средняя температура — число с дробной частью (double
). - Операция агрегирования
aggregate
позволяет выходному значению иметь тип, отличный от типа входного значения. Она объединяет значения по сгруппированному ключу. - Класс
TempCalculator
помогает рассчитать среднесуточную температуру. Мы подсчитываем все записи за один и тот же день и суммируем температуру. Затем делим суммуsum
на количествоcount
, чтобы получить среднее значение. - Поток материализуется в
KTable
. Для объектаTempCalculator
нужен пользовательский сериализатор/десериализатор. Вот почему надо создатьJsonSerializer
иJsonDeserializer
— они используются для создания классаCustomSerdes
. - Наконец, воспользуемся операцией
mapValues
для преобразования записей в новую пару ключ-значение. В нем сохраняется ключ исходной записи. День — это String, а средняя температура — Double. Мы отправляем результаты в топик вывода Kafka, который создали ранее. - Поскольку нужно, чтобы приложение работало и прослушивало топики ввода, мы используем объект
CountDownLatch
. Это позволяет большему количеству потоков ожидать завершения набора операций, выполняемых в других потоках.
Запуск приложения
- Запустите Kafka из среды разработки.
2. С терминала отправьте CSV-файл в топик ввода.
cat /home/{user}/kafka-playground/weatherHistory.csv | bin/kafka-console-producer.sh — broker-list localhost:9092 — topic weather-tmp-input
3. В другом окне вызовите потребителя, как показано ниже:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic weather-tmp-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer
Это приведет к запуску Java-кода. Вы должны увидеть результат, подобный приведенному ниже. Обратите внимание, что результат обрезан для краткости.
......
2016–09–30 17.090740740740742
2016–09–04 22.15300925925926
2016–09–05 16.838657407407407
2016–09–06 17.289583333333336
2016–09–07 21.448379629629628
2016–09–08 22.427314814814817
2016–09–09 22.702546296296294
......
Отлично! Программа Kafka обработала топик ввода и рассчитала среднесуточную температуру, как и ожидалось.
Заключение
В этом руководстве вы узнали, как пользоваться операциями потоков Kafka, такими как aggregate
и groupByKey
. Кроме того, теперь вам известно, как создать пользовательский сериализатор/десериализатор для преобразования данных в топиках вывода.
Читайте также:
- Сравниваем эффективность Redis, Kafka и RabbitMQ
- PostgreSQL вместо Kafka: способ реализации системы очередей
- ClickHouse + Kafka = ❤
Читайте нас в Telegram, VK и Яндекс.Дзен
Перевод статьи Kirshi Yin: Kafka Streams: How To Process a CSV File To Perform Calculations