Часть 1, Часть 2

В прошлой статье мы разобрались с тем, что такое конкурентность/параллелизм и зачем нужна синхронизация. Настала пора изучить примитивы синхронизации, которые предлагает нам стандартная библиотека шаблонов C++.

Первым из них будет std::mutex. Но сначала ознакомьтесь с картой статьи (она пригодится, если вы вдруг запутаетесь).

Итак, начнём.

Что такое мьютекс?

Мьютекс (англ. mutex, от mutual exclusion — «взаимное исключение») — это базовый механизм синхронизации. Он предназначен для организации взаимоисключающего доступа к общим данным для нескольких потоков с использованием барьеров памяти (для простоты можно считать мьютекс дверью, ведущей к общим данным).

Синтаксис

  • Заголовочный файл | #include <mutex>
  • Объявление | std::mutex mutex_name;
  • Захват мьютекса | mutex_name.lock();
    Поток запрашивает монопольное использование общих данных, защищаемых мьютексом. Дальше два варианта развития событий: происходит захват мьютекса этим потоком (и в этом случае ни один другой поток не сможет получить доступ к этим данным) или поток блокируется (если мьютекс уже захвачен другим потоком).
  • Освобождение мьютекса | mutex_name.unlock();
    Когда ресурс больше не нужен, текущий владелец должен вызвать функцию разблокирования unlock, чтобы и другие потоки могли получить доступ к этому ресурсу. Когда мьютекс освобождается, доступ предоставляется одному из ожидающих потоков.
#include <mutex>
#include <vector>std::mutex door;    // объявление мьютекса
std::vector<int> v; // общие данные                      door.lock();
/*-----------------------*/                                        /* Это потокобезопасная зона: допускается только один поток за раз
 * 
 * Гарантируется монопольное использование вектора v 
 */                                                               /*-----------------------*/
door.unlock();

Как создать потокобезопасную очередь

Разберёмся, как реализовать простейшие потокобезопасные очереди, то есть очереди с безопасным доступом для потоков.
В библиотеке стандартных шаблонов уже есть готовая очередь (rawQueue). Наша реализация будет предполагать: а) извлечение и удаление целочисленного значения из начала очереди и б) добавление нового в конец очереди. И всё это при обеспечении потокобезопасности.

Сначала выясним, почему и как эти две операции могут создавать проблемы для многопоточности.

  • Извлечение и удаление
    Для извлечения и удаления значения из начала очереди необходимо выполнить три операции:
    1. Проверить, не пуста ли очередь.
    2. Если нет, получается ссылка на начало очереди (rawQueue.front()). 
    3. Удаляется начало очереди (rawQueue.pop()). 
    В промежутках между этими этапами к очереди могут получать доступ и другие потоки с целью внесения изменений или чтения. Попробуйте сами.

Например:

Удалили “1”, хотя до его извлечения даже не дошли, потому что поток B извлекает 0 и удаляет 1.
Дальше — больше: если rawQueue состоит из одного элемента, поток B видит непустую очередь, и тут же поток A удаляет последнее значение. Теперь поток B пытается удалить первое значение из пустой очереди, приводя к неопределённому поведению. Настоящая страшилка!
  • Добавление
    Рассмотрим теперь добавление нового значения с помощью rawQueue.push(): новый элемент добавляется в конец контейнера и становится следующим за последним на данный момент элементом. Дальше на единицу увеличивается размер. Заметили здесь проблему? А что, если два потока одновременно добавят новое значение, увидев этот последний элемент? И что может произойти в интервале между добавлением нового элемента и увеличением размера? Кто-нибудь возьмёт да и прочитает неправильный размер. 

Получается, мы должны быть уверены, что никто не будет трогать очередь, пока мы выполняем наши задачи. Используем мьютекс для защиты этих многоступенчатых операций и сделаем так, чтобы все вместе они смотрелись как одна атомарная операция.

#include <mutex>
#include <queue>

class threadSafe_queue {

    std::queue<int> rawQueue; // структура, общая для всех потоков
    std::mutex m; // красная дверь rawQueue

public:

    int& retrieve_and_delete() {
        int front_value = 0; // если пустая, возвращает 0
        m.lock();
        // Отныне текущий поток единственный, который имеет доступ к rawQueue
        if( !rawQueue.empty() ) {
            front_value = rawQueue.front();
            rawQueue.pop();
        }
        m.unlock();
        // теперь другие потоки могут захватить мьютекс
        return front_value;
    };

    void push(int val) {
        m.lock();
        rawQueue.push(val);
        m.unlock();
    };

};

