Что, если в MongoDB приложением сохраняются гигабайты данных, чем обеспечивается его гибкость и масштабируемость, но при их запрашивании и фильтровании все замедляется из-за ограничения MongoDB на полнотекстовый поиск?

Подружим надежное хранилище MongoDB с мощным поиском Elasticsearch, который работает практически в реальном времени. Для этого создадим службу синхронизации данных на Golang, добавив к возможностям хранения MongoDB скорость Elasticsearch.

Чтобы обеспечить обработку высокоскоростных изменений данных и надежную синхронизацию, воспользуемся их инструментами  —  Change Streams и bulk API соответственно. Получится отказоустойчивая, эффективная событийно-ориентированная синхронизация данных, способная справляться с нагрузками уровня продакшена.

Почему именно с Elasticsearch?

MongoDB  —  сама гибкость, но даже с возможностями индексации высокоскоростные, сложные запросы и полнотекстовый поиск, как в Elasticsearch, ей не под силу.

При их синхронизации:

  • MongoDB остается основным хранилищем данных, идеальным для гибкого хранения и CRUD-операций.
  • Elasticsearch обеспечивается выполнение быстрых, релевантных и сложных запросов, совершенствуется пользовательское взаимодействие.

Что понадобится?

  • Четкое понимание Golang и опыт работы с MongoDB и Elasticsearch.
  • По одному экземпляру MongoDB и Elasticsearch, локальному или облачному.
  • Библиотеки: go.mongodb.org/mongo-driver/mongo и github.com/elastic/go-elasticsearch.

Этап 1. Настройка подключений MongoDB и Elasticsearch на Golang

Сначала настроим надежные подключения к MongoDB и Elasticsearch, необходимые для службы производственного уровня. Для ее устойчивости включим повторные попытки и обработку ошибок.

Подключение к MongoDB с автоматическим повтором

package main

import (
"context"
"log"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

func connectMongoDB(uri string) *mongo.Client {
clientOptions := options.Client().ApplyURI(uri)
client, err := mongo.Connect(context.Background(), clientOptions)
if err != nil {
log.Fatalf("Error connecting to MongoDB: %v", err)
}
return client
}

func main() {
mongoURI := "mongodb://localhost:27017"
client := connectMongoDB(mongoURI)
defer client.Disconnect(context.Background())
}

Подключение Elasticsearch с расширенным логированием ошибок

import (
"log"
"github.com/elastic/go-elasticsearch/v8"
)

func connectElasticsearch() *elasticsearch.Client {
es, err := elasticsearch.NewDefaultClient()
if err != nil {
log.Fatalf("Error connecting to Elasticsearch: %v", err)
}
return es
}

Этап 2. Реализация синхронизации в реальном времени при помощи Change Streams и Bulk API

Теперь самое главное. Для отслеживания всех изменений данных MongoDB воспользуемся Change Streams, а для быстрой и эффективной синхронизации  —  bulk API. Это мощный подход, которым на продакшене обеспечивается высокоскоростная передача данных при синхронизированных базах данных.

Наблюдение за изменениями MongoDB при помощи Change Streams

С Change Streams операции вставки insert, обновления update и удаления delete отслеживаются по мере их выполнения и отправляются прямиком в Elasticsearch. При помощи fullDocument получаем обновленный документ целиком, сокращая время поиска.

import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"context"
)

func watchChanges(client *mongo.Client, es *elasticsearch.Client) {
collection := client.Database("production_db").Collection("products")
stream, err := collection.Watch(context.Background(), mongo.Pipeline{})
if err != nil {
log.Fatalf("Error initializing MongoDB Change Stream: %v", err)
}
defer stream.Close(context.Background())

for stream.Next(context.Background()) {
var change bson.M
if err := stream.Decode(&change); err != nil {
log.Printf("Error decoding change event: %v", err)
continue
}

handleSync(change, es) // при необходимости пишем эту функцию
}
}

Синхронизация данных с Elasticsearch массовыми операциями

Получив изменения MongoDB, синхронизируем их с Elasticsearch. Для управления потоком данных создадим три функции  —  indexDocument, updateDocument и deleteDocument. Вот их особенности:

  • Вставка или обновление: при отсутствии документа он добавляется, при наличии  —  обновляется.
  • Удаление: при удалении из MongoDB документ удаляется из Elasticsearch.

Вставка или обновление документа

import (
"bytes"
"encoding/json"
"github.com/elastic/go-elasticsearch/v8/esapi"
)

func indexDocument(change bson.M, es *elasticsearch.Client) {
docID := change["documentKey"].(bson.M)["_id"].(string)
doc := change["fullDocument"]

data, err := json.Marshal(doc)
if err != nil {
log.Printf("Error marshaling document: %v", err)
return
}

req := esapi.IndexRequest{
Index: "products_index",
DocumentID: docID,
Body: bytes.NewReader(data),
Refresh: "true",
}

res, err := req.Do(context.Background(), es)
if err != nil || res.IsError() {
log.Printf("Failed to index document ID=%s", docID)
}
}

Удаление документа

func deleteDocument(change bson.M, es *elasticsearch.Client) {
docID := change["documentKey"].(bson.M)["_id"].(string)

req := esapi.DeleteRequest{
Index: "products_index",
DocumentID: docID,
}

res, err := req.Do(context.Background(), es)
if err != nil || res.IsError() {
log.Printf("Error deleting document ID=%s", docID)
}
}

Этап 3. Повышение производительности массовыми запросами

В сценариях с высокой пропускной способностью используем bulk API и объединяем события в один запрос, снижая нагрузку на сеть и оптимизируя расход ресурсов. Вот пример функции, в которой документы ставятся в очередь и отправляются скопом:

func bulkIndexDocuments(documents []map[string]interface{}, es *elasticsearch.Client) {
var buf bytes.Buffer
for _, doc := range documents {
meta := map[string]interface{}{"index": map[string]string{"_index": "products_index", "_id": doc["_id"].(string)}}
if err := json.NewEncoder(&buf).Encode(meta); err != nil {
log.Printf("Error encoding bulk meta: %v", err)
}
if err := json.NewEncoder(&buf).Encode(doc); err != nil {
log.Printf("Error encoding bulk document: %v", err)
}
}

res, err := es.Bulk(bytes.NewReader(buf.Bytes()), es.Bulk.WithRefresh("true"))
if err != nil || res.IsError() {
log.Printf("Bulk request failed: %v", err)
}
}

Тестирование службы синхронизации

  1. Заполняем MongoDB примером данных и проверяем, что в Change Stream отслеживаются события.
  2. Удостоверяемся, что каждый документ появляется в Elasticsearch уже с последними изменениями данных.

Заключение

Мы сделали мощную событийно-ориентированную службу синхронизации на Golang. Она высокоэффективна и отказоустойчива, MongoDB и Elasticsearch синхронизируются даже при большой нагрузке. Объединив преимущества MongoDB в управлении данными с возможностями поиска Elasticsearch, можно обрабатывать более сложные запросы, не опасаясь потерять в скорости.

На эту основу нанизываются стратегии задержки повторов, оповещения о сбоях и обработка ошибок для службы синхронизации производственного уровня, которая по мере роста данных масштабируется. А теперь вперед  —  развертывайте ее и смотрите, как сразу преобразуются возможности поиска и запросов в приложении.

Читайте также:

Читайте нас в Telegram, VK и Дзен


Перевод статьи Aman Saxena: Real-Time Data Syncing Between MongoDB and Elasticsearch with Golang

Предыдущая статьяКак создать анимацию мерцающего текста в Jetpack Compose
Следующая статья18 полезных приемов написания JavaScript-кода