Введение
Захват изменения данных — это мощная техника выявления и отслеживания изменений в базе данных, эти изменения фиксируются в реальном времени и обрабатываются другими системами. Ею обеспечиваются беспроблемная синхронизация данных, аналитика в режиме реального времени и многое другое. Настроим систему захвата изменения данных с Docker, PostgreSQL, MongoDB, Kafka и Debezium.
Подробное описание каждой службы
1. Zookeeper
Zookeeper — централизованная служба для хранения информации о конфигурации, для именования, распределенной синхронизации и групповых сервисов управления брокерами Kafka.
Удаление ZooKeeper планируется в Apache Kafka 4.0.
Прекращение использования ZooKeeper.
Конфигурация службы Zookeeper
services:
zookeeper1:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper1
networks:
- n1
volumes:
- ./zookeeper1_data:/var/lib/zookeeper/data
- ./zookeeper1_log:/var/lib/zookeeper/log
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: "zookeeper1:22888:23888;zookeeper2:22888:23888"
zookeeper2:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper2
networks:
- n1
volumes:
- ./zookeeper2_data:/var/lib/zookeeper/data
- ./zookeeper2_log:/var/lib/zookeeper/log
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_SERVERS: "zookeeper1:22888:23888;zookeeper2:22888:23888"
Преимущества
- Централизованное управление: упрощаются конфигурирование и управление брокерами Kafka.
- Надежность: обеспечиваются высокая доступность и отказоустойчивость.
- Координация: эффективное управление распределенными системами и службами.
Недостатки
- Сложность: требуются тщательная настройка и управление.
- Ресурсоемкость: большой расход системных ресурсов.
2. Kafka
Apache Kafka — распределенная потоковая платформа для обработки потоков данных с высокой пропускной способностью, отказоустойчивостью, низкой задержкой. Применяется обычно при создании конвейеров передачи данных в реальном времени и потоковых приложений.
Ключевые компоненты
- Темы: в Kafka данные организуются по темам — названиям категорий или каналов, где публикуются записи. Для целей параллельной обработки каждая тема разбивается на разделы.
- Отправители: приложения, которыми в темах Kafka публикуются данные.
- Получатели: подписываются на темы, этими приложениями обрабатывается канал публикуемых записей.
- Брокеры: серверы Kafka, где хранятся данные и обслуживаются клиентские запросы. Из брокеров состоит кластер Kafka.
- Zookeeper: для управления брокерами Kafka, их координирования.
Конфигурация службы Kafka
services:
kafka1:
image: confluentinc/cp-kafka:latest
container_name: kafka1
depends_on:
- zookeeper1
- zookeeper2
networks:
- n1
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
kafka2:
image: confluentinc/cp-kafka:latest
container_name: kafka2
depends_on:
- zookeeper1
- zookeeper2
networks:
- n1
ports:
- "9093:9092"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
Пояснения к конфигурации
Идентификатор брокера Kafka
KAFKA_BROKER_ID
: уникальный идентификатор брокера Kafka в кластере, такой должен быть у каждого брокера.
Подключение Zookeeper
KAFKA_ZOOKEEPER_CONNECT
: указывается строка подключения Zookeeper с перечислением экземпляров Zookeeper, которыми контролируются метаданные Kafka и координируются брокеры.
Прослушиватели
KAFKA_ADVERTISED_LISTENERS
: определяется способ взаимодействия брокеров Kafka с клиентами. Здесь это простой текст через порты 9092 и 9093.
Коэффициенты репликации
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
: задается коэффициент репликации для темы внутренних смещений, которой отслеживаются смещения получателей. Отказоустойчивость повышается более высоким коэффициентом репликации.KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
: задается коэффициент репликации для журнала состояний транзакций. Данные транзакций реплицируются, чем обеспечивается их долговечность.KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
: минимальное количество синхронизированных реплик, необходимых для журнала состояний транзакций. Этим обеспечивается высокая доступность.
Преимущества Kafka
Высокая пропускная способность
- Kafka предназначена для обработки большого количества событий в секунду в сценариях с высокой пропускной способностью: поглощение данных в реальном времени, логирование, потоковая обработка.
Масштабируемость
- Благодаря разбиению на разделы и распределенной архитектуре, Kafka масштабируется горизонтально: добавляются брокеры и разделы, увеличиваются объемы данных и конкурентность.
Долговечность
- В Kafka данные постоянно хранятся на диске и реплицируются в брокерах, этим обеспечиваются долговечность данных и отказоустойчивость. Указывается срок хранения данных, который может быть и неопределенным.
Низкая задержка
- Благодаря низкой задержке при доставке сообщений, Kafka идеальна для приложений реального времени, где требуется немедленная обработка входящих потоков данных.
Гибкость
- В Kafka поддерживаются разные сценарии: агрегация логов, интеграция данных, аналитика в реальном времени, порождение событий. В ее экосистему включаются коннекторы и интеграции с другими системами.
Недостатки Kafka
Сложность
- Управление кластером Kafka — это конфигурирование компонентов, мониторинг, настройка производительности, масштабирование и обновление. Для выполнения сложных задач здесь требуются знания и опыт.
Ресурсоемкость
- Kafka ресурсоемка, здесь немаленький расход ресурсов ЦП, памяти и хранилища, особенно в сценариях с высокой пропускной способностью и крупных развертываниях.
Кривая обучения
- Концепции и конфигурации Kafka сложноваты для новичков. На понимание тем, разделов, репликации и нюансов поведения групп получателей требуется время.
Задержка данных
- Хотя Kafka предназначена для обработки в реальном времени, здесь случаются небольшие задержки или запаздывания данных, обусловленные задержками в сети, нагрузками при обработке данных и конфигурациями получателя.
Зависимость от Zookeeper
- В Kafka кластеры управляются с помощью Zookeeper, и из-за дополнительного компонента сложность увеличивается. Однако в будущих версиях Kafka эту зависимость планируется устранить.
3. Реестр схем
Реестр схем — это централизованное хранилище, в котором схемы Avro управляются и проверяются RESTful-интерфейсом с соблюдением в темах Kafka соответствующего формата отправляемых и получаемых данных. Так поддерживаются совместимость данных и их эволюция.
Ключевые компоненты
- Схемы: ими определяется структура записей данных, схемы Avro — самый популярный формат.
- Субъекты: логические группировки схем, обычно соответствующие темам Kafka.
- Проверки совместимости: обеспечивается совместимость новых версий схемы с предыдущими по определенным правилам совместимости.
Конфигурация службы реестра схем
services:
schema-registry:
image: confluentinc/cp-schema-registry:latest
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper1
- zookeeper2
- kafka1
- kafka2
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper1:2181,zookeeper2:2181"
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:9092,PLAINTEXT://kafka2:9092
networks:
- n1
restart: unless-stopped
healthcheck:
start_period: 10s
interval: 10s
retries: 20
test: curl --user superUser:superUser --fail --silent --insecure http://localhost:8081/subjects --output /dev/null || exit 1
Пояснения к конфигурации
Зависимости
- depends_on: указывается, что запуск и готовность к работе службы реестра схем зависит от служб Zookeeper и Kafka. То есть реестр схем запускается только после того, как становятся доступны его зависимости.
Порты
- 8081:8081: предоставляемый 8081 — порт по умолчанию для RESTful API реестра схем.
Переменные среды́
SCHEMA_REGISTRY_HOST_NAME
: задается имя хоста для службы реестра схем.SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL
: определяется URL-адрес подключения экземпляров Zookeeper, который применяется в реестре схем для управления метаданными Kafka.SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
: перечисляются серверы начальной загрузки Kafka, к которым подключается реестр схем.
Преимущества реестра схем
Согласованность данных
- Обеспечивается совместимость схем, применяемых отправителями и получателями данных, чем предотвращаются несоответствия и повреждение данных.
Централизованное управление схемами
- Управление и хранение всех схем осуществляются в одном месте, чем упрощаются эволюция и сопровождение схем.
Правила совместимости
- Поддерживаются различные уровни совместимости: обратная, прямая, полная и т. д. Поэтому новыми версиями схем имеющиеся приложения обработки данных не ломаются.
Интеграция с Kafka
- Легко интегрируется с Kafka, обеспечивая проверку схемы и ее применение на уровне темы.
RESTful-интерфейс
- Благодаря RESTful API схемами легко управлять и взаимодействовать с ними, поэтому реестр схем доступен для самых разных приложений и служб.
Недостатки реестра схем
Зависимость
- Приходится контролировать и мониторить дополнительную службу, отчего повышается сложность системы в целом.
Задержка
- Проверка схемы чревата небольшой задержкой при обработке данных, особенно в сценариях с высокой пропускной способностью.
Расход ресурсов
- Для запуска требуются дополнительные ресурсы, что для ресурсоограниченных сред чувствительно.
Сложность эволюции схем
- Сложность управления эволюцией схем и обеспечения совместимости, необходимость тщательного планирования и реализации.
Единая точка отказа
- Если реестр схем управляется механизмами резервирования и аварийного переключения некорректно, он становится единой точкой отказа.
4. Прокси-сервер REST для Kafka
В Kafka REST Proxy имеется RESTful-интерфейс для взаимодействия с кластерами Apache Kafka, с помощью которого в приложениях отправляются и получаются сообщения через HTTP. Особенно кстати приходится он для клиентов, у которых нет встроенной библиотеки Kafka или которым для простоты требуется HTTP-интерфейс.
Ключевые компоненты
- Отправители: приложения, которыми с помощью метода POST HTTP-запросов в темах Kafka публикуются сообщения.
- Получатели: приложения, которые с помощью метода GET HTTP-запросов подписываются на темы Kafka и получают сообщения.
- Темы: категории или каналы, в которых публикуются и получаются сообщения.
- Брокеры: серверы Kafka, где хранятся данные и обрабатываются клиентские запросы.
Конфигурация службы Kafka REST Proxy
services:
rest-proxy:
image: confluentinc/cp-kafka-rest:latest
depends_on:
- zookeeper1
- zookeeper2
- kafka1
- kafka2
- schema-registry
ports:
- "8082:8082"
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: "kafka1:9092,kafka2:9092"
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
networks:
- n1
Пояснения к конфигурации
Зависимости
- depends_on: указывается, что запуск и готовность к работе службы Kafka REST Proxy зависит от служб Zookeeper, Kafka и реестра схем. То есть прокси-сервер REST запускается только после того, как становятся доступны его зависимости.
Порты
- 8082:8082: предоставляемый 8082 — порт по умолчанию для RESTful API прокси-сервера REST в Kafka.
Переменные среды́
KAFKA_REST_HOST_NAME
: задается имя хоста для службы Kafka REST Proxy.KAFKA_REST_BOOTSTRAP_SERVERS
: перечисляются серверы начальной загрузки Kafka, к которым для взаимодействия с кластером Kafka подключается прокси-сервер REST.KAFKA_REST_LISTENERS
: определяются адрес и порт, по которым прокси-сервером REST прослушиваются HTTP-запросы. Здесь через порт 8082 прослушиваются все сетевые интерфейсы0.0.0.0
.
Преимущества Kafka REST Proxy
Простота применения
- Взаимодействие с Kafka по RESTful-интерфейсу через HTTP упрощается, поэтому Kafka REST Proxy доступен для самых разных клиентов и приложений.
Доступность
- В том числе для веб-браузеров, HTTP-служб и т. д., у которых нет встроенной библиотеки Kafka.
Интеграция
- Облегчается интеграция со службами, веб-приложениями и микросервисами RESTful, взаимодействующими по HTTP.
Всеязычность
- Благодаря всеязычному интерфейсу сообщения Kafka отправляются и получаются с помощью стандартных HTTP-методов клиентами, написанными на любом языке.
Упрощенная разработка
- Сложность разработки и развертывания клиентов Kafka снижается за счет упрощения протокола Kafka в рамках простого HTTP API.
Недостатки Kafka REST Proxy
Влияние на производительность
- Взаимодействие по HTTP чревато дополнительными задержкой и накладными расходами по сравнению с собственными клиентами Kafka, что сказывается на производительности в сценариях с высокой пропускной способностью.
Ограниченный функционал
- Поддерживаются не все функции и конфигурации Kafka, доступные для собственных клиентов, например сложное управление группами получателей и расширенные конфигурации.
Безопасность
- Чтобы защитить конечные точки HTTP и передаваемые данные, требуется тщательная проработка мер безопасности: аутентификации, шифрования.
Масштабируемость
- При масштабировании кластеров Kafka и управлении клиентскими HTTP- взаимодействиями у прокси-сервера REST появляются дополнительные точки отказа и сложности.
Зависимость от других служб
- В экосистему Kafka добавляется дополнительный компонент, который требуется мониторить, сопровождать и контролировать.
5. Kafka Connect и debezium
Kafka Connect — это фреймворк для интеграции Kafka с различными источниками и приемниками данных, потоковой передачи данных между Kafka и другими системами вроде баз данных, хранилищ «ключ-значение», поисковых индексов и файловых систем, причем без написания кода. Kafka Connect легко масштабируется и отказоустойчив, поэтому идеален для построения надежных конвейеров данных.
Ключевые компоненты
- Коннекторы: плагины, которыми определяются способы захвата данных из источников или отправки данных в приемники.
- Задачи: единицы работы, которыми выполняется фактическое копирование данных для параллельной обработки.
- Воркеры: процессы виртуальной машины Java, которыми выполняются коннекторы и задачи. Они запускаются в автономном или распределенном режимах.
- Конфигурации: для коннекторов определяются настройки, такие как детали подключения и правила преобразования.
Конфигурация службы Kafka Connect
services:
connect:
image: debezium/connect:latest
restart: always
container_name: connect
depends_on:
- kafka1
- kafka2
networks:
- n1
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_statuses
CONFIG_STORAGE_REPLICATION_FACTOR: 2
OFFSET_STORAGE_REPLICATION_FACTOR: 2
STATUS_STORAGE_REPLICATION_FACTOR: 2
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
ENABLE_DEBEZIUM_SCRIPTING: "true"
healthcheck:
test:
[
"CMD",
"curl",
"--silent",
"--fail",
"-X",
"GET",
"http://localhost:8083/connectors",
]
start_period: 10s
interval: 10s
timeout: 5s
retries: 5
Пояснения к конфигурации
Зависимости
- depends_on: указывается, что служба Kafka Connect зависит от брокеров Kafka:
kafka1
иkafka2
. То есть Kafka Connect запускается только после того, как становятся доступны брокеры.
Порты
- 8083:8083: предоставляемый 8083 — порт по умолчанию для REST API-интерфейса Kafka Connect.
Переменные среды́
BOOTSTRAP_SERVERS
: перечисляются серверы начальной загрузки для взаимодействия с Kafka Connect.GROUP_ID
: задается идентификатор группы получателей для Kafka Connect.CONFIG_STORAGE_TOPIC
: тема, в которой хранятся конфигурации коннекторов.OFFSET_STORAGE_TOPIC
: тема, в которой хранятся смещения, отслеживается прогресс каждого коннектора.STATUS_STORAGE_TOPIC
: тема, в которой хранятся статусы коннекторов и задач.CONFIG_STORAGE_REPLICATION_FACTOR
,OFFSET_STORAGE_REPLICATION_FACTOR
,STATUS_STORAGE_REPLICATION_FACTOR
: задаются коэффициенты репликации соответствующих тем для обеспечения отказоустойчивости.CONNECT_KEY_CONVERTER
иCONNECT_VALUE_CONVERTER
: указываются преобразователи ключей и значений. Здесь применяетсяJsonConverter
.CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE
иCONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE
: поддержка схем для преобразователей отключается в пользу обычного JSON.KEY_CONVERTER
иVALUE_CONVERTER
: задаются преобразователи ключей и значений.ENABLE_DEBEZIUM_SCRIPTING
: включается скриптовая поддержка коннекторов Debezium для пользовательских преобразований.
Преимущества Kafka Connect
Расширяемость
- Для источников и приемников данных поддерживаются самые разные коннекторы, поэтому Kafka Connect — сверхгибкая служба, применимая в различных сценариях.
Масштабируемость
- Легко масштабируется добавлением рабочих узлов для обработки, в том числе параллельной, увеличенных объемов данных.
Отказоустойчивость
- Встроенная отказоустойчивость обеспечивается репликацией тем — конфигураций, смещений и статусов.
Оптимизированные конвейеры данных
- Упрощается процесс настройки конвейеров данных и управления ими, уменьшается потребность в пользовательском ETL-коде извлечения, преобразования, загрузки данных.
Интеграция
- Легко интегрируется с экосистемой, пользуясь масштабируемостью, отказоустойчивостью и долговечностью Kafka.
Недостатки Kafka Connect
Сложная конфигурация
- Каждому коннектору и задаче требуется детальная конфигурация, подчас сложная и подверженная ошибкам.
Ресурсоемкость
- Расходуются значительные ресурсы, особенно в сценариях с высокой пропускной способностью, поэтому нужно аккуратно выделять и контролировать ресурсы.
Кривая обучения
- Требуется понимание архитектуры и конфигураций Kafka Connect, для новичков это сложно.
Накладные расходы
- Увеличиваются накладные расходы в связи с контролем и мониторингом коннекторов и задач, особенно при крупных развертываниях.
Задержка
- Хотя Kafka Connect предназначена для обработки в реальном времени, здесь случаются небольшие задержки, обусловленные конфигурацией и состояниями сети.
6. Сервер ksqlDB
ksqlDB — это потоковый движок на Kafka Streams. С помощью SQL им создаются приложения обработки и анализа данных в реальном времени, потоковая обработка осуществляется в удобном SQL-интерфейсе. Благодаря ksqlDB, в привычном синтаксисе SQL определяются потоковые преобразования, агрегации, объединения, работа с окнами и многое другое.
Ключевые компоненты
- Потоки: неограниченные, непрерывные данные, поступающие из темы Kafka.
- Таблицы: агрегированные представления потоков с сохранением состояния или другие таблицы.
- Запросы: SQL-инструкции, которыми определяются способы обработки, преобразования и анализа данных.
Пример:
-- Создаем поток с необработанными данными
CREATE STREAM raw_data (
before STRUCT<id INT, value STRING>,
after STRUCT<id INT, value STRING>,
source STRUCT<version STRING, connector STRING, name STRING, ts_ms BIGINT, snapshot STRING, db STRING, sequence ARRAY<STRING>, schema STRING, table STRING, txId INT, lsn BIGINT, xmin INT>,
op STRING,
ts_ms BIGINT,
transaction STRUCT<id STRING, total_order BIGINT, data_collection_order BIGINT>
)
WITH (KAFKA_TOPIC='<topic name>', VALUE_FORMAT='JSON');
-- Создаем поток с преобразованными данными
CREATE STREAM transformed_data AS
SELECT after->id AS id,
UCASE(after->value) AS value
FROM raw_data
WHERE after IS NOT NULL
EMIT CHANGES;
Конфигурация службы сервера ksqlDB
services:
ksqldb-server:
image: confluentinc/cp-ksqldb-server:latest
depends_on:
- kafka1
- kafka2
networks:
- n1
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksqldb"
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_HOST_NAME: ksqldb-server
KSQL_CONNECT_URL: "http://connect:8083"
Пояснения к конфигурации
Зависимости
- depends_on: указывается, что сервер ksqlDB зависит от брокеров Kafka:
kafka1
иkafka2
. То есть сервер ksqlDB запускается только после того, как становятся доступны брокеры.
Порты
- 8088:8088: предоставляемый 8088 — порт по умолчанию для REST API-интерфейса сервера ksqlDB.
Переменные среды́
KSQL_CONFIG_DIR
: указывается каталог, в котором хранятся файлы конфигурации ksqlDB.KSQL_LISTENERS
: определяются адрес и порт, по которым сервером ksqlDB прослушиваются HTTP-запросы. Здесь через порт 8088 прослушиваются все сетевые интерфейсы0.0.0.0
.KSQL_BOOTSTRAP_SERVERS
: перечисляются серверы начальной загрузки Kafka, к которым подключается сервер ksqlDB для взаимодействия с кластером Kafka.KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE
иKSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE
: автоматически создаются потоки и темы обработки логирования.KSQL_KSQL_SCHEMA_REGISTRY_URL
: указывается url-адрес реестра схем для проверки схем и управления ими.KSQL_HOST_NAME
: задается имя хоста для сервера ksqlDB.KSQL_CONNECT_URL
: указывается url-адрес Kafka Connect для взаимодействия с сервером ksqlDB в целях интеграции.
Преимущества ksqlDB
SQL-интерфейс
- Потоковая обработка осуществляется в привычном синтаксисе SQL, поэтому ksqlDB доступен пользователям, знакомым с SQL.
Обработка в режиме реального времени
- Обеспечиваются обработка и анализ данных в реальном времени с мгновенными результатами и действиями на основе потоковых данных.
Интеграция с Kafka
- Легко интегрируется с Kafka, пользуясь ее масштабируемостью, отказоустойчивостью, долговечностью.
Операции с сохранением состояния
- Поддерживается потоковая обработка с сохранением состояния — агрегации, объединения, работа с окнами и многое другое — для сложных преобразований данных.
Простота применения
- Упрощается разработка приложений потоковой обработки, нужно меньше сложного кода и пользовательских решений.
Масштабируемость
- Масштабируется горизонтально добавлением экземпляров сервера ksqlDB с распределением нагрузки при обработке данных.
Недостатки ksqlDB
Кривая обучения
- Требуется понимание специфичных для ksqlDB синтаксиса SQL и концепций, для новичков это сложно.
Расход ресурсов
- ksqlDB ресурсоемок, нужно аккуратно выделять и контролировать ресурсы, особенно для приложений с высокой пропускной способностью.
Задержка
- Хотя ksqlDB предназначен для обработки в реальном времени, здесь случаются небольшие задержки, обусловленные сложностью запросов и объемами данных.
Сложность эксплуатации
- Увеличиваются накладные расходы в связи с контролем и мониторингом экземпляров сервера ksqlDB и запросов, в частности при крупных развертываниях.
Ограничения функционала
- В ksqlDB поддерживается функционал потоковой обработки, но с ограничениями — в отличие от пользовательских решений потоковой обработки, созданных с помощью фреймворков вроде Kafka Streams.
Графические ПИ
Пользовательский интерфейс Debezium
ПИ Debezium — это графический пользовательский интерфейс для управления коннекторами Debezium. Им упрощаются конфигурация, развертывание, мониторинг коннекторов для захвата изменения данных, потоковая передача данных из различных БД в Kafka.
Альтернативы
- Kafka Connect UI: старый веб-интерфейс с открытым исходным кодом для контроля и мониторинга коннекторов Kafka Connect.
- Confluent Control Center: комплексный корпоративный инструмент мониторинга и контроля коннекторов и кластеров Kafka, имеется также поддержка Debezium.
AKHQ
AKHQ (бывший KafkaHQ) — это Open Source веб-интерфейс для мониторинга и контроля кластеров Apache Kafka с помощью функционала вроде проверки тем, управления группами получателей, просмотра данных в реальном времени.
Альтернативы
- Kafka Manager: этим веб-инструментом с открытым исходным кодом для управления кластерами Kafka предоставляется информация о статистике брокеров, темах и разделах.
- Confluent Control Center: решение корпоративного уровня для мониторинга и управления Kafka с расширенным функционалом: комплексным мониторингом, клиентскими метриками производительности и другими.
Пользовательский интерфейс ksqlDB
ПИ ksqlDB доступен через Confluent Control Center, взаимодействие с ksqlDB осуществляется с помощью графического интерфейса. В этом ПИ пользователи пишут и выполняют запросы к ksqlDB, управляют потоками и таблицами, в реальном времени визуализируют потоки данных. Процесс разработки приложений потоковой обработки и управления ими в ПИ ksqlDB упрощается таким функционалом, как встроенное автодополнение, проверка схем, визуализация запросов.
Альтернативы
- Confluent Control Center: всеобъемлющий корпоративный ПИ для управления ksqlDB вместе с другими компонентами экосистемы Kafka — это надежный инструментарий для разработки, мониторинга и масштабирования приложений потоковой обработки.
- DBVisualizer — это Open Source и коммерческий универсальный инструмент баз данных, подключаемый к различным БД, с продвинутым выполнением SQL-запросов, хотя и не предназначен специально для потоковой обработки ksqlDB.
- DBeaver — это Open Source и коммерческий, интуитивно понятный, полнофункциональный графический интерфейс для управления базами данных, выполнения SQL-запросов, визуализации данных и разработки схем БД. Он славится универсальностью, развитой архитектурой плагинов, поддержкой разнообразных систем баз данных, поэтому пользуется популярностью среди разработчиков и администраторов БД.
- DataGrip: коммерческая интегрированная среда разработки баз данных от JetBrains с поддержкой нескольких БД, расширенными возможностями запросов и управления базами данных, но не предназначенная специально для потоковой обработки Kafka.
Службы вместе
Захват изменения данных
networks:
n1:
external: true
services:
zookeeper1:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper1
networks:
- n1
volumes:
- ./zookeeper1_data:/var/lib/zookeeper/data
- ./zookeeper1_log:/var/lib/zookeeper/log
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: "zookeeper1:22888:23888;zookeeper2:22888:23888"
zookeeper2:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper2
networks:
- n1
volumes:
- ./zookeeper2_data:/var/lib/zookeeper/data
- ./zookeeper2_log:/var/lib/zookeeper/log
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_SERVERS: "zookeeper1:22888:23888;zookeeper2:22888:23888"
kafka1:
image: confluentinc/cp-kafka:latest
container_name: kafka1
depends_on:
- zookeeper1
- zookeeper2
networks:
- n1
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
kafka2:
image: confluentinc/cp-kafka:latest
container_name: kafka2
depends_on:
- zookeeper1
- zookeeper2
networks:
- n1
ports:
- "9093:9092"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
schema-registry:
image: confluentinc/cp-schema-registry:latest
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper1
- zookeeper2
- kafka1
- kafka2
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper1:2181,zookeeper2:2181"
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:9092,PLAINTEXT://kafka2:9092
networks:
- n1
restart: unless-stopped
healthcheck:
start_period: 10s
interval: 10s
retries: 20
test: curl --user superUser:superUser --fail --silent --insecure http://localhost:8081/subjects --output /dev/null || exit 1
rest-proxy:
image: confluentinc/cp-kafka-rest:7.3.1
depends_on:
- zookeeper1
- zookeeper2
- kafka1
- kafka2
- schema-registry
ports:
- "8082:8082"
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: "kafka1:9092,kafka2:9092"
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
networks:
- n1
connect:
image: debezium/connect:latest
restart: always
container_name: connect
depends_on:
- kafka1
- kafka2
networks:
- n1
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_statuses
CONFIG_STORAGE_REPLICATION_FACTOR: 2
OFFSET_STORAGE_REPLICATION_FACTOR: 2
STATUS_STORAGE_REPLICATION_FACTOR: 2
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
ENABLE_DEBEZIUM_SCRIPTING: "true"
healthcheck:
test:
[
"CMD",
"curl",
"--silent",
"--fail",
"-X",
"GET",
"http://localhost:8083/connectors",
]
start_period: 10s
interval: 10s
timeout: 5s
retries: 5
ksqldb-server:
image: confluentinc/cp-ksqldb-server:latest
depends_on:
- kafka1
- kafka2
networks:
- n1
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksqldb"
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_HOST_NAME: ksqldb-server
KSQL_CONNECT_URL: "http://connect:8083"
akhq:
image: tchiotludo/akhq:latest
depends_on:
- kafka1
- kafka2
networks:
- n1
ports:
- "8080:8080"
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
kafka-cluster:
properties:
bootstrap.servers: "kafka1:9092,kafka2:9092"
schema-registry:
url: "http://schema-registry:8081"
connect:
- name: "connect"
url: "http://connect:8083"
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:latest # использование -> docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
depends_on:
- ksqldb-server
volumes:
- ./sql:/tmp
networks:
- n1
entrypoint: /bin/sh
tty: true
debezium-ui:
image: debezium/debezium-ui:latest
restart: always
container_name: debezium-ui
hostname: debezium-ui
networks:
- n1
depends_on:
- connect
ports:
- '8181:8080'
environment:
KAFKA_CONNECT_URIS: http://connect:8083
Базы данных
networks:
n1:
external: true
services:
db:
image: postgres:latest
container_name: sdb
restart: always
ports:
- "5432:5432"
environment:
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_DB=${POSTGRES_DB}
command: ["postgres", "-c", "wal_level=logical"]
volumes:
- ./sdb/data:/var/lib/postgresql/data
networks:
- n1
healthcheck:
test: ["CMD", "psql", "-U", "postgres", "-c", "SELECT 1"]
interval: 10s
timeout: 5s
retries: 5
logging:
driver: "json-file"
options:
max-size: "2048m"
mongodb:
image: mongo:latest
container_name: mongodb
restart: always
volumes:
- ./mongodb_data:/data/db
- ./mongo-configs/rs-initiate.js:/docker-entrypoint-initdb.d/rs-initiate.js
ports:
- 27017:27017
command: ["mongod","--replSet", "rs0", "--bind_ip_all"]
networks:
- n1
Конфигурации для PostgreSQL и MongoDB
Конфигурация PostgreSQL: чтобы активировать захват изменения данных в PostgreSQL, уровню журнала предзаписи WAL базы данных задается значение logical
. Так захватываются детализированные изменения в БД на логическом уровне, а не просто физические изменения. Эта конфигурация нужна для инструментов вроде Debezium, где потоковая передача изменений из PostgreSQL в Kafka выполняется посредством логического декодирования.
Конфигурация MongoDB: осуществляется набором реплик для захвата изменения данных. Набор реплик в MongoDB — это группа экземпляров mongod, которыми поддерживается одинаковый набор данных для обеспечения резервирования и высокой доступности. Набор реплик включается для захвата потоков изменений, используемых инструментами захвата изменения данных вроде Debezium для мониторинга и потоковой передачи изменений в базе данных в реальном времени:
rs.initiate({
_id: 'rs0',
members: [
{ _id: 0, host: 'mongodb:27017' }
]
});
Пример с коннекторами
PostgreSQL:
В этом случае необходимо выполнить запрос методом post в:
curl --location 'http://<service-domain>:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"tasks.max": "1",
"database.hostname": "db",
"database.port": "5432",
"database.user": "root",
"database.password": "1",
"database.dbname": "sport-db",
"database.server.name": "dbserver1",
"database.history.kafka.bootstrap.servers": "kafka1:9092;kafka2:9092",
"schema.include": "public",
"topic.prefix": "dbserver1.",
"topic.creation.enable": "true",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"topic.creation.default.cleanup.policy": "delete",
"topic.creation.default.retention.ms": "604800000"
}
}
'
Mongodb:
curl --location 'http://<service-domain>:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "inventory-connector",
"config": {
"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max" : "1",
"topic.prefix" : "dbserver1",
"mongodb.connection.string" : "mongodb://mongodb:27017/?authSource=admin",
"database.include.list" : "events",
"schema.history.internal.kafka.bootstrap.servers" : "kafka1:9092,kafka2:9092",
"transforms": "route",
"transforms.route.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex" : "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement" : "$3"
}
}'
Заключение
Реализация системы захвата изменения данных с Docker, PostgreSQL, MongoDB, Kafka и Debezium — надежное и гибкое решение для синхронизации и анализа данных в реальном времени. Мы описали настройку и конфигурацию каждого компонента, выделив их роли, преимущества и особенности.
- ZooKeeper — это координация и отказоустойчивость кластеров Kafka посредством централизованного управления.
- Kafka — высокая пропускная способность, масштабируемость и долговечность, необходимые для конвейеров передачи данных в реальном времени.
- Реестр схем — централизованная проверка схем и управление ими, обеспечение согласованности и совместимости данных.
- Kafka REST Proxy — упрощение взаимодействий с Kafka по HTTP, поэтому он доступен для самых разных клиентов.
- Kafka Connect — облегчение интеграции с различными источниками и приемниками данных, поддержка масштабируемых и отказоустойчивых конвейеров данных.
- Сервер ksqlDB — SQL-интерфейс для потоковой обработки в реальном времени и сложных преобразований данных с использованием SQL.
Корректно настроив эти компоненты, в организациях обеспечивают интеграцию и обработку данных реального времени с мгновенными результатами и действиями на основе последних данных. Правильная конфигурация и постоянное управление — ключевые условия поддержания эффективного и надежного конвейера для захвата изменения данных.
Читайте также:
- Система инженерии данных «от и до» с Kafka, Spark, Airflow, Postgres и Docker. Часть 1
- Как упростить работу с базами данных в Golang с SQLX
- Как создать платформу обработки и анализа данных за неделю
Читайте нас в Telegram, VK и Дзен
Перевод статьи Ashkan Golehpour: Implementing Change Data Capture (CDC) with Docker, PostgreSQL, MongoDB, Kafka, and Debezium: A Comprehensive Guide