Дирижируйте горутинами с помощью каналов

Go получил известность во многом благодаря своему чистому и эффективному подходу к параллельному выполнению. С применением горутин можно добиться огромного повышения эффективности, выполняя несколько потоков в фоновом режиме во время работы основной программы. Но как обеспечить взаимодействие горутин или наладить совместное использование ими ресурсов? Вы будете удивлены простой, но мощной природой каналов в Go. Будьте к этому готовы.

Предполагается, что кое-какие знания о горутинах у вас имеются, так что давайте сразу переходить к коду!

Простой пример:

Каналы значительно упростили параллельное программирование. Тем не менее нужно с осторожностью подходить к их пониманию. В приведённой ниже программе выполняется всего четыре операции. Рассмотрим их поэтапно.

Первый этап  —  создание канала в строке 7. Определяем тип входных данных для этого канала. Это будет string (строковый тип данных). Затем в строках с 9 по 11 создаём анонимную функцию: берём строку "ping" и отправляем её в канал messages (сообщения).

При работе с каналами операция по отправке информации обозначается стрелочкой <-, направление которой показывает, куда поступает информация: в канал или из канала.

package main

import "fmt"

func main() {

    messages := make(chan string)

    go func() { 
		messages <- "ping" 
	}()

    msg := <-messages
    fmt.Println(msg)
}

Третий этап  —  это запрос сохранённой строки "ping" из канала сообщений в переменную msg. Последний этап  —  вывод этого сообщения на консоль.

Каналы будут ожидать информацию для отправки

Каналы в Go блокирующие. Например, у нас есть две горутины, выполняющиеся одновременно. Если одна из них запрашивает у другой передачу информации по каналу, канал блокирует выполнение до тех пор, пока другая горутина не передаст информацию в соединяющий их канал.

Продемонстрировать это можно с помощью следующего кода. sync.WaitGroup позаботится о том, чтобы горутины завершились перед выходом из основного потока программы.

Первым делом инициализируем новый канал messages в строке 13.

package main

import (
	"fmt"
	"time"
	"sync"
)

func main() {
	var wg sync.WaitGroup //  инициализируем счётчик
	wg.Add(2) // в ожидании двух горутин

	messages := make(chan string)

	go func() { 
		fmt.Println("Starting first anonymous function...")
		fmt.Println("Echo!")

		time.Sleep(time.Millisecond * 1000)

		messages <- "Echo!" 
		fmt.Println("Exiting first anonymous function")
		wg.Done()
	}()

	go func() {
		fmt.Println("Starting second anonymous function...")

		msg := <- messages

		fmt.Printf("I hear an %s\n", msg)
		fmt.Println("Exiting second anonymous function")
		wg.Done()
	}()

	wg.Wait() // блокируем, пока все горутины не завершат выполнение
}

После этого первая анонимная функция на секунду засыпает, а затем перед выходом отправляет строку "Echo" в канал messages. В следующей анонимной функции в строке 26 запрашиваем входные данные из канала messages в переменную msg. Это блокирующая операция: в ней эта функция не сможет продолжать работу, пока мы не получим выходные данные из messages.

Если бы мы запустили приведённую выше программу, то получили бы следующий результат:

Starting second anonymous function...
Starting first anonymous function...
Echo!
Exiting first anonymous function
I hear an Echo!
Exiting second anonymous function

Опять же обратите внимание, что горутины недетерминированы. Нельзя сказать, в каком порядке функции будут запускаться или останавливаться. Зато известно, что каналы будут блокировать выполнение в ожидании информации для отправки. Между первой Echo! и заключительной инструкцией на вывод I hear an Echo! есть секундная задержка, потому что принимающий канал ждёт вывод.

Как предотвратить взаимоблокировку в каналах

Итак, каналы блокируют выполнение во время ожидания ресурса. Но что произойдет, если этот ресурс так и не будет отправлен? Эта ситуация приведёт ко взаимоблокировке, которую Go обнаружит не во время компиляции, а уже только во время выполнения. Рассмотрим пример:

