Что, если в MongoDB приложением сохраняются гигабайты данных, чем обеспечивается его гибкость и масштабируемость, но при их запрашивании и фильтровании все замедляется из-за ограничения MongoDB на полнотекстовый поиск?
Подружим надежное хранилище MongoDB с мощным поиском Elasticsearch, который работает практически в реальном времени. Для этого создадим службу синхронизации данных на Golang, добавив к возможностям хранения MongoDB скорость Elasticsearch.
Чтобы обеспечить обработку высокоскоростных изменений данных и надежную синхронизацию, воспользуемся их инструментами — Change Streams и bulk API соответственно. Получится отказоустойчивая, эффективная событийно-ориентированная синхронизация данных, способная справляться с нагрузками уровня продакшена.
Почему именно с Elasticsearch?
MongoDB — сама гибкость, но даже с возможностями индексации высокоскоростные, сложные запросы и полнотекстовый поиск, как в Elasticsearch, ей не под силу.
При их синхронизации:
- MongoDB остается основным хранилищем данных, идеальным для гибкого хранения и CRUD-операций.
- Elasticsearch обеспечивается выполнение быстрых, релевантных и сложных запросов, совершенствуется пользовательское взаимодействие.
Что понадобится?
- Четкое понимание Golang и опыт работы с MongoDB и Elasticsearch.
- По одному экземпляру MongoDB и Elasticsearch, локальному или облачному.
- Библиотеки:
go.mongodb.org/mongo-driver/mongo
иgithub.com/elastic/go-elasticsearch
.
Этап 1. Настройка подключений MongoDB и Elasticsearch на Golang
Сначала настроим надежные подключения к MongoDB и Elasticsearch, необходимые для службы производственного уровня. Для ее устойчивости включим повторные попытки и обработку ошибок.
Подключение к MongoDB с автоматическим повтором
package main
import (
"context"
"log"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func connectMongoDB(uri string) *mongo.Client {
clientOptions := options.Client().ApplyURI(uri)
client, err := mongo.Connect(context.Background(), clientOptions)
if err != nil {
log.Fatalf("Error connecting to MongoDB: %v", err)
}
return client
}
func main() {
mongoURI := "mongodb://localhost:27017"
client := connectMongoDB(mongoURI)
defer client.Disconnect(context.Background())
}
Подключение Elasticsearch с расширенным логированием ошибок
import (
"log"
"github.com/elastic/go-elasticsearch/v8"
)
func connectElasticsearch() *elasticsearch.Client {
es, err := elasticsearch.NewDefaultClient()
if err != nil {
log.Fatalf("Error connecting to Elasticsearch: %v", err)
}
return es
}
Этап 2. Реализация синхронизации в реальном времени при помощи Change Streams и Bulk API
Теперь самое главное. Для отслеживания всех изменений данных MongoDB воспользуемся Change Streams, а для быстрой и эффективной синхронизации — bulk API. Это мощный подход, которым на продакшене обеспечивается высокоскоростная передача данных при синхронизированных базах данных.
Наблюдение за изменениями MongoDB при помощи Change Streams
С Change Streams операции вставки insert
, обновления update
и удаления delete
отслеживаются по мере их выполнения и отправляются прямиком в Elasticsearch. При помощи fullDocument
получаем обновленный документ целиком, сокращая время поиска.
import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"context"
)
func watchChanges(client *mongo.Client, es *elasticsearch.Client) {
collection := client.Database("production_db").Collection("products")
stream, err := collection.Watch(context.Background(), mongo.Pipeline{})
if err != nil {
log.Fatalf("Error initializing MongoDB Change Stream: %v", err)
}
defer stream.Close(context.Background())
for stream.Next(context.Background()) {
var change bson.M
if err := stream.Decode(&change); err != nil {
log.Printf("Error decoding change event: %v", err)
continue
}
handleSync(change, es) // при необходимости пишем эту функцию
}
}
Синхронизация данных с Elasticsearch массовыми операциями
Получив изменения MongoDB, синхронизируем их с Elasticsearch. Для управления потоком данных создадим три функции — indexDocument
, updateDocument
и deleteDocument
. Вот их особенности:
- Вставка или обновление: при отсутствии документа он добавляется, при наличии — обновляется.
- Удаление: при удалении из MongoDB документ удаляется из Elasticsearch.
Вставка или обновление документа
import (
"bytes"
"encoding/json"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
func indexDocument(change bson.M, es *elasticsearch.Client) {
docID := change["documentKey"].(bson.M)["_id"].(string)
doc := change["fullDocument"]
data, err := json.Marshal(doc)
if err != nil {
log.Printf("Error marshaling document: %v", err)
return
}
req := esapi.IndexRequest{
Index: "products_index",
DocumentID: docID,
Body: bytes.NewReader(data),
Refresh: "true",
}
res, err := req.Do(context.Background(), es)
if err != nil || res.IsError() {
log.Printf("Failed to index document ID=%s", docID)
}
}
Удаление документа
func deleteDocument(change bson.M, es *elasticsearch.Client) {
docID := change["documentKey"].(bson.M)["_id"].(string)
req := esapi.DeleteRequest{
Index: "products_index",
DocumentID: docID,
}
res, err := req.Do(context.Background(), es)
if err != nil || res.IsError() {
log.Printf("Error deleting document ID=%s", docID)
}
}
Этап 3. Повышение производительности массовыми запросами
В сценариях с высокой пропускной способностью используем bulk API и объединяем события в один запрос, снижая нагрузку на сеть и оптимизируя расход ресурсов. Вот пример функции, в которой документы ставятся в очередь и отправляются скопом:
func bulkIndexDocuments(documents []map[string]interface{}, es *elasticsearch.Client) {
var buf bytes.Buffer
for _, doc := range documents {
meta := map[string]interface{}{"index": map[string]string{"_index": "products_index", "_id": doc["_id"].(string)}}
if err := json.NewEncoder(&buf).Encode(meta); err != nil {
log.Printf("Error encoding bulk meta: %v", err)
}
if err := json.NewEncoder(&buf).Encode(doc); err != nil {
log.Printf("Error encoding bulk document: %v", err)
}
}
res, err := es.Bulk(bytes.NewReader(buf.Bytes()), es.Bulk.WithRefresh("true"))
if err != nil || res.IsError() {
log.Printf("Bulk request failed: %v", err)
}
}
Тестирование службы синхронизации
- Заполняем MongoDB примером данных и проверяем, что в Change Stream отслеживаются события.
- Удостоверяемся, что каждый документ появляется в Elasticsearch уже с последними изменениями данных.
Заключение
Мы сделали мощную событийно-ориентированную службу синхронизации на Golang. Она высокоэффективна и отказоустойчива, MongoDB и Elasticsearch синхронизируются даже при большой нагрузке. Объединив преимущества MongoDB в управлении данными с возможностями поиска Elasticsearch, можно обрабатывать более сложные запросы, не опасаясь потерять в скорости.
На эту основу нанизываются стратегии задержки повторов, оповещения о сбоях и обработка ошибок для службы синхронизации производственного уровня, которая по мере роста данных масштабируется. А теперь вперед — развертывайте ее и смотрите, как сразу преобразуются возможности поиска и запросов в приложении.
Читайте также:
- Проекты Go: создаем собственное канареечное развертывание
- Как реализовать в Golang двухфакторную аутентификацию с TOTP
- Что такое «мьютекс» и как им предотвращаются состояния гонки?
Читайте нас в Telegram, VK и Дзен
Перевод статьи Aman Saxena: Real-Time Data Syncing Between MongoDB and Elasticsearch with Golang