Введение

Захват изменения данных  —  это мощная техника выявления и отслеживания изменений в базе данных, эти изменения фиксируются в реальном времени и обрабатываются другими системами. Ею обеспечиваются беспроблемная синхронизация данных, аналитика в режиме реального времени и многое другое. Настроим систему захвата изменения данных с Docker, PostgreSQL, MongoDB, Kafka и Debezium.

Подробное описание каждой службы

1. Zookeeper

Zookeeper  —  централизованная служба для хранения информации о конфигурации, для именования, распределенной синхронизации и групповых сервисов управления брокерами Kafka.

Удаление ZooKeeper планируется в Apache Kafka 4.0. 
Прекращение использования ZooKeeper.

Конфигурация службы Zookeeper

services:
zookeeper1:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper1
networks:
- n1
volumes:
- ./zookeeper1_data:/var/lib/zookeeper/data
- ./zookeeper1_log:/var/lib/zookeeper/log
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: "zookeeper1:22888:23888;zookeeper2:22888:23888"

zookeeper2:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper2
networks:
- n1
volumes:
- ./zookeeper2_data:/var/lib/zookeeper/data
- ./zookeeper2_log:/var/lib/zookeeper/log
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_SERVERS: "zookeeper1:22888:23888;zookeeper2:22888:23888"

Преимущества

  • Централизованное управление: упрощаются конфигурирование и управление брокерами Kafka.
  • Надежность: обеспечиваются высокая доступность и отказоустойчивость.
  • Координация: эффективное управление распределенными системами и службами.

Недостатки

  • Сложность: требуются тщательная настройка и управление.
  • Ресурсоемкость: большой расход системных ресурсов.

2. Kafka

Apache Kafka  —  распределенная потоковая платформа для обработки потоков данных с высокой пропускной способностью, отказоустойчивостью, низкой задержкой. Применяется обычно при создании конвейеров передачи данных в реальном времени и потоковых приложений.

Ключевые компоненты

  • Темы: в Kafka данные организуются по темам  —  названиям категорий или каналов, где публикуются записи. Для целей параллельной обработки каждая тема разбивается на разделы.
  • Отправители: приложения, которыми в темах Kafka публикуются данные.
  • Получатели: подписываются на темы, этими приложениями обрабатывается канал публикуемых записей.
  • Брокеры: серверы Kafka, где хранятся данные и обслуживаются клиентские запросы. Из брокеров состоит кластер Kafka.
  • Zookeeper: для управления брокерами Kafka, их координирования.

Конфигурация службы Kafka

services:
kafka1:
image: confluentinc/cp-kafka:latest
container_name: kafka1
depends_on:
- zookeeper1
- zookeeper2
networks:
- n1
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2

kafka2:
image: confluentinc/cp-kafka:latest
container_name: kafka2
depends_on:
- zookeeper1
- zookeeper2
networks:
- n1
ports:
- "9093:9092"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2

Пояснения к конфигурации

Идентификатор брокера Kafka

  • KAFKA_BROKER_ID: уникальный идентификатор брокера Kafka в кластере, такой должен быть у каждого брокера.

Подключение Zookeeper

  • KAFKA_ZOOKEEPER_CONNECT: указывается строка подключения Zookeeper с перечислением экземпляров Zookeeper, которыми контролируются метаданные Kafka и координируются брокеры.

Прослушиватели

  • KAFKA_ADVERTISED_LISTENERS: определяется способ взаимодействия брокеров Kafka с клиентами. Здесь это простой текст через порты 9092 и 9093.

Коэффициенты репликации

  • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: задается коэффициент репликации для темы внутренних смещений, которой отслеживаются смещения получателей. Отказоустойчивость повышается более высоким коэффициентом репликации.
  • KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: задается коэффициент репликации для журнала состояний транзакций. Данные транзакций реплицируются, чем обеспечивается их долговечность.
  • KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: минимальное количество синхронизированных реплик, необходимых для журнала состояний транзакций. Этим обеспечивается высокая доступность.

Преимущества Kafka

Высокая пропускная способность

  • Kafka предназначена для обработки большого количества событий в секунду в сценариях с высокой пропускной способностью: поглощение данных в реальном времени, логирование, потоковая обработка.

