4 совета по работе с потоками и мьютексами в C++

C++ представляет собой отличный язык программирования, который принято ассоциировать с высокой производительностью, а также доступным управлением памятью и указателями. Тем не менее в нем также есть очень важные, но менее обсуждаемые компоненты, такие как потоки и мьютексы. Одна из наиболее значимых характеристик С++  —  это широкие возможности контроля и определения точности при выполнении нескольких параллельных потоков наряду с безопасным распределением ресурсов между ними. К примеру, браузер, в котором вы сейчас это читаете, как раз задействует несколько потоков, облегчая одновременное выполнение действий и демонстрацию представлений.

В данной статье я опишу четыре рекомендации по использованию потоков и мьютексов в С++. Интерфейсы, о которых мы будем здесь говорить, относятся к версиям стандарта С++11 и выше. Ранее этой версии потоки формально в нем не определялись, и использовать можно было только интерфейсы потоков ОС, такие как POSIX.

1. Потоки не одноразовые  —  их нужно присоединять повторно!

Распространенная ошибка начинающих работу с многопоточностью в предположении, что функция, передаваемая в новый создаваемый поток, аналогична простому вызову другой функции. Но это не так. Давайте посмотрим на, казалось бы, безобидную программу ниже:

#include <iostream>
#include <thread>

void doStuff()
{
  int x = 5;
  while(x--)
       std::cout << "Doing stuff\n";
}

int main() {
  std::thread t1(&doStuff);
}

Вы можете подумать: “Код работает нормально и выводит пару сообщений”. А вот и нет  —  на деле он дает сбой.

terminate called without an active exception
exited, aborted

В выводе выше terminate является ссылкой на функцию С++ std:terminate(), которая вызывается средой выполнения в ряде опасных ситуаций. В данном случае мы начинаем поток выполнения, но не присоединяем его, что при создании потока делать необходимо, т.к. присоединение позволяет поддерживать синхронизацию программы и порядок выполнения задач. Добавление же к текущей программе t1.join() приводит ее к ожидаемому поведению.

2. Используйте флаг для обозначения момента прекращения потока

Еще одна потенциальная проблема при порождении нескольких потоков  —  это когда один из них повторяет одну и ту же функцию или цикл событий бесконечно. Потоки реализуются под определенную задачу, и в течение программы они выполняют список этих задач снова и снова. Вы не можете напрямую присоединить (join()) поток, находящийся в бесконечном цикле, потому что вызов присоединения приведет к тому, что родительский поток перейдет в бесконечное ожидание. Присоединение предполагается только для тех потоков, которые могут достичь терминала, их конечной точки завершения работы.

Чтобы правильно завершать подобные потоки, вам понадобиться глобальный флаг  —  в идеале логический  —  который будет проверяться потоками для определения, нужно ли продолжать выполнение. Начиная с C++11, переменные static bool в глобальной области считаются потокобезопасными для доступа и изменения. Это значит, что пример кода в данном случае может выглядеть так:

#include <iostream>
#include <thread>
#include <chrono>

static bool KEEP_GOING = true;

void doStuff()
{
  while(true) {
      if (!KEEP_GOING)
          break;
      std::cout << "Doing stuff\n";
  }
}


int main() {
  std::thread t1(&doStuff);
  std::thread t2(&doStuff);
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  KEEP_GOING = false;
  t1.join();
  t2.join();
}

Инструкция std::this_thread::sleep_for(std::chrono::milliseconds(100)) добавлена с целью показать, что при выполнении программы порожденный поток выполняет некую начальную работу, но затем завершает выполнение, прочитав KEEP_GOING, установленный как false. В отличие от процессов потоки не имеют встроенного сигнального механизма для остановки выполнения. Поэтому подобные приемы критически важны для остановки потока.

3. Используйте мьютексы при обращении потоков к общим ресурсам и данным

