В процессе разработки микросервисных приложений часто необходимо наладить эффективную и быструю коммуникацию между сервисами. Разработанный Google gRPC предоставляет высокопроизводительный фреймворк для организации такого взаимодействия. Однако стандартные балансировщики нагрузки в gRPC не всегда удовлетворяют специфическим требованиям, особенно когда требуется приоритизация адресов для минимизации сетевых задержек и обеспечения отказоустойчивости.

В этой статье я поделюсь опытом создания кастомного балансировщика нагрузки на Go для gRPC, который использует приоритеты адресов для выбора наилучшего соединения. Это решение позволяет гибко управлять распределением клиентских запросов между серверами с разными уровнями доступности и обеспечивает подключение к оптимальному ЦОД с минимальными задержками.

Постановка задачи

При разработке одного из проектов VK Tech мне потребовалось реализовать балансировщик, который выбирает первый доступный адрес из приоритетного списка. Приоритеты адресов определяются порядком в конфигурационном файле: чем выше адрес в списке, тем выше его приоритет. В случае недоступности адреса с наивысшим приоритетом балансировщик должен автоматически переключаться на следующий доступный адрес по приоритету.

Требования к балансировщику:

  • Приоритизация адресов: выбор адреса с наивысшим приоритетом из списка.
  • Отказоустойчивость: автоматическое переключение на следующий адрес при недоступности текущего.
  • Минимизация задержек: подключение к ближайшему или наиболее оптимальному ЦОД.

Почему стандартные балансировщики не подходят

Стандартные балансировщики в gRPC, такие как round-robin (циклический) и pick-first («первый доступный»), не учитывают приоритизацию адресов в списке.

Round-robin равномерно распределяет запросы между всеми доступными серверами, что может привести к увеличению сетевых задержек, если некоторые серверы географически удалены или менее производительны.

Pick-first всегда выбирает первый доступный адрес, но не переключается на адреса с более высоким приоритетом, если они становятся доступными после первоначального подключения.

Таким образом, для решения задачи минимизации задержек и обеспечения гибкости подключения к различным ЦОДам стандартные балансировщики не подходят.

Основная идея кастомного балансировщика

Наш кастомный балансировщик использует приоритизацию адресов, заданную в конфигурационном файле, для выбора наилучшего соединения.

  • Порядок адресов: адреса упорядочены по приоритету; индекс 0 — наивысший приоритет.
  • Выбор соединения: всегда выбирается первое доступное соединение с наивысшим приоритетом.
  • Автоматическое переключение: при недоступности текущего соединения балансировщик переключается на следующий по приоритету.

Преимущества такого подхода:

  • Минимизация сетевых задержек.
  • Повышенная отказоустойчивость.
  • Гибкость настройки.

Обзор архитектуры решения

Перед тем как перейти к реализации, рассмотрим основные компоненты нашего балансировщика и их взаимодействие.

BalancerBuilder

Балансировщик в gRPC создаётся с помощью билдера. Наш BalancerBuilder регистрирует балансировщик с определённым именем и схемой, чтобы gRPC-клиент мог его использовать.

type BalancerBuilder struct{}

func (b BalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
return &Balancer{
cc: cc,
subConns: resolver.NewAddressMap(),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
state: connectivity.Connecting,
}
}

func (b BalancerBuilder) Name() string { return balancerName }

func init() {
balancer.Register(&BalancerBuilder{})
}

Основные задачи билдера:

  • Создание и инициализация балансировщика.
  • Настройка взаимодействия с ClientConn.
  • Регистрация балансировщика для использования клиентом.

Resolver

Резолвер предоставляет балансировщику список адресов с их приоритетами. Он преобразует адреса из конфигурационного файла в resolver.Address, присваивая каждому адресу атрибут index, соответствующий его приоритету.

type resolverBuilder struct {
addresses []resolver.Address
}

func (b *resolverBuilder) Build(
target resolver.Target,
clientConn resolver.ClientConn,
_ resolver.BuildOptions,
) (resolver.Resolver, error) {
ctx, cancel := context.WithCancel(context.Background())

res := &fiResolver{
ctx: ctx,
cancel: cancel,
target: target,
cc: clientConn,
addressesStore: b.addresses,
}

if len(b.addresses) > 1 {
res.serviceConfig = clientConn.ParseServiceConfig(defaultConfig)
}

go res.start()

return res, nil
}

func (*resolverBuilder) Scheme() string {
return scheme
}

func initResolver(addresses []string) {
addressesStore := make([]resolver.Address, len(addresses))
for i, addr := range addresses {
addressesStore[i] = resolver.Address{
Addr: addr,
Attributes: attributes.New("index", i),
}
}

resolver.Register(&resolverBuilder{addresses: addressesStore})
}

Функции резолвера:

  • Динамическое обновление адресов.
  • Предоставление адресов с приоритетами балансировщику.
  • Сообщение об ошибках в случае недоступности адресов.

Picker

Picker выбирает соединение с наименьшим индексом (наивысшим приоритетом) из доступных. Если соединение с более высоким приоритетом становится доступным, балансировщик автоматически переключается на него.

type firstIdxPicker struct {
result balancer.PickResult
err error
}

func (p *firstIdxPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
return p.result, p.err
}

func NewFIPicker(info base.PickerBuildInfo) balancer.Picker {
if len(info.ReadySCs) == 0 {
return &firstIdxPicker{err: balancer.ErrNoSubConnAvailable}
}

minIdx := math.MaxInt
var selectedConn balancer.SubConn

for sc, scInfo := range info.ReadySCs {
idx, ok := scInfo.Address.Attributes.Value("index").(int) // <- наш простенький алгоритм определения оптимального соединения
if ok && idx < minIdx {
minIdx = idx
selectedConn = sc
}
}

if selectedConn != nil {
return &firstIdxPicker{result: balancer.PickResult{SubConn: selectedConn}}
}

return &firstIdxPicker{err: balancer.ErrNoSubConnAvailable}
}