Масштабируемость

  • Благодаря разбиению на разделы и распределенной архитектуре, Kafka масштабируется горизонтально: добавляются брокеры и разделы, увеличиваются объемы данных и конкурентность.

Долговечность

  • В Kafka данные постоянно хранятся на диске и реплицируются в брокерах, этим обеспечиваются долговечность данных и отказоустойчивость. Указывается срок хранения данных, который может быть и неопределенным.

Низкая задержка

  • Благодаря низкой задержке при доставке сообщений, Kafka идеальна для приложений реального времени, где требуется немедленная обработка входящих потоков данных.

Гибкость

  • В Kafka поддерживаются разные сценарии: агрегация логов, интеграция данных, аналитика в реальном времени, порождение событий. В ее экосистему включаются коннекторы и интеграции с другими системами.

Недостатки Kafka

Сложность

  • Управление кластером Kafka  —  это конфигурирование компонентов, мониторинг, настройка производительности, масштабирование и обновление. Для выполнения сложных задач здесь требуются знания и опыт.

Ресурсоемкость

  • Kafka ресурсоемка, здесь немаленький расход ресурсов ЦП, памяти и хранилища, особенно в сценариях с высокой пропускной способностью и крупных развертываниях.

Кривая обучения

  • Концепции и конфигурации Kafka сложноваты для новичков. На понимание тем, разделов, репликации и нюансов поведения групп получателей требуется время.

Задержка данных

  • Хотя Kafka предназначена для обработки в реальном времени, здесь случаются небольшие задержки или запаздывания данных, обусловленные задержками в сети, нагрузками при обработке данных и конфигурациями получателя.

Зависимость от Zookeeper

  • В Kafka кластеры управляются с помощью Zookeeper, и из-за дополнительного компонента сложность увеличивается. Однако в будущих версиях Kafka эту зависимость планируется устранить.

3. Реестр схем

Реестр схем  —  это централизованное хранилище, в котором схемы Avro управляются и проверяются RESTful-интерфейсом с соблюдением в темах Kafka соответствующего формата отправляемых и получаемых данных. Так поддерживаются совместимость данных и их эволюция.

Ключевые компоненты

  • Схемы: ими определяется структура записей данных, схемы Avro  —  самый популярный формат.
  • Субъекты: логические группировки схем, обычно соответствующие темам Kafka.
  • Проверки совместимости: обеспечивается совместимость новых версий схемы с предыдущими по определенным правилам совместимости.

Конфигурация службы реестра схем

services:
schema-registry:
image: confluentinc/cp-schema-registry:latest
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper1
- zookeeper2
- kafka1
- kafka2
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper1:2181,zookeeper2:2181"
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:9092,PLAINTEXT://kafka2:9092
networks:
- n1
restart: unless-stopped
healthcheck:
start_period: 10s
interval: 10s
retries: 20
test: curl --user superUser:superUser --fail --silent --insecure http://localhost:8081/subjects --output /dev/null || exit 1

Пояснения к конфигурации

Зависимости

  • depends_on: указывается, что запуск и готовность к работе службы реестра схем зависит от служб Zookeeper и Kafka. То есть реестр схем запускается только после того, как становятся доступны его зависимости.

Порты

  • 8081:8081: предоставляемый 8081  —  порт по умолчанию для RESTful API реестра схем.

Переменные среды́

  • SCHEMA_REGISTRY_HOST_NAME: задается имя хоста для службы реестра схем.
  • SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: определяется URL-адрес подключения экземпляров Zookeeper, который применяется в реестре схем для управления метаданными Kafka.
  • SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: перечисляются серверы начальной загрузки Kafka, к которым подключается реестр схем.

Преимущества реестра схем

Согласованность данных

  • Обеспечивается совместимость схем, применяемых отправителями и получателями данных, чем предотвращаются несоответствия и повреждение данных.

Централизованное управление схемами

  • Управление и хранение всех схем осуществляются в одном месте, чем упрощаются эволюция и сопровождение схем.

Правила совместимости

  • Поддерживаются различные уровни совместимости: обратная, прямая, полная и т. д. Поэтому новыми версиями схем имеющиеся приложения обработки данных не ломаются.

Интеграция с Kafka

  • Легко интегрируется с Kafka, обеспечивая проверку схемы и ее применение на уровне темы.

