С

Часть 1, Часть 2, Часть 3

Мы уже видели, как разные потоки с помощью мьютекса спокойно могут получать доступ к общим данным. Но что делать, если этим потокам нужны промежуточные результаты от других потоков? 
Проиллюстрируем примитивным, но жизненным примером.

… и с атмосферой из фильма «Убить Билла»

Вспомним фильм «Убить Билла». У нас действие тоже происходит в роскошном ресторане. В ролях: поток-официант и поток-повар и кнопка вызова официанта — примитив синхронизации condition variable. Кстати, суровые персонажи из «Убить Билла» тоже могут появиться: заглянут на кухню, если им не понравится стрепня повара, а с официантом разберутся прямо на месте, если он будет не слишком расторопен.

Представьте, что вы в кухне ресторана. Повар ждёт, когда принесут новый заказ. Наконец, официант принимает заказ, и повар начинает готовить. Их коммуникация осуществляется благодаря общей переменной orderStatus, защищаемой мьютексом. Эта переменная может иметь следующие значения:

idle // значение по умолчанию;
newOrder // есть новый заказ для повара;
ready // официант может принести заказ.

Теперь представьте, что было бы, если бы официант через каждые две минуты заходил на кухню и спрашивал повара, не готов ли заказ. Верно: это была бы пустая трата времени и нервных клеток. Это называется поллингом: поток-официант всё время захватывает мьютекс и проверяет условие в цикле, что приводит к необязательным потерям цикла и батареи.

Было бы лучше, если бы официант ставил будильник на 10 минут (используя std::this_thread::sleep_for()), прежде чем в очередной раз спрашивать у повара о готовности заказа.

std::unique_lock<std::mutex> ul(order);

while ( orderStatus != ready ) {

// повар всё ещё готовит
  ul.unlock();

// ставим будильник
  std::this_thread::sleep_for(std::chrono::milliseconds(300));

// просыпаемся и идём проверять, не готов ли заказ
  ul.lock();

}

// заказ готов

Так-то оно, конечно, лучше. Но в случае, если заказ будет готов на 11 минуте, пройдут ещё девять минут, прежде чем будильник зазвонит в следующий раз. За это время блюда успеют остыть и официанту придётся иметь дело с недовольными посетителями. А ведь это суровые персонажи из фильма Тарантино!

Идеальной была бы такая ситуация: повар нажимает на кнопку вызова официанта, как только заказ будет готов, а официант дремлет в ожидании сигнала от повара. В роли кнопки вызова в C++ выступает примитив синхронизации condition variable, позволяющий блокировать один или несколько потоков до поступления сигнала.

Как работает механизм синхронизации?

Синхронизация нужна, когда есть как минимум два потока, один из которых ожидает от другого уведомления о наступлении какого-то события, как то: запись значения, добавление элемента в очередь и др.

  • Список ожидающих потоков
    std::condition_variable по сути представляет собой список потоков (изначально пустой), которые в настоящий момент используют его для ожидания уведомления.
  • Потоки должны иметь общий доступ к:
     1) переменной, связанной с событием (например, the orderStatus);
     2) мьютексу, защищающему такую совместно используемую переменную | std::mutex mutex_name; 
     3) объекту-примитиву condition variable, который моделирует механизм синхронизации | std::condition_variable condition;.

Уведомление

Поток может уведомить:

  • только один случайный поток из числа ожидающих на condition variable, вызвав с помощью notify_one() объект на condition variable;
  • все ожидающие потоки с помощью notify_all().

Перед уведомлением поток должен захватить мьютекс. Уведомление поступает только после освобождения мьютекса.

std::lock_guard<std::mutex> lg(mutex_name);    // этап 1
   /* Здесь поток может безопасно выполнить
    * нужные ему действия над общей переменной*/

condition.notify_one();                       // этап 2
// илиcondition.notify_all();

Ожидание

Важное замечание:

Понимание того, о чём пойдёт речь дальше, сэкономит вам в будущем кучу нервов при работе с кодом. Будет непросто, зато потом ваши усилия окупятся. При ожидании события потоку приходится пройти три этапа:

std::unique_lock<std::mutex> ul(mutex_name);    // этап 1
condition.wait(ul, <optional_condition_check>); // этап 2
    /* Здесь поток получил уведомление
     * и может безопасно выполнять действия
     * над общей переменной */

ul.unlock();                                    // этап 3

1. Захват мьютекса с помощью unique lock

std::unique_lock<std::mutex> ul(mutex_name)
Прежде чем переходить к следующему этапу, нужно захватить мьютекс. Если сделать это не удаётся, происходит блокировка, которая продолжается до тех пор, пока мьютекс не будет захвачен. Мьютекс захватывается во избежание критических состязаний при пробуждении. Но мьютекс всего один, а уведомление получают несколько потоков. Поэтому пробуждаться они будут последовательно (один за другим), так же последовательно разблокируясь и захватывая мьютекс. Пропуск этого этапа может привести к неопределённому поведению, чего бы очень не хотелось.