Алгоритм выбора:

  1. Проходит по всем готовым соединениям.
  2. Выбирает соединение с наименьшим index.
  3. Возвращает выбранное соединение для обработки запроса.

Balancer

Балансировщик отслеживает состояния соединений и регенерирует Picker при их изменении.

type Balancer struct {
cc balancer.ClientConn
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State

subConns *resolver.AddressMap
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker

resolverErr error
connErr error
}

func (b *Balancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
b.resolverErr = nil

addressMap := b.createNewSubConnections(ccs)

for _, addr := range b.subConns.Keys() {
if _, ok := addressMap.Get(addr); !ok {
sci, _ := b.subConns.Get(addr)
sc := sci.(balancer.SubConn)
sc.Shutdown()
b.subConns.Delete(addr)
}
}

if len(ccs.ResolverState.Addresses) == 0 {
b.ResolverError(errZeroAddresses)
return balancer.ErrBadResolverState
}

b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})

return nil
}

func (b *Balancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
oldState, ok := b.scStates[subConn]
if !ok {
return
}

b.scStates[subConn] = state.ConnectivityState

switch state.ConnectivityState {
case connectivity.Idle:
subConn.Connect()
case connectivity.Shutdown:
delete(b.scStates, subConn)
case connectivity.TransientFailure:
b.connErr = state.ConnectionError
}

b.state = b.csEvltr.RecordTransition(oldState, state.ConnectivityState)

if (state.ConnectivityState == connectivity.Ready) != (oldState == connectivity.Ready) || b.state == connectivity.TransientFailure {
b.regeneratePicker()
}

b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}

func (b *Balancer) regeneratePicker() {
if b.state == connectivity.TransientFailure {
b.picker = &firstIdxPicker{err: errors.Join(b.resolverErr, b.connErr)}
return
}

readySCs := make(map[balancer.SubConn]base.SubConnInfo)

for _, addr := range b.subConns.Keys() {
sci, _ := b.subConns.Get(addr)
sc := sci.(balancer.SubConn)
if state, ok := b.scStates[sc]; ok && state == connectivity.Ready {
readySCs[sc] = base.SubConnInfo{Address: addr}
}
}

b.picker = NewFIPicker(base.PickerBuildInfo{ReadySCs: readySCs})
}

Отслеживание состояний соединений:

  • UpdateClientConnState: создание новых и удаление неактуальных соединений.
  • UpdateSubConnState: обновление состояний существующих соединений.
  • regeneratePicker: обновление пикера при изменении состояний для выбора оптимального соединения.

Настройка и конфигурация

Для использования кастомного балансировщика необходимо определить его имя и схему, а также настроить подключение.

const (
scheme = "scheme-name"
balancerName = "pick_idx_first"
defaultConfig = `{"loadBalancingConfig": [{"pick_idx_first": {}}]}`
retryTimeout = time.Millisecond * 100
maxRetries = 10
)

type ConnOptions struct {
Addrs []string
Opts []grpc.DialOption
}

func NewConn(ctx context.Context, connOptions ConnOptions) (*grpc.ClientConn, error) {
conn, err := dialContext(ctx, connOptions.Addrs, connOptions.Opts...)
if err != nil {
return nil, fmt.Errorf("unable to initialize conn: %w", err)
}

return conn, nil
}

func dialContext(ctx context.Context, addresses []string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {

...
opts = append(opts,
grpc.WithDefaultServiceConfig(defaultConfig),
grpc.WithStreamInterceptor(
retry.StreamClientInterceptor(retry.WithPerRetryTimeout(retryTimeout), retry.WithMax(maxRetries)),
),
grpc.WithUnaryInterceptor(
retry.UnaryClientInterceptor(retry.WithPerRetryTimeout(retryTimeout), retry.WithMax(maxRetries)),
),
)
...

initResolver(addresses)

return grpc.DialContext(ctx, fmt.Sprintf("%s:///", scheme), opts...)

Параметры подключения:

  • scheme и balancerName: определяют кастомный балансировщик.
  • defaultConfig: задаёт конфигурацию балансировки.
  • Интерсепторы: добавлены для повторных подключений при кратковременных сбоях. Я использовал пакет github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry.

Тестирование и результаты

В процессе тестирования балансировщик показал стабильную работу при переключении между адресами в случае недоступности сервера с более высоким приоритетом. Задержки были минимизированы благодаря приоритетному подключению к ближайшему ЦОДу.

Заключение

Создание кастомного gRPC-балансировщика с приоритизацией адресов позволяет более точно контролировать распределение клиентских запросов и улучшить производительность приложения. Такое решение обеспечивает гибкость настройки, минимизацию сетевых задержек и повышенную отказоустойчивость, что особенно важно в современных микросервисных архитектурах.

Преимущества кастомного решения:

  • Гибкость: настройка приоритетов адресов.
  • Эффективность: минимизация задержек за счёт выбора оптимального соединения.
  • Отказоустойчивость: автоматическое переключение при недоступности сервера.

Перспективы развития:

  • Динамическое обновление приоритетов.
  • Интеграция с сервисами обнаружения.
  • Расширение логики выбора на основе метрик производительности.

Надеюсь, эта статья поможет вам в создании кастомных решений для ваших gRPC-приложений. 

Ссылки и дополнительные материалы

Читайте также:

Читайте нас в Telegram, VK и Дзен


Статья доступна на Хабре kuvatovrr: Создание кастомного балансировщика нагрузки на Go для gRPC с приоритизацией адресов

Предыдущая статьяОсновные языки веб-разработки