RESTful-интерфейс

  • Благодаря RESTful API схемами легко управлять и взаимодействовать с ними, поэтому реестр схем доступен для самых разных приложений и служб.

Недостатки реестра схем

Зависимость

  • Приходится контролировать и мониторить дополнительную службу, отчего повышается сложность системы в целом.

Задержка

  • Проверка схемы чревата небольшой задержкой при обработке данных, особенно в сценариях с высокой пропускной способностью.

Расход ресурсов

  • Для запуска требуются дополнительные ресурсы, что для ресурсоограниченных сред чувствительно.

Сложность эволюции схем

  • Сложность управления эволюцией схем и обеспечения совместимости, необходимость тщательного планирования и реализации.

Единая точка отказа

  • Если реестр схем управляется механизмами резервирования и аварийного переключения некорректно, он становится единой точкой отказа.

4. Прокси-сервер REST для Kafka

В Kafka REST Proxy имеется RESTful-интерфейс для взаимодействия с кластерами Apache Kafka, с помощью которого в приложениях отправляются и получаются сообщения через HTTP. Особенно кстати приходится он для клиентов, у которых нет встроенной библиотеки Kafka или которым для простоты требуется HTTP-интерфейс.

Ключевые компоненты

  • Отправители: приложения, которыми с помощью метода POST HTTP-запросов в темах Kafka публикуются сообщения.
  • Получатели: приложения, которые с помощью метода GET HTTP-запросов подписываются на темы Kafka и получают сообщения.
  • Темы: категории или каналы, в которых публикуются и получаются сообщения.
  • Брокеры: серверы Kafka, где хранятся данные и обрабатываются клиентские запросы.

Конфигурация службы Kafka REST Proxy

services:
rest-proxy:
image: confluentinc/cp-kafka-rest:latest
depends_on:
- zookeeper1
- zookeeper2
- kafka1
- kafka2
- schema-registry
ports:
- "8082:8082"
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: "kafka1:9092,kafka2:9092"
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
networks:
- n1

Пояснения к конфигурации

Зависимости

  • depends_on: указывается, что запуск и готовность к работе службы Kafka REST Proxy зависит от служб Zookeeper, Kafka и реестра схем. То есть прокси-сервер REST запускается только после того, как становятся доступны его зависимости.

Порты

  • 8082:8082: предоставляемый 8082  —  порт по умолчанию для RESTful API прокси-сервера REST в Kafka.

Переменные среды́

  • KAFKA_REST_HOST_NAME: задается имя хоста для службы Kafka REST Proxy.
  • KAFKA_REST_BOOTSTRAP_SERVERS: перечисляются серверы начальной загрузки Kafka, к которым для взаимодействия с кластером Kafka подключается прокси-сервер REST.
  • KAFKA_REST_LISTENERS: определяются адрес и порт, по которым прокси-сервером REST прослушиваются HTTP-запросы. Здесь через порт 8082 прослушиваются все сетевые интерфейсы 0.0.0.0.

Преимущества Kafka REST Proxy

Простота применения

  • Взаимодействие с Kafka по RESTful-интерфейсу через HTTP упрощается, поэтому Kafka REST Proxy доступен для самых разных клиентов и приложений.

Доступность

  • В том числе для веб-браузеров, HTTP-служб и т. д., у которых нет встроенной библиотеки Kafka.

Интеграция

  • Облегчается интеграция со службами, веб-приложениями и микросервисами RESTful, взаимодействующими по HTTP.

Всеязычность

  • Благодаря всеязычному интерфейсу сообщения Kafka отправляются и получаются с помощью стандартных HTTP-методов клиентами, написанными на любом языке.

Упрощенная разработка

  • Сложность разработки и развертывания клиентов Kafka снижается за счет упрощения протокола Kafka в рамках простого HTTP API.

Недостатки Kafka REST Proxy

Влияние на производительность

  • Взаимодействие по HTTP чревато дополнительными задержкой и накладными расходами по сравнению с собственными клиентами Kafka, что сказывается на производительности в сценариях с высокой пропускной способностью.

Ограниченный функционал

  • Поддерживаются не все функции и конфигурации Kafka, доступные для собственных клиентов, например сложное управление группами получателей и расширенные конфигурации.

Безопасность

  • Чтобы защитить конечные точки HTTP и передаваемые данные, требуется тщательная проработка мер безопасности: аутентификации, шифрования.