Во многих многопоточных программах и структурах потокам требуется выполнять чтение, запись и обмен данными между собой. Они могут совместно использовать любой ресурс, не являющийся локальным для каждого из них. В связи с этим переменные и объекты, расположенные в стеке одного потока, не будут доступны для других. Глобальные же объекты или указатели на объекты, передаваемые в начальную “основную” функцию потока, могут использоваться несколькими потоками. Тем не менее для получения полнофункциональной программы, работающей без ошибок, необходимо убедиться, что потоки используют общие ресурсы безопасно. Если один из них читает такой ресурс, например массив или часть памяти в то время, как другой этот ресурс переписывает, то в результате мы получим поврежденные данные.

Есть множество инструментов и техник, с помощью которых можно добиться безопасного использования общих ресурсов, но наиболее простым средством является мьютекс. Мьютекс  —  это объект с особой аппаратной и системной реализацией, позволяющий потокам применять “lock” (блокировку). Пока поток удерживает мьютекс, другие потоки, пытающиеся получить этот мьютекс, приостановят или заблокируют вызов получения, т.е. они не смогут заблокировать мьютекс, пока его не разблокирует текущий поток. Стандарт C++11 определяет кроссплатформенный тип std::mutex, который можно блокировать или разблокировать разными потоками. Делается это так:

static std::mutex mtx; // глобальная область
{ /*Область выполнения с общими ресурсами*/
    mtx.lock();
    // действия
    mtx.unlock();
}

Несмотря на то, что этот код блокирует и разблокирует мьютекс, прямой вызов методов для этого мьютекса, а именно lock или unlock, обычно не рекомендуется. Причина в том, что как только вызывается .unlock(), любой ожидающий этот мьютекс поток получает доступ к данной части кода между инструкциями lock и unlock. Кроме того, инструкции return, расположенные перед инструкцией .unlock(), приведут к тому, что мьютекс останется заблокирован навсегда, создав весьма опасную ситуацию. 

Наиболее общий способ применения мьютекса  —  это использование объекта вида RAII, который создается в начале области и уничтожается в конце. В C++11 стандартная библиотека предоставляет так называемый std::lock_guard. В C++17 есть еще более продвинутый класс под названием std::scoped_lock, являющийся объектом RAII, способным блокировать в области несколько мьютексов сразу. Преимущество использования подхода RAII при блокировании мьютекса в том, что разблокировка происходит при выходе из области, т.е. вы можете не беспокоиться о порядке расположения инструкций или необходимости достижения явного вызова .unlock().

В этом примере два потока будут читать и записывать значения массива, массив же при этом будет защищен мьютексом. Один поток выступит в роли записывающего, а второй в роли читающего. Оба они, как и ранее, будут контролироваться с помощью глобального флага. Давайте взглянем на весь код:

#include <iostream>
#include <thread>
#include <chrono>
#include <mutex>
#include <string.h>

static bool KEEP_GOING = true;

struct Point {
    long x;
    long y;
};

template<class T, size_t arraySize>
class SharedArray {
public:
    void get(size_t index, T* out) const {
        std::lock_guard<std::mutex> lockit(_mtx);
        *out = _items[index];
    }

    void set(size_t index, const T& value) {
        std::lock_guard<std::mutex> lockit(_mtx);
        _items[index] = value;
    }

    void clean() { memset(_items, 0, sizeof(_items)); }
private:
   T _items[arraySize];
   mutable std::mutex _mtx;
};

#define SHARED_ARRAY_SIZE 10
static SharedArray<Point, SHARED_ARRAY_SIZE> SHARED_ARRAY;

void doWrites()
{
  Point p;
  p.x = 0;
  p.y = 0;
  while(true) {
      if (!KEEP_GOING)
          break;
      for (size_t i = 0 ; i < SHARED_ARRAY_SIZE; ++i){
          SHARED_ARRAY.set(i, p);
          ++p.x;
          ++p.y;
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(500));
  }
}