Обратите внимание:

  1. Связь между мьютексом и защищаемым ресурсом — только в голове программиста.
    Мы знаем, что мьютекс m защищает rawQueue, но напрямую это не указывается.
  2. Захват с необходимой степенью распараллеливания. 
    Использование мьютекса уменьшает параллелизм. Предположим, у нас только один мьютекс для защиты вектора и строки без каких-либо зависимостей (например, значение переменной не зависит от вектора и наоборот). Поток А захватывает мьютекс, читает строку и начинает обработку каких-то данных перед добавлением нового значения в вектор и освобождением мьютекса. И тут появляется поток B, который хочет внести пару изменений в строку, но при попытке захватить мьютекс (какая досада!) оказывается блокированным до того момента, пока не будут завершены все операции с вектором. Проблему решаем дополнительным мьютексом для строки (захваченным перед чтением и освобождённым сразу по завершении).
    → Всегда прикидывайте, какой объём данных будет защищён одним мьютексом.
  3. Проводите захват только для тех операций, которым это необходимо. 
    См. предыдущий пункт. 
  4. Не вызывайте lock(), если мьютекс у вас уже есть.
    Мьютекс уже заблокирован, и попытки повторного захвата приведут вас к состоянию бесконечного ожидания. Если он вам так нужен, можно воспользоваться классом std::recursive_mutex. Рекурсивный мьютекс можно получить одним и тем же потоком много раз, но столько же раз он должен быть и освобождён.
  5. Используйте try_lock() или std::timed_mutex, если не хотите блокироваться и ожидать неопределённое время.
    try_lock() — это неблокирующий метод в std::mutex. Он возвращает немедленно, даже если владение не получено, причём со значением true, если мьютекс захвачен, и false — если нет. 
    std::timed_mutex предлагает два неблокирующих метода для захвата мьютекса: try_lock_for() и try_lock_until(), причём оба возвращаются по истечении времени со значением true или false в зависимости от успешности захвата.
  6. Не забывайте вызывать unlock() или используйте std::lock_guard (или другие шаблонные классы), когда есть возможность. 
    См. ниже.

Lock guard и парадигма RAII

У нас две большие проблемы с этим простым мьютексом:

  • Что произойдёт, если мы забудем вызвать unlock()? Ресурс будет недоступен в течение всего времени существования мьютекса, и уничтожение последнего в неразблокированном состоянии приведёт к неопределённому поведению.
  • Что произойдёт, если до вызова unlock() будет выброшено исключение? unlock() так и не будет исполнен, а у нас будут все перечисленные выше проблемы.

К счастью, проблемы можно решить с помощью класса std::lock_guard. Он всегда гарантированно освобождает мьютекс, используя парадигму RAII (Resource Acquisition Is Initialization, что означает «получение ресурса есть инициализация»). Вот как это происходит: мьютекс инкапсулируется внутри lock_guard, который вызывает lock() в его конструкторе и unlock() в деструкторе при выходе из области видимости. Это безопасно даже при исключениях: раскрутка стека уничтожит lock_guard, вызывая деструктор и таким образом освобождая мьютекс.

std::lock_guard<std::mutex> lock_guard_name(raw_mutex);
#include <mutex>
#include <vector>
std::mutex door;    // объявление мьютекса
std::vector<int> v;
{     
     std::lock_guard<std::mutex> lg(door); 
     /* Вызывается конструктор lg, эквивалентный door.lock();
      * lg, размещается в стеке */
     /*-----------------------*/
       
        /* Гарантируется монопольное использование вектора v */
    
     /*-----------------------*/
} /* lg выходит из области видимости. Вызывается деструктор, эквивалентный door.unlock(); */

Посмотрим теперь, как можно изменить нашу потокобезопасную очередь threadSafe_queue (на этот раз обращаем внимание на то, где освобождается мьютекс).

#include <mutex>
#include <queue>

class threadSafe_queue {

    std::queue<int> rawQueue; // структура, общая для всех потоков
    std::mutex m; // красная дверь rawQueue

public:

    int& retrieve_and_delete() {
        int front_value = 0; // если пустая, return будет 0
        std::lock_guard<std::mutex> lg(m);
        // Отныне текущий поток единственный, который имеет доступ к rawQueue
        if( !rawQueue.empty() ) {
            front_value = rawQueue.front();
            rawQueue.pop();
        }
        return front_value;
    };  // теперь другие потоки могут захватить мьютекс

    void push(int val) {
        std::lock_guard<std::mutex> lg(m);
        // отныне текущий поток единственный, который имеет доступ к rawQueue
        rawQueue.push(val);
    }; // теперь другие потоки могут захватить мьютекс
};

Unique lock, дающий свободу