Масштабируемость

  • При масштабировании кластеров Kafka и управлении клиентскими HTTP- взаимодействиями у прокси-сервера REST появляются дополнительные точки отказа и сложности.

Зависимость от других служб

  • В экосистему Kafka добавляется дополнительный компонент, который требуется мониторить, сопровождать и контролировать.

5. Kafka Connect и debezium

Kafka Connect  —  это фреймворк для интеграции Kafka с различными источниками и приемниками данных, потоковой передачи данных между Kafka и другими системами вроде баз данных, хранилищ «ключ-значение», поисковых индексов и файловых систем, причем без написания кода. Kafka Connect легко масштабируется и отказоустойчив, поэтому идеален для построения надежных конвейеров данных.

Ключевые компоненты

  • Коннекторы: плагины, которыми определяются способы захвата данных из источников или отправки данных в приемники.
  • Задачи: единицы работы, которыми выполняется фактическое копирование данных для параллельной обработки.
  • Воркеры: процессы виртуальной машины Java, которыми выполняются коннекторы и задачи. Они запускаются в автономном или распределенном режимах.
  • Конфигурации: для коннекторов определяются настройки, такие как детали подключения и правила преобразования.

Конфигурация службы Kafka Connect

services:
connect:
image: debezium/connect:latest
restart: always
container_name: connect
depends_on:
- kafka1
- kafka2
networks:
- n1
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_statuses
CONFIG_STORAGE_REPLICATION_FACTOR: 2
OFFSET_STORAGE_REPLICATION_FACTOR: 2
STATUS_STORAGE_REPLICATION_FACTOR: 2
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
ENABLE_DEBEZIUM_SCRIPTING: "true"
healthcheck:
test:
[
"CMD",
"curl",
"--silent",
"--fail",
"-X",
"GET",
"http://localhost:8083/connectors",
]
start_period: 10s
interval: 10s
timeout: 5s
retries: 5

Пояснения к конфигурации

Зависимости

  • depends_on: указывается, что служба Kafka Connect зависит от брокеров Kafka: kafka1 и kafka2. То есть Kafka Connect запускается только после того, как становятся доступны брокеры.

Порты

  • 8083:8083: предоставляемый 8083  —  порт по умолчанию для REST API-интерфейса Kafka Connect.

Переменные среды́

  • BOOTSTRAP_SERVERS: перечисляются серверы начальной загрузки для взаимодействия с Kafka Connect.
  • GROUP_ID: задается идентификатор группы получателей для Kafka Connect.
  • CONFIG_STORAGE_TOPIC: тема, в которой хранятся конфигурации коннекторов.
  • OFFSET_STORAGE_TOPIC: тема, в которой хранятся смещения, отслеживается прогресс каждого коннектора.
  • STATUS_STORAGE_TOPIC: тема, в которой хранятся статусы коннекторов и задач.
  • CONFIG_STORAGE_REPLICATION_FACTOR, OFFSET_STORAGE_REPLICATION_FACTOR, STATUS_STORAGE_REPLICATION_FACTOR: задаются коэффициенты репликации соответствующих тем для обеспечения отказоустойчивости.
  • CONNECT_KEY_CONVERTER и CONNECT_VALUE_CONVERTER: указываются преобразователи ключей и значений. Здесь применяется JsonConverter.
  • CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE и CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: поддержка схем для преобразователей отключается в пользу обычного JSON.
  • KEY_CONVERTER и VALUE_CONVERTER: задаются преобразователи ключей и значений.
  • ENABLE_DEBEZIUM_SCRIPTING: включается скриптовая поддержка коннекторов Debezium для пользовательских преобразований.

Преимущества Kafka Connect

Расширяемость

  • Для источников и приемников данных поддерживаются самые разные коннекторы, поэтому Kafka Connect  —  сверхгибкая служба, применимая в различных сценариях.

Масштабируемость

  • Легко масштабируется добавлением рабочих узлов для обработки, в том числе параллельной, увеличенных объемов данных.

Отказоустойчивость

  • Встроенная отказоустойчивость обеспечивается репликацией тем  —  конфигураций, смещений и статусов.

Оптимизированные конвейеры данных

  • Упрощается процесс настройки конвейеров данных и управления ими, уменьшается потребность в пользовательском ETL-коде извлечения, преобразования, загрузки данных.