void doReads()
{
    while(true) {
      if(!KEEP_GOING)
        break;
      for (size_t i = 0 ; i < SHARED_ARRAY_SIZE; ++i){
          Point r;
          SHARED_ARRAY.get(i, &r);
          std::cout << "x=" << r.x << " y=" << r.y << "\n";
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(500));   
  }
}

int main() {
  SHARED_ARRAY.clean();
  std::thread writer(&doWrites);
  std::thread reader(&doReads);
  std::this_thread::sleep_for(std::chrono::seconds(5));
  KEEP_GOING = false;
  writer.join();
  reader.join();
}

Первое, что вы могли заметить,  —  это передача значения объектов Point. Обычно в C++ при обращении к элементу вектора или массива передача происходит по ссылке. Тем не менее, как только функция any с защитой блокировки в начале ее области возвращает результат, мьютекс разблокируется. Вы не можете безопасно вернуть указатель или ссылку на ресурс, используемый несколькими потоками непосредственно. Причина в том, что получение указателя не дает гарантии блокировки других потоков от изменения значения, на которое он указывает. Это контролируется только мьютексом. 

В приведенном выше примере можно заметить несколько вызовов sleep_for. Они не обязательны для работы кода, но их применение и изменение их периодов помогает визуализировать, как потоки выполняют работу, переходят в режим сна, а затем снова возвращаются в строй. Если вы запустите эту программу, уменьшив период сна только для потока записи, то можете увидеть следующий паттерн:

x=55 y=55
x=56 y=56
x=57 y=57
x=58 y=58
x=59 y=59
x=80 y=80
x=81 y=81
x=82 y=82
x=83 y=83
x=84 y=84
x=85 y=85

В выводе возник пропуск. Но это не ошибка. Если поток чтения засыпает на более длительный период, чем поток записи, то последний за это время выполнит больше работы, чем прочитает первый. Говоря иначе, к моменту пробуждения потока чтения значения массива уже проскочат точку, в которой он последний раз их считывал, что приведет к разрыву последовательности отображения. Если же периоды сна обоих потоков равны, то такое уже не произойдет.

4. Учитывайте аппаратное ограничение многопоточности

В любом CPU есть лимит на число выполняемых одновременно потоков. Несмотря на то, что сложные программы C++ могут иметь сотни потоков, они не выполняют их все одновременно. Выполнение здесь буквально означает исполнение машинных инструкций в текущий момент. Поэтому программы, требующие множества потоков, выполняют их не одновременно, а в несколько распределенном режиме, что позволяет увеличить скорость обработки и производительность.

По определению количество параллельных потоков той или иной платформы диктуется количеством ядер CPU. Например, я сейчас работаю на ноутбуке с четырехъядерным процессором Intel Core i7 2.7 GHz. Это значит, что имеющиеся четыре ядра могут выполнять до четырех потоков одновременно. Машины, используемые на серверных станциях, таких как платформы облачных вычислений, как правило, имеют больше ядер, чем домашний ПК.

Ниже я привел пример использования статического интерфейса C++11, предназначенного для проверки лимита многопоточности:

#include <iostream>
#include <thread>

int main() {
  std::cout << "The number of concurrent threads is " << std::thread::hardware_concurrency() << "\n";
}

В моем случае получен вполне ожидаемый результат:

The number of concurrent threads is 4

Это значение может пригодиться при реализации пула потоков  —  шаблона проектирования, в котором потоки непрерывно выполняют цикл для проверки ресурса, представляющего очередь задач для выполнения. В подобном пуле вы обычно не увидите выгод от использования большего числа потоков, чем поддерживается аппаратно. Как правило, проверка лимита многопоточности относится к области проектирования оборудования.

В современном C++ есть множество компонентов, таких как атомарные переменные и барьеры памяти, которые позволяют оптимизировать многопоточные программы, но только для конкретных аппаратных архитектур.

Читайте также:

Читайте нас в Telegram, VK и Яндекс.Дзен


Перевод статьи JOSHUA WEINSTEIN: 4 Easy Tips for Using Threads and Mutexes in C++