Как только владение мьютексом получено (благодаря std::lock_guard), он может быть освобождён. std::unique_lock действует в схожей манере плюс делает возможным многократный захват и освобождение (всегда в таком порядке) мьютекса, используя те же преимущества безопасности парадигмы RAII.

std::unique_lock<std::mutex> unique_lock_name(raw_mutex);
#include <mutex>
#include <vector>
std::mutex door; //объявление мьютекса
std::vector<int> v;
{     
     std::unique_lock<std::mutex> ul(door); 
     // Вызывается конструктор ul, эквивалентный door.lock();
     // ul, размещённый в стеке
     // гарантируется монопольное использование вектора
    
     door.unlock();  
   
     // выполнение операций, не связанных с вектором
     // ....
     // теперь мне снова нужен доступ к вектору 
     
     door.lock();
     //Снова гарантируется монопольное использование вектора
} /* unique_lock выходит из области видимости. Вызывается деструктор, эквивалентный door.unlock(); */

Когда использовать?

  • Когда вам не всегда нужен захват ресурса.
  • Вместе с std::condition_variable (в следующей статье).
  • При захвате std::shared_mutex в эксклюзивном режиме (см. далее). 

Общий мьютекс + общий захват дают больше читателей

std::mutex  — это мьютекс, которым одномоментно может владеть только один поток. Однако это ограничение не всегда обязательно. Например, потоки могут одновременно и безопасно читать одни и те же общие данные. Просто читать, не производя с ними никаких изменений. Но в случае с доступом к записи только записывающий поток может иметь доступ к данным.

Начиная с C++17, std::shared_mutex формирует доступ двух типов:

  • Общий доступ: потоки вместе могут владеть мьютексом и иметь доступ к одному и тому же ресурсу. Доступ такого типа можно запросить с помощью std::shared_lock (lock guard для общего мьютекса). При таком доступе любой эксклюзивный доступ блокируется.
  • Эксклюзивный доступ: доступ к ресурсу есть только у одного потока. Запрос этого типа осуществляется с помощью класса unique lock.

Синтаксис

  • Заголовочный файл | #include <shared_mutex>;
  • Объявление | std::shared_mutex raw_sharedMutex;
  • Для захвата в общем режиме |
    std::shared_lock<std::shared_mutex> sharedLock_name(raw_sharedMutex);
  • Для захвата в эксклюзивном режиме |
    std::unique_lock<std::shared_mutex> uniqueLock_name(raw_sharedMutex);
#include <shared_mutex>
#include <vector>
std::shared_mutex door; //объявление мьютекса
std::vector<int> v;
int readVectorSize() {
    /* потоки могут вызывать эту функцию одновременно
     * доступ на запись запрещена, когда получен sl */    
    
    std::shared_lock<std::shared_mutex> sl(door);
    return v.size();
}
void pushElement(int new_element) {
    /* гарантирован эксклюзивный доступ к вектору */
    
    std::unique_lock<std::shared_mutex> ul(door);
    v.push_back(new_element);
}

Scoped lock, дающий больше мьютексов (и без клинча)

Впервые появившийся в C++17 и действующий в схожей манере, что и std::lock_guard, он даёт возможность получения нескольких мьютексов. Без std::scoped_lock такая операция очень опасна, так как может привести к взаимной блокировке.

Краткая история взаимоблокировки:

Поток A хочет увести 200$ с банковского счёта Жеки на счёт Бяки в виде одной атомарной операции. Он начинает с того, что захватывает мьютекс, защищающий счёт Жеки, чтобы изъять деньги, а затем пытается захватить счёт Бяки.
В то же время поток B хочет увести 100$ со счёта Бяки на счёт Жеки. Он получает захват счёта Бяки, чтобы изъять деньги и попытаться захватить счёт Жеки. Оба потока блокируются, уснув в ожидании друг друга.

std::scoped_lock одновременно захватывают (а затем освобождают) все мьютексы, передаваемые в качестве аргумента, по принципу «всё или ничего»: если хотя бы один захват выбрасывает исключение, освобождаются все уже захваченные мьютексы.

  • std::scoped_lock<std::mutex> scoped_lock_name(raw_mutex1, raw_mutex2, ..);

Заключение

Если вы вдруг запутались в этом ворохе новой информации:

  • воспользуйтесь картой в начале статьи (или составьте свою);
  • применяйте на практике новые знания и пробуйте писать простенький код.

До встречи в следующей статье, в которой речь пойдёт о condition_variableи вы узнаете, как синхронизировать потоки!

Читайте также:


Перевод статьи Valentina: [C++] MUTEX: Write Your First Concurrent Code

Предыдущая статьяEmber.js или Vue.js: какой фреймворк выбрать?
Следующая статьяАнализ автоаварий в Барселоне с использованием Pandas, Matplotlib и Folium