Интеграция

  • Легко интегрируется с экосистемой, пользуясь масштабируемостью, отказоустойчивостью и долговечностью Kafka.

Недостатки Kafka Connect

Сложная конфигурация

  • Каждому коннектору и задаче требуется детальная конфигурация, подчас сложная и подверженная ошибкам.

Ресурсоемкость

  • Расходуются значительные ресурсы, особенно в сценариях с высокой пропускной способностью, поэтому нужно аккуратно выделять и контролировать ресурсы.

Кривая обучения

  • Требуется понимание архитектуры и конфигураций Kafka Connect, для новичков это сложно.

Накладные расходы

  • Увеличиваются накладные расходы в связи с контролем и мониторингом коннекторов и задач, особенно при крупных развертываниях.

Задержка

  • Хотя Kafka Connect предназначена для обработки в реальном времени, здесь случаются небольшие задержки, обусловленные конфигурацией и состояниями сети.

6. Сервер ksqlDB

ksqlDB  —  это потоковый движок на Kafka Streams. С помощью SQL им создаются приложения обработки и анализа данных в реальном времени, потоковая обработка осуществляется в удобном SQL-интерфейсе. Благодаря ksqlDB, в привычном синтаксисе SQL определяются потоковые преобразования, агрегации, объединения, работа с окнами и многое другое.

Ключевые компоненты

  • Потоки: неограниченные, непрерывные данные, поступающие из темы Kafka.
  • Таблицы: агрегированные представления потоков с сохранением состояния или другие таблицы.
  • Запросы: SQL-инструкции, которыми определяются способы обработки, преобразования и анализа данных.

Пример:

-- Создаем поток с необработанными данными
CREATE STREAM raw_data (
before STRUCT<id INT, value STRING>,
after STRUCT<id INT, value STRING>,
source STRUCT<version STRING, connector STRING, name STRING, ts_ms BIGINT, snapshot STRING, db STRING, sequence ARRAY<STRING>, schema STRING, table STRING, txId INT, lsn BIGINT, xmin INT>,
op STRING,
ts_ms BIGINT,
transaction STRUCT<id STRING, total_order BIGINT, data_collection_order BIGINT>
)
WITH (KAFKA_TOPIC='<topic name>', VALUE_FORMAT='JSON');


-- Создаем поток с преобразованными данными
CREATE STREAM transformed_data AS
SELECT after->id AS id,
UCASE(after->value) AS value
FROM raw_data
WHERE after IS NOT NULL
EMIT CHANGES;

Конфигурация службы сервера ksqlDB

services:
ksqldb-server:
image: confluentinc/cp-ksqldb-server:latest
depends_on:
- kafka1
- kafka2
networks:
- n1
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksqldb"
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_HOST_NAME: ksqldb-server
KSQL_CONNECT_URL: "http://connect:8083"

Пояснения к конфигурации

Зависимости

  • depends_on: указывается, что сервер ksqlDB зависит от брокеров Kafka: kafka1 и kafka2. То есть сервер ksqlDB запускается только после того, как становятся доступны брокеры.

Порты

  • 8088:8088: предоставляемый 8088  —  порт по умолчанию для REST API-интерфейса сервера ksqlDB.

Переменные среды́

  • KSQL_CONFIG_DIR: указывается каталог, в котором хранятся файлы конфигурации ksqlDB.
  • KSQL_LISTENERS: определяются адрес и порт, по которым сервером ksqlDB прослушиваются HTTP-запросы. Здесь через порт 8088 прослушиваются все сетевые интерфейсы 0.0.0.0.
  • KSQL_BOOTSTRAP_SERVERS: перечисляются серверы начальной загрузки Kafka, к которым подключается сервер ksqlDB для взаимодействия с кластером Kafka.
  • KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE и KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: автоматически создаются потоки и темы обработки логирования.
  • KSQL_KSQL_SCHEMA_REGISTRY_URL: указывается url-адрес реестра схем для проверки схем и управления ими.
  • KSQL_HOST_NAME: задается имя хоста для сервера ksqlDB.
  • KSQL_CONNECT_URL: указывается url-адрес Kafka Connect для взаимодействия с сервером ksqlDB в целях интеграции.

Преимущества ksqlDB