package main

import (
	"fmt"
	"time"
	"sync"
)

func main() {
	var wg sync.WaitGroup //  инициализируем счётчик
	wg.Add(2) // в ожидании двух горутин

	messages := make(chan int)

	go func() { 
		for i := 0; i < 3; i++ {
			messages <- i
			time.Sleep(time.Millisecond * 500)
		}

		close(messages) // закрываем канал по завершении
		wg.Done()
	}()

	go func() {
		for {
			msg, open := <- messages

			if !open {
				break
			}
			fmt.Println(msg)
		}

		wg.Done()
	}()

	wg.Wait() // блокируем, пока все горутины не завершат выполнение
}

В первой анонимной функции, которая выполняется как горутина, мы трижды добавляем к каналу сообщений любое значение i. Когда мы выходим из этого цикла, то закрываем канал, сигнализируя о том, что с добавлением закончили. Таким образом взаимоблокировка предотвращается.

Во второй горутине из канала messages мы получаем msg и open. Параметр open  —  это логическое значение, которое сигнализирует о том, закрыт канал или нет. Если канал неоткрыт !open, то выходим из цикла и горутины с помощью wg.Done(). Просто, но эффективно!

Можно также воспользоваться преимуществами синтаксиса Go, упростив второй цикл for в строке 16. Тогда даже не придётся проверять, закрыт канал или нет: об этом позаботится Go.

for msg := range messages {
    fmt.Println(msg)
}

Кроме того, можно добавить каналам пропускную способность:

package main

import (
    "fmt"
)

func main() {
    messages := make(chan string, 2)
    messages <- "Yer"
    messages <- "a"

    msg := <- messages
    fmt.Println(msg)

    msg = <- messages
    fmt.Println(msg)

    messages <- "Wizard"

    msg = <- messages
    fmt.Println(msg)
}

В строке 8 мы создаём канал с пропускной способностью 2, а отправлять и получать можем даже три строки, потому что канал работает как очередь. Пока в канале не более 2 сообщений, мы не выходим за пределы пропускной способности и не получаем ошибку.

А что, если имеются каналы, блокирующие друг друга в условиях ограничения по времени? В Go можно использовать выражение select для выполнения любого готового канала:

package main

import (
    "fmt"
    "time"
)

func main() {
    c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        for {
            c1 <- "Channel 1 every second"
            time.Sleep(time.Millisecond * 1000)
        }
    }()

    go func() {
        for {
            c1 <- "Channel 2 every 2 seconds"
            time.Sleep(time.Millisecond * 2000)
        }
    }()

    for {
        select {
        case msg1 := <- c1:
            fmt.Println(msg1)
        case msg2 := <- c2:
            fmt.Println(msg2)
        }
    }
}

Здесь у нас два канала, которые бесконечно добавляются в двух горутинах. Первая горутина добавляется к каналу c1 каждую секунду. Вторая добавляется к каналу c2 каждые две секунды. Если просто попытаться вывести информацию с этих каналов, канал c2 заблокирует выполнение и мы будем получать оба вывода каждые две секунды.

А вот с помощью оператора select можно будет постоянно проверять наличие канала, готового к выгрузке с него информации. Выполнение этого оператора позволит обеспечить постоянный поток информации из каналов без каких-либо блокировок.

Каналы представляют собой простой, но мощный инструмент взаимодействия горутин и позволяют более осмысленно управлять информационным потоком. При этом важно учитывать риск возникновения взаимоблокировок, ведь в Go они не распознаются во время компиляции. В этой связи умение отслеживать, когда канал должен быть открыт, или умение добавлять каналам пропускную способность  —  это уже немало.

Надеюсь, статья была полезной и вы узнали что-то новое. Спасибо за внимание!

Читайте также:

Читайте нас в Telegram, VK и Яндекс.Дзен


Перевод статьи Israel Miles: Orchestrate Your Go Routines Using Channels

Предыдущая статьяИспользуй git-команды, как senior developer
Следующая статьяЧистая архитектура с MVVM