Из-за необходимости проводить многократный захват и освобождение мьютекса приходится использовать unique lock вместо lock guard (подробнее об этом далее).

2. Ожидание на condition variable

condition.wait(ul, <condition_check>);
wait() вызывается у condition variable, передавая unique_lock и опционально вызываемый объект (как правило, это лямбда-функция), который оценивает, удовлетворяется ли ожидаемое условие и возвращает true или false в соответствии с этой оценкой. Проверка условия необязательна, но она нужна для того, чтобы не возникло ложных уведомлений (как если бы повар нажимал на кнопку вызова официанта по ошибке).

Эта функция ожидания выполняет следующие действия:
1) вносит текущий поток в список ожидающих потоков;
2) проверяет, удовлетворяется ли ожидаемое условие, задействуя вызываемый объект;
3) в случае неудовлетворения, освобождает мьютекс и отмечает поток как невыполняемый.

Уведомление — ложное или нет — должно разбудить поток:
4) происходит повторная попытка захвата мьютекса (или блокировка, продолжающаяся до тех пор, пока он не будет захвачен);
5) проводится проверка того, удовлетворяется ли условие (чтобы не было ложных уведомлений);
6) в случае удовлетворения, поток удаляется из списка ожидающих потоков и отмечается как выполняемый. Если условие не удовлетворяется, выполняется действие №3. 

Не забывайте, что существует временная задержка между моментами, когда поток становится выполняемым и когда он уже выполняется. Это означает, что пробуждение потока не совпадает с его выполнением.

3. Освобождение мьютекса после использования

ul.unlock();
После пробуждения потока выполняется код, в соответствии с приведённой выше инструкцией ожидания, с уже захваченным мьютексом. Теперь, когда стало известно, что ожидаемое событие произошло, выполняются действия над общей переменной. 
По завершении работы с общей переменной важно вызвать функцию разблокирования unlock, чтобы и другие потоки могли захватить мьютекс.

Итоговый пример

Попробуем написать код для примера с рестораном, используя класс для синхронизации двух потоков.

Действия официанта

  1. Уведомить повара о новом заказе.
  2. Дремать в ожидании сигнала от повара.
  3. Принести заказ.

Действия повара

  1. Ждать, когда принесут новый заказ.
  2. Приготовить блюда из заказа.
  3. Нажать на кнопку вызова официанта.

Повар и официант должны иметь общий доступ к:

  • переменной: orderStatus;
  • мьютексу, защищающему заказ: std::mutex order;
  • condition variable: std::condition_variable orderBell;
#include <mutex>
#include <condition_variable>
#include <iostream>

class Restaurant {
    enum class Status { idle, newOrder, ready };
    Status orderStatus = Status::idle;
    std::mutex order;
    std::condition_variable orderBell;
    
public:

    void chef() {
        
        std::unique_lock<std::mutex> ul(order);
        orderBell.wait(ul, [=]() { return orderStatus == Status::newOrder; });
        //приготовление блюд из заказа
        orderStatus = Status::ready;
        orderBell.notify_one();
      
    }

    void waiter() {
        {
            std::lock_guard<std::mutex> lg(order); 
            orderStatus = Status::newOrder;
            orderBell.notify_one();
        } // lg вне области видимости = order.unlock()
        
        std::unique_lock<std::mutex> ul(order);
        orderBell.wait(ul, [=]() { return orderStatus == Status::ready; });
        orderStatus = Status::idle;
        ul.unlock();
        //приносят заказ
    }

};

Замечание: lock guard в методе waiter вполне можно заменить на unique lock.

И, наконец, класс Restaurant используется вот так:

#include "Restaurant.h"
#include <thread>

int main() {
    Restaurant restaurant;
    std::thread chef(&Restaurant::chef, std::ref(restaurant));
    std::thread waiter(&Restaurant::waiter, std::ref(restaurant));
    chef.join();
    waiter.join();
    return 0;
}

Заключение

Отлично! Позади самая сложная часть, касающаяся примитива синхронизации condition variable. И теперь вам остаются три вещи: код, код и ещё раз код! Поэтому следующая статья будет посвящена практическим примерам и упражнениям (с ответами), решать которые — одно удовольствие!

Вот одно из упражнений: попробуйте развить пример с рестораном, добавив несколько официантов, причём каждый из них не только принимает новый заказ, но и может принести любой из уже готовых.

Читайте также:


Перевод статьи Valentina: [C++] Thread Synchronization at the Restaurant

Предыдущая статьяАвтоматизация работы с Python
Следующая статьяСмертоносные интерфейсы