SQL-интерфейс

  • Потоковая обработка осуществляется в привычном синтаксисе SQL, поэтому ksqlDB доступен пользователям, знакомым с SQL.

Обработка в режиме реального времени

  • Обеспечиваются обработка и анализ данных в реальном времени с мгновенными результатами и действиями на основе потоковых данных.

Интеграция с Kafka

  • Легко интегрируется с Kafka, пользуясь ее масштабируемостью, отказоустойчивостью, долговечностью.

Операции с сохранением состояния

  • Поддерживается потоковая обработка с сохранением состояния  —  агрегации, объединения, работа с окнами и многое другое  —  для сложных преобразований данных.

Простота применения

  • Упрощается разработка приложений потоковой обработки, нужно меньше сложного кода и пользовательских решений.

Масштабируемость

  • Масштабируется горизонтально добавлением экземпляров сервера ksqlDB с распределением нагрузки при обработке данных.

Недостатки ksqlDB

Кривая обучения

  • Требуется понимание специфичных для ksqlDB синтаксиса SQL и концепций, для новичков это сложно.

Расход ресурсов

  • ksqlDB ресурсоемок, нужно аккуратно выделять и контролировать ресурсы, особенно для приложений с высокой пропускной способностью.

Задержка

  • Хотя ksqlDB предназначен для обработки в реальном времени, здесь случаются небольшие задержки, обусловленные сложностью запросов и объемами данных.

Сложность эксплуатации

  • Увеличиваются накладные расходы в связи с контролем и мониторингом экземпляров сервера ksqlDB и запросов, в частности при крупных развертываниях.

Ограничения функционала

  • В ksqlDB поддерживается функционал потоковой обработки, но с ограничениями  —  в отличие от пользовательских решений потоковой обработки, созданных с помощью фреймворков вроде Kafka Streams.

Графические ПИ

Пользовательский интерфейс Debezium

ПИ Debezium  —  это графический пользовательский интерфейс для управления коннекторами Debezium. Им упрощаются конфигурация, развертывание, мониторинг коннекторов для захвата изменения данных, потоковая передача данных из различных БД в Kafka.

Альтернативы

  • Kafka Connect UI: старый веб-интерфейс с открытым исходным кодом для контроля и мониторинга коннекторов Kafka Connect.
  • Confluent Control Center: комплексный корпоративный инструмент мониторинга и контроля коннекторов и кластеров Kafka, имеется также поддержка Debezium.

AKHQ

AKHQ (бывший KafkaHQ)  —  это Open Source веб-интерфейс для мониторинга и контроля кластеров Apache Kafka с помощью функционала вроде проверки тем, управления группами получателей, просмотра данных в реальном времени.

Альтернативы

  • Kafka Manager: этим веб-инструментом с открытым исходным кодом для управления кластерами Kafka предоставляется информация о статистике брокеров, темах и разделах.
  • Confluent Control Center: решение корпоративного уровня для мониторинга и управления Kafka с расширенным функционалом: комплексным мониторингом, клиентскими метриками производительности и другими.

kadeck , kafka-ui, kafdrop.

Пользовательский интерфейс ksqlDB

ПИ ksqlDB доступен через Confluent Control Center, взаимодействие с ksqlDB осуществляется с помощью графического интерфейса. В этом ПИ пользователи пишут и выполняют запросы к ksqlDB, управляют потоками и таблицами, в реальном времени визуализируют потоки данных. Процесс разработки приложений потоковой обработки и управления ими в ПИ ksqlDB упрощается таким функционалом, как встроенное автодополнение, проверка схем, визуализация запросов.

Альтернативы

  • Confluent Control Center: всеобъемлющий корпоративный ПИ для управления ksqlDB вместе с другими компонентами экосистемы Kafka  —  это надежный инструментарий для разработки, мониторинга и масштабирования приложений потоковой обработки.
  • DBVisualizer  —  это Open Source и коммерческий универсальный инструмент баз данных, подключаемый к различным БД, с продвинутым выполнением SQL-запросов, хотя и не предназначен специально для потоковой обработки ksqlDB.
  • DBeaver  —  это Open Source и коммерческий, интуитивно понятный, полнофункциональный графический интерфейс для управления базами данных, выполнения SQL-запросов, визуализации данных и разработки схем БД. Он славится универсальностью, развитой архитектурой плагинов, поддержкой разнообразных систем баз данных, поэтому пользуется популярностью среди разработчиков и администраторов БД.
  • DataGrip: коммерческая интегрированная среда разработки баз данных от JetBrains с поддержкой нескольких БД, расширенными возможностями запросов и управления базами данных, но не предназначенная специально для потоковой обработки Kafka.

Службы вместе

Захват изменения данных

networks:
n1:
external: true

services:
zookeeper1:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper1
networks:
- n1
volumes:
- ./zookeeper1_data:/var/lib/zookeeper/data
- ./zookeeper1_log:/var/lib/zookeeper/log
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: "zookeeper1:22888:23888;zookeeper2:22888:23888"

zookeeper2:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper2
networks:
- n1
volumes:
- ./zookeeper2_data:/var/lib/zookeeper/data
- ./zookeeper2_log:/var/lib/zookeeper/log
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_SERVERS: "zookeeper1:22888:23888;zookeeper2:22888:23888"
kafka1:
image: confluentinc/cp-kafka:latest
container_name: kafka1
depends_on:
- zookeeper1
- zookeeper2
networks:
- n1
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2

kafka2:
image: confluentinc/cp-kafka:latest
container_name: kafka2
depends_on:
- zookeeper1
- zookeeper2
networks:
- n1
ports:
- "9093:9092"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
schema-registry:
image: confluentinc/cp-schema-registry:latest
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper1
- zookeeper2
- kafka1
- kafka2
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper1:2181,zookeeper2:2181"
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:9092,PLAINTEXT://kafka2:9092
networks:
- n1
restart: unless-stopped
healthcheck:
start_period: 10s
interval: 10s
retries: 20
test: curl --user superUser:superUser --fail --silent --insecure http://localhost:8081/subjects --output /dev/null || exit 1

rest-proxy:
image: confluentinc/cp-kafka-rest:7.3.1
depends_on:
- zookeeper1
- zookeeper2
- kafka1
- kafka2
- schema-registry
ports:
- "8082:8082"
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: "kafka1:9092,kafka2:9092"
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
networks:
- n1
connect:
image: debezium/connect:latest
restart: always
container_name: connect
depends_on:
- kafka1
- kafka2
networks:
- n1
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_statuses
CONFIG_STORAGE_REPLICATION_FACTOR: 2
OFFSET_STORAGE_REPLICATION_FACTOR: 2
STATUS_STORAGE_REPLICATION_FACTOR: 2
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
ENABLE_DEBEZIUM_SCRIPTING: "true"
healthcheck:
test:
[
"CMD",
"curl",
"--silent",
"--fail",
"-X",
"GET",
"http://localhost:8083/connectors",
]
start_period: 10s
interval: 10s
timeout: 5s
retries: 5

ksqldb-server:
image: confluentinc/cp-ksqldb-server:latest
depends_on:
- kafka1
- kafka2
networks:
- n1
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksqldb"
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_HOST_NAME: ksqldb-server
KSQL_CONNECT_URL: "http://connect:8083"

akhq:
image: tchiotludo/akhq:latest
depends_on:
- kafka1
- kafka2
networks:
- n1
ports:
- "8080:8080"
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
kafka-cluster:
properties:
bootstrap.servers: "kafka1:9092,kafka2:9092"
schema-registry:
url: "http://schema-registry:8081"
connect:
- name: "connect"
url: "http://connect:8083"
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:latest # использование -> docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
depends_on:
- ksqldb-server
volumes:
- ./sql:/tmp
networks:
- n1
entrypoint: /bin/sh
tty: true
debezium-ui:
image: debezium/debezium-ui:latest
restart: always
container_name: debezium-ui
hostname: debezium-ui
networks:
- n1
depends_on:
- connect
ports:
- '8181:8080'
environment:
KAFKA_CONNECT_URIS: http://connect:8083

Базы данных

networks:
n1:
external: true

services:
db:
image: postgres:latest
container_name: sdb
restart: always
ports:
- "5432:5432"
environment:
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_DB=${POSTGRES_DB}
command: ["postgres", "-c", "wal_level=logical"]
volumes:
- ./sdb/data:/var/lib/postgresql/data
networks:
- n1
healthcheck:
test: ["CMD", "psql", "-U", "postgres", "-c", "SELECT 1"]
interval: 10s
timeout: 5s
retries: 5
logging:
driver: "json-file"
options:
max-size: "2048m"
mongodb:
image: mongo:latest
container_name: mongodb
restart: always
volumes:
- ./mongodb_data:/data/db
- ./mongo-configs/rs-initiate.js:/docker-entrypoint-initdb.d/rs-initiate.js
ports:
- 27017:27017
command: ["mongod","--replSet", "rs0", "--bind_ip_all"]
networks:
- n1

Конфигурации для PostgreSQL и MongoDB

Конфигурация PostgreSQL: чтобы активировать захват изменения данных в PostgreSQL, уровню журнала предзаписи WAL базы данных задается значение logical. Так захватываются детализированные изменения в БД на логическом уровне, а не просто физические изменения. Эта конфигурация нужна для инструментов вроде Debezium, где потоковая передача изменений из PostgreSQL в Kafka выполняется посредством логического декодирования.

Конфигурация MongoDB: осуществляется набором реплик для захвата изменения данных. Набор реплик в MongoDB  —  это группа экземпляров mongod, которыми поддерживается одинаковый набор данных для обеспечения резервирования и высокой доступности. Набор реплик включается для захвата потоков изменений, используемых инструментами захвата изменения данных вроде Debezium для мониторинга и потоковой передачи изменений в базе данных в реальном времени:

rs.initiate({
_id: 'rs0',
members: [
{ _id: 0, host: 'mongodb:27017' }
]
});

Пример с коннекторами

PostgreSQL:

В этом случае необходимо выполнить запрос методом post в:

curl --location 'http://<service-domain>:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"tasks.max": "1",
"database.hostname": "db",
"database.port": "5432",
"database.user": "root",
"database.password": "1",
"database.dbname": "sport-db",
"database.server.name": "dbserver1",
"database.history.kafka.bootstrap.servers": "kafka1:9092;kafka2:9092",
"schema.include": "public",

"topic.prefix": "dbserver1.",
"topic.creation.enable": "true",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"topic.creation.default.cleanup.policy": "delete",
"topic.creation.default.retention.ms": "604800000"
}
}
'

Mongodb:

curl --location 'http://<service-domain>:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "inventory-connector",
"config": {
"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max" : "1",
"topic.prefix" : "dbserver1",
"mongodb.connection.string" : "mongodb://mongodb:27017/?authSource=admin",
"database.include.list" : "events",
"schema.history.internal.kafka.bootstrap.servers" : "kafka1:9092,kafka2:9092",
"transforms": "route",
"transforms.route.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex" : "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement" : "$3"
}
}'

Заключение

Реализация системы захвата изменения данных с Docker, PostgreSQL, MongoDB, Kafka и Debezium  —  надежное и гибкое решение для синхронизации и анализа данных в реальном времени. Мы описали настройку и конфигурацию каждого компонента, выделив их роли, преимущества и особенности.

  • ZooKeeper  —  это координация и отказоустойчивость кластеров Kafka посредством централизованного управления.
  • Kafka  —  высокая пропускная способность, масштабируемость и долговечность, необходимые для конвейеров передачи данных в реальном времени.
  • Реестр схем  —  централизованная проверка схем и управление ими, обеспечение согласованности и совместимости данных.
  • Kafka REST Proxy  — упрощение взаимодействий с Kafka по HTTP, поэтому он доступен для самых разных клиентов.
  • Kafka Connect  —  облегчение интеграции с различными источниками и приемниками данных, поддержка масштабируемых и отказоустойчивых конвейеров данных.
  • Сервер ksqlDB  —  SQL-интерфейс для потоковой обработки в реальном времени и сложных преобразований данных с использованием SQL.

Корректно настроив эти компоненты, в организациях обеспечивают интеграцию и обработку данных реального времени с мгновенными результатами и действиями на основе последних данных. Правильная конфигурация и постоянное управление  —  ключевые условия поддержания эффективного и надежного конвейера для захвата изменения данных.

Читайте также:

Читайте нас в Telegram, VK и Дзен


Перевод статьи Ashkan Golehpour: Implementing Change Data Capture (CDC) with Docker, PostgreSQL, MongoDB, Kafka, and Debezium: A Comprehensive Guide

Предыдущая статьяШаблон Sidecar-контейнера с Kubernetes и Go