Несколько дней новогоднего волшебства:
Успейте начать обучение в 2018-ом году со скидкой до 30%!
Выбрать курс

20 типичных ошибок многопоточности в C++

С___Deep_25-5020-1175ed.09_site.png

Многопоточное программирование одна из самых сложных тем в программировании, особенно в C++. Трудно избежать при этом ошибок. К счастью большую часть удаётся отловить на этапе проверки кода или тестирования. Но особо коварные проникают в рабочие системы и исправлять их достаточно затруднительно.

В этой статье собраны и переведены самые значимые по мнению автора заметки ошибочные ситуации. Если у вас есть свои любимые ошибки или варианты их решения, оставьте, пожалуйста, их в комментариях.

Все примеры успешно компилируются и исполняются в Ubuntu 16.04 LTS:

g++ -std=c++14 -O2 -Wall -pedantic -pthread main.cpp && ./a.out

#1 Отсутствие join() или detach() перед завершением

Если забыть вызвать join() или detach() перед завершением программы, это может привести к её аварийному завершению.

#include <iostream>
#include <thread>

void foo()
{
    std::cout << "foo" << std::endl;
}

int main(int argc, char *argv[])
{
    std::thread t(foo);

    return 0;
}

В конце функции main() объект t выходит из области видимости и вызывается деструктор. Внутри деструктора выполняется проверка на подключаемость потока. Подключаемый поток - это поток который может или уже выполняется. В данном случае это именно так поэтому будет вызвана функция std::terminate().

В зависимости желаемого поведения следует либо подождать завершения потока:

int main(int argc, char *argv[])
{
    std::thread t(foo);
    t.join();

    return 0;
}

либо разорвать с ним связь

int main(int argc, char *argv[])
{
    std::thread t(foo);
    t.detach();

    return 0;
}

#2 Попытка дождаться завершения неподключаемого потока

Для объектов std::thread которые были перемещены, завершены join() или брошены detach() нельзя дождаться завершения.

#include <iostream>
#include <thread>

void foo()
{
    std::cout << "foo" << std::endl;
}

int main(int argc, char *argv[])
{
    std::thread t(foo);
    t.detach();
    // ... какая-то логика ...
    t.join();

    return 0;
}

В таких случаях следует проверять, а можно ли в принципе подключить поток, и только потом уже вызывать join().

    if (t.joinable())
    {
        t.join();
    }

#3 Вызов join() блокирует вызывающий поток

В реальных приложениях потоки могут обслуживать достаточно длительные операции связанные с сетевым вводом/выводом или реакцией пользователя в пользовательском интерфейсе. Попытка дождаться завершения таких потоков в основном потоке или в потоке отвечающим за интерфейс может привести к "заморозке". Лучше найти другое решение.

Например, для десктопного приложения вспомогательный поток перед своим завершением может послать сообщение в поток отвечающий за интерфейс. Последний как правило организован как обработчик очереди сообщений. Чаще всего это сообщения от элементов интерфейса, но могут быть и нажатия клавиш и даже перемещения мыши. В этом потоке точно так же можно получить сообщение от вспомогательного потока и среагировать на завершение, не дожидаясь его буквально, и не блокируя ожиданием обработку событий.

#4 Не учитывать особенности передачи аргументов в поток

Аргументы в функцию потока перемещаются или копируются по значению. Пример ниже даже не скомпилируется.

#include <iostream>
#include <thread>

void foo(int &s)
{
    s = 42;
}

int main(int argc, char *argv[])
{
    int answer = 0;
    std::thread t(foo, answer);
    t.join();

    return 0;
}

Для успешной компиляции ссылка должна быть передана через std::ref

    std::thread t(foo, std::ref(answer));

#5 Игнорировать общий доступ к ресурсам

В условиях многопоточности несколько потоков могут одновременно использовать общие ресурсы или данные. Это может приводить не только к труднопрогнозируемому поведению но и к краху программы. В таких случаях необходимо упорядочивать и контролировать доступ.

В качестве примера рассмотрим вывод в консоль в несколько потоков - основной и шесть дополнительных.

#include <iostream>
#include <thread>

void foo(const std::string &message)
{
    std::cout << "thread " << std::this_thread::get_id() << ", message " << message << std::endl;
}

int main(int argc, char *argv[])
{
    std::thread t1(foo, "каждый");
    std::thread t2(foo, "охотник");
    std::thread t3(foo, "желает");
    foo("знать");
    std::thread t4(foo, "где");
    std::thread t5(foo, "сидит");
    std::thread t6(foo, "фазан");

    t1.join();
    t2.join();
    t3.join();
    t4.join();
    t5.join();
    t6.join();

    return 0;
}

В результате получим мешанину из слов:

thread thread thread 140597982512960, message знать140597965162240
thread 140597939984128, message где
thread 140597931591424, message 140597956769536, message thread каждый140597948376832, message желает, message 
сидит
охотник

thread 140597923198720, message фазан

Дело в том, что консоль одна, а семь потоков пытаются выводить на неё одновременно. Чтобы сделать вывод более предсказуемым необходимо ограничить одновременный доступ. Сделаем это через std::mutex - заблокируем до вывода и освободим после.

#include <mutex>

std::mutex cout_guard;

void foo(const std::string &message)
{
    cout_guard.lock();
    std::cout << "thread " << std::this_thread::get_id() << ", message " << message << std::endl;
    cout_guard.unlock();
}

Вывод получится читаемый:

thread 140710040659776, message знать
thread 140710023309056, message каждый
thread 140710006523648, message желает
thread 140709989738240, message сидит
thread 140709981345536, message фазан
thread 140710014916352, message охотник
thread 140709998130944, message где

Мешанины из фрагментов строк уже нет. Порядок захвата нами не управляется, поэтому сами сообщения появляются в произвольном порядке.

#6 Забыть вызвать unlock()

В предыдущем примере для разделения доступа к ресурсу использовался std::mutex. Это не самый удачный способ, поскольку вызова unlock() мы можем и не достичь, если вообще не забыли его вызвать.

void foo(const std::string &message)
{
    cout_guard.lock();
    std::cout << "thread " << std::this_thread::get_id() << ", message " << message << std::endl;
    // cout_guard.unlock();
}

После появления первого же сообщения программа зависнет:

thread 140569688782656, message знать

Чтобы защититься от ошибок такого рода воспользуемся std::lock_guard, который манипулирует временем жизни блокировки в стиле RAII.

В конструкторе захватывает, в деструкторе освобождает. По какой бы причине мы не покинули область видимости - блокировка будет снята.

void foo(const std::string &message)
{
    std::lock_guard<std::mutex> lock(cout_guard);
    std::cout << "thread " << std::this_thread::get_id() << ", message " << message << std::endl;
}

#7 Пренебрежение размером защищённой секции

Пока мы находимся внутри защищённой секции все остальные потоки рвущиеся её выполнить заблокированы. Старайтесь делать заблокированный участок как можно меньше.

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

using namespace std::chrono_literals;

std::mutex cout_mutex;

void foo()
{
    std::lock_guard<std::mutex> lock(cout_mutex);
    std::this_thread::sleep_for(1s); // на самом деле что-то очень полезное и безопасное
    std::cout << "foo" << std::endl;
}

int main(int argc, char *argv[])
{
    std::thread t1(foo);
    std::thread t2(foo);

    t1.join();
    t2.join();

    return 0;
}

Безопасный вариант программы будет исполняться почти две секунды. Если нам необходимо защитить только вывод в консоль, то нет необходимости в секции такого размера. Мы можем переместить блокировки ближе к выводу.

void foo()
{
    std::this_thread::sleep_for(1s); // на самом деле что-то очень полезное и безопасное
    std::lock_guard<std::mutex> lock(cout_mutex);
    std::cout << "foo" << std::endl;
}

Такой вариант будет выполняться уже около одной секунды и при этом останется безопасным.

#8 Взаимные блокировки

Как правило такие блокировки уже навсегда из-за чего получили название deadlock. Типичная ситуация такой блокировки представлена ниже. Функция sleep_for даёт нам 100% шанс попасть в вечную блокировку, без неё скорее всего на любой машине этот код выполнился бы без зависания.

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

using namespace std::chrono_literals;

std::mutex cerr_mutex;
std::mutex cout_mutex;

void foo()
{
    cerr_mutex.lock();
    std::cerr << "use cerr in foo" << std::endl;
    std::this_thread::sleep_for(1s);

    cout_mutex.lock();
    std::cout << "use cout in foo" << std::endl;

    cout_mutex.unlock();
    cerr_mutex.unlock();
}

void bar()
{
    cout_mutex.lock();
    std::cout << "use cout in bar" << std::endl;
    std::this_thread::sleep_for(1s);

    cerr_mutex.lock();
    std::cerr << "use cerr in bar" << std::endl;

    cerr_mutex.unlock();
    cout_mutex.unlock();
}

int main(int argc, char *argv[])
{
  std::thread t1(foo);
  std::thread t2(bar);

  t1.join();
  t2.join();

  return 0;
}

Причина зависания кроется в перекрёстной блокировке. Когда оба потока начинают работать каждый из них блокирует свой mutex. То есть t1 захватил cerr_mutex, а t2 - cout_mutex. После вызова sleep_for потоки пытаются захватить их наоборот, еще не освободив занятые. Для того чтобы освободить свой mutex потоку приходится ждать пока это сделает второй, а у второго ситуация ровно такая же.

Самое простое решение использовать std::lock для захвата обоих mutex.

void foo()
{
    std::lock(cerr_mutex, cout_mutex);
    std::cerr << "use cerr in foo" << std::endl;
    std::this_thread::sleep_for(1s);

    std::cout << "use cout in foo" << std::endl;

    cout_mutex.unlock();
    cerr_mutex.unlock();
}

Если условия позволяют, можно использовать std::timed_mutex. Для выхода из перекрёстной блокировки достаточно, чтобы одну из них можно было нарушить.

void foo()
{
    cerr_mutex.lock();
    std::cerr << "use cerr in foo" << std::endl;
    std::this_thread::sleep_for(1s);

    if (cout_mutex.try_lock_for(1s))
    {
        std::cout << "use cout in foo" << std::endl;

        cout_mutex.unlock();
    }

    cerr_mutex.unlock();
}

С одной стороны мы успешно избежали блокировки, но с другой стороны нам пришлось взять на себя обработку этой ситуации.

#9 Повторный захват std::mutex

Эта некоторая вариация на тему перекрёстного захвата с тем лишь отличием, что для такой блокировки достаточно одного std::mutex. Даже дополнительные потоки не нужны. Тут можно было бы привести в качестве примера страшную банальщину типа:

void foo()
{
    std::lock_guard<std::mutex> lock1(cerr_mutex);
    std::lock_guard<std::mutex> lock2(cerr_mutex);
    std::cerr << "foo" << std::endl;
}

Да, это именно такого рода ошибка, вот только в жизни она встречается в несколько более изощрённой форме. В примере ниже нет ни потоков, ни рекурсии и всего один mutex.

#include <iostream>
#include <mutex>

std::mutex cerr_mutex;

int bar()
{
    std::lock_guard<std::mutex> lock(cerr_mutex);
    std::cerr << "bar" << std::endl;

    return 42;
}

void foo()
{
    std::lock_guard<std::mutex> lock(cerr_mutex);
    std::cerr << "foo, bar = " << bar() << std::endl;
}

int main(int argc, char *argv[])
{
  foo();

  return 0;
}

Вызов foo приводит к захвату mutex, но в процессе вызова bar возникает необходимость блокировки ресурса повторно. Это такой же deadlock, только теперь основной поток ждёт сам себя.

Существует очевидный способ решить проблему - заменить обычный std::mutex на рекурсивный std::recursive_mutex и это решит нашу проблему, но решит её ой каким опасным способом. Тысячу раз подумайте всё ли будет в порядке при таком подходе, не удастся ли найти решение более элегантное

void foo()
{
    auto b = bar();
    std::lock_guard<std::mutex> lock(cerr_mutex);
    std::cerr << "foo, bar = " << b << std::endl;
}

#10 Излишняя предосторожность

Когда возникает необходимость модифицировать простые типы наподобие bool или int использование 'std::atomic' почти всегда более эффективно в сравнении с использованием mutex.

#include <mutex>

std::mutex counter_mutex;
int counter;

void foo()
{
    std::lock_guard<std::mutex> lock(counter_mutex);
    ++counter;
}

Та же самая логика без использования mutex и использованием atomic.

#include <atomic>

std::atomic<int> counter;

void foo()
{
    ++counter;
}

#11 Частое создание потоков без использования пулов

Создание и удаление потоков дорогое удовольствия с точки зрения затрат CPU. Часто создавать и удалять потоки в приложении, которое само по активно занимается вычислениями значит мешать ему. Вместо того, чтобы часто создавать и удалять потоки лучше создать пул предварительно запущенных потоков и распределять между ними задания.

Использование пула потоков позволяет сократить объём кода и количество потенциальных ошибок связанных с запуском и остановкой потоков. Позволит избежать избыточного количества потоков, которое может негативно сказаться на производительности.

Существуют готовые реализации такого рода пулов. Например, TBB

#12 Не обработанные исключения в потоке

Исключения брошенные в одном потоке не могут быть перехвачены другим. Представим себе функцию, которая может бросить исключение. Если мы выполним эту функцию в отдельном потоке, то попытка поймать исключение в основном не сработает.

#include <iostream>
#include <stdexcept>
#include <thread>

void foo()
{
    throw std::runtime_error("foo");
}

int main(int argc, char *argv[])
{
    try
    {
        std::thread t(foo);
        t.join();
    }
    catch (const std::exception &e)
    {
        std::cout << "error" << e.what() << std::endl;
    }

    return 0;
}

Программа аварийно завершится так и не вызвав обработчик исключения. Решением может быть перехват исключения в потоке и передача информации о нём через экземпляр std::exception_ptr в родительский поток и повторного бросания уже за пределами породившего исключения потока.

#include <iostream>
#include <stdexcept>
#include <thread>

static std::exception_ptr eptr = nullptr;

void foo()
{
    try
    {
        throw std::runtime_error("foo");
    }
    catch (...)
    {
        eptr = std::current_exception();
    }
}

int main(int argc, char *argv[])
{
    std::thread t(foo);
    t.join();

    if (eptr)
    {
        try
        {
            std::rethrow_exception(eptr);
        }
        catch (const std::exception &e)
        {
            std::cout << "error" << e.what() << std::endl;
        }
    }

    return 0;
}

#13 Имитация асинхронной работы без std::async

Когда нужно выполнить часть кода независимо от основного потока отличным выбором будет использование std::async для запуска. Это тоже самое, что создать ещё один поток и передать ему на выполнение функцию или лямбду. Правда при этом за жизненным циклом потока и исключений в нём тоже придётся следить самостоятельно. В случае использования std::async можно не заботиться об этом да ещё и существенно сократить вероятность блокировки.

Еще одно важно преимущество заключается в возможности получить результат работы функции через std::future. Функция int foo(), будучи выполнена как асинхронная задача, заранее установит результат своей работы. А получим мы его тогда, когда нам это будет удобно.

#include <iostream>
#include <cmath>
#include <future>

int main(int argc, char *argv[])
{
    auto f = std::async(sqrt, 9.0);

    std::cout << f.get() << std::endl;

    return 0;
}

Использование потоков напрямую делает получение результатов чуть более громоздким. Это может быть:

Передача ссылки на результирующую переменную в поток, в котором необходимо сохранить результат.

#include <iostream>
#include <cmath>
#include <thread>

void foo(double i, double &r)
{
    r =  std::sqrt(i);
}

int main(int argc, char *argv[])
{
    double result = 0.0;
    auto t = std::thread(foo, 9, std::ref(result));

    t.join();
    std::cout << result << std::endl;

    return 0;
}

Сохранение результата в переменной класса функционального объекта и чтение его после завершения работы.

#include <iostream>
#include <cmath>
#include <thread>

template<typename T>
class foo
{
    T r;
public:
    T get()
    {
        return r;
    }
    void operator()(T i)
    {
        r =  std::sqrt(i);
    }
};


int main(int argc, char *argv[])
{
    auto f = foo<double>();
    auto t = std::thread(std::ref(f), 9);

    t.join();
    std::cout << f.get() << std::endl;

    return 0;
}

Курт Гантерот в своей книге утверждает, что создание потоков в 14 раз дороже использования std::async.

Курт Гантерот, «Оптимизация программ на C++» Курт Гантерот, «Оптимизация программ на C++»

Короче говоря, пока не доказано обратное использовать следует std::async.

#14 Опускание std::launch::async когда это действительно необходимо

Название std::async может ввести в заблуждение, потому что функция, которая будет передана для запуска по умолчанию может и не запуститься отдельно от вызываемого потока!

Существует два способа запуска:

  1. std::launch::async. Задача будет немедленно запущена в отдельном потоке.
  2. std::launch::deferred. Выполнение задачи будет отложено до вызова .get() или .wait() возвращаемого объекта std::future. При этом выполнение осуществляется синхронно!

Без явного указания способа запуска предполагается комбинация этих вариантов и фактически предсказать как именно будет запущена задача невозможно. Существуют связанные с этим сложности, например невозможно предсказать корректно ли будет обращение к переменным потока, невозможно предсказать будет ли выполнена функция вообще, если до выполнения функций .get() или .wait() дело так и не дошло ну и цикл ожидания готовности future никогда не закончится для отложенного сценария, ждать бесполезно.

Чтобы избежать недоразумений явно указывайте std::launch::async при запуске std::async.

Непредсказуемо:

    auto f = std::async(sqrt, 9.0);

Гарантировано в другом потоке:

    auto f = std::async(std::launch::async, sqrt, 9.0);
Скотт Мейерс, «Эффективный и современный С++» Скотт Мейерс, «Эффективный и современный С++»

#15 Использование .get() может привести к ожиданию

#include <chrono>
#include <future>
#include <iostream>

int main(int argc, char *argv[])
{
    std::future<int> f = std::async(std::launch::async, [](){
        std::this_thread::sleep_for(std::chrono::seconds(1));
        return 42;
    });

    while (true)
    {
        // ...
        std::cout << f.get() << std::endl;
        // ...
    }

    return 0;
}

Несмотря на то, что лямбда явно будет запущена в отдельном потоке вызов метода .get() может привести к нежелательному ожиданию. Более того, на следующей итерации код вообще аварийно завершится, потому что первый результат уже был выведен на консоль а никакого другого во future нет.

Обе проблемы можно решить проверив future на готовность.

        if (f.valid())
        {
            std::cout << f.get() << std::endl;
        }

#16 Исключение из задачи перетечёт во future

Исключение брошенное в асинхронной задаче должно быть обработано так, будто оно возникло в вызываемом потоке. В данном случае пример себя поведёт так, будто исключение никто не обработал.

#include <future>
#include <iostream>

int main(int argc, char *argv[])
{
    std::future<int> f = std::async(std::launch::async, [](){
        throw std::runtime_error("error");
        return 42;
    });

    std::cout << f.get() << std::endl;

    return 0;
}

Программа аварийно завершится. Если в задаче было брошено исключение, оно распространится и на вызов .get(). Если до конца жизни future .get() так и не будет вызван исключение просто проигнорируется.

Для таких задач имеет смысл использование обычной конструкции try/catch.

    try
    {
        std::cout << f.get() << std::endl;
    }
    catch (const std::exception &e)
    {
        std::cout << e.what() << std::endl;
    }

#17 Использование std::async там, где нужен тонкий контроль потоков

В большинстве случаев достаточно использования std::async, кроме ситуаций когда возникает необходимость в полном контроле над исполняемым потоком.

Например изменить параметры для планировщика:

#include <iostream>
#include <thread>

void foo()
{
    std::cout << "foo" <<std::endl;
}

int main(int argc, char *argv[])
{
    auto t = std::thread(foo);

    sched_param sch;
    int policy;
    pthread_getschedparam(t.native_handle(), &policy, &sch);
    sch.sched_priority = 20;
    pthread_setschedparam(t.native_handle(), SCHED_FIFO, &sch);

    t.join();

    return 0;
}

Это возможно благодаря наличию метода .native_handle() у std::thread, значение которого можно использовать в POSIX системах. Использование этого метода полезно всегда, когда не хватает функциональности ни std::async ни std::thread. Использование std::async скрывает детали реализации и непригодно для такой тонкой работы.

#18 Пренебрежение анализом нагрузки на CPU

В любой момент времени потоки можно разделить на две группы - те которые что-то делают и те, который спят.

Потоки которые что-то делают занимают ядра процессора на которые их отправил планировщик. Для потоков которые занимаются активными вычислениями важно иметь в своём распоряжении свободные ядра, в противном случае большое количество потоков не даст никакого прироста в производительности. Даже скорее наоборот снизит производительность за счёт дополнительных переключений контекста потока.

Потоки которые преимущественно находятся в режиме ожидания в таком внимании процессора не нуждаются и могут находится в системе в гораздо большем количестве. И в случае с вводом/выводом даже помогают увеличить пропускную способность.

Я рассмотрел два крайних варианта, но наш конечно же будет посередине. Да, есть метод std::thread::hardware_concurrency(), которая сообщит нам сколько ядер доступно планировщику с учётом физических и логических.

Но это не помогает ответить правильно на вопрос - сколько же потоков можно запустить одновременно? Число ядер помогает понять сколько одновременно потоков, которые непрерывно длительное время активно потребляют процессор.

Если сценарий использования вычислений именно такой,то количество потоков должно быть как можно ближе к количеству ядер, а то и меньше, чтобы исключить распределение на логических ядрах.

Если потоки преимущественно заблокированы мьютексами или вводом/выводом, то ограничивать количество потоков ради экономии процессора не имеет большого смысла.

В остальных случаях необходимо нагрузочное тестирование и мониторинг с регулировкой количества потоков. Иными словами подбирается экспериментально.

#19 Использование квалификатора volatile для синхронизации

Использование этого квалификатора указание компилятору того, что изменения объекта могут происходить без контроля на этапе компиляции. Не в том смысле, что они происходят из разных потоков, а в том, что вообще за пределами кода, грубо говоря самопроизвольно. Это очень низкоуровневое указание и это никак не помогает в одновременном доступе внутри процесса.

Для синхронизации следует использовать atomic, mutex, и condition_variable.

#20 Неоправданное использование lock-free алгоритмов

Программирование без необходимости блокировок звучит очень привлекательно в сравнении с обычными механизмами синхронизации. Возможно, в случае жёсткого ограничения вычислительных ресурсов применение подобных алгоритмов может быть оправдано. В остальных случаях выглядит скорее преждевременной оптимизацией, которая, к тому же, может обернуться сложными ошибками в самое неподходящее время (и без coredump тут не обойтись).

Прежде чем приступить к использованию свободных от блокировок алгоритмов следует подумать над тремя вопросами:

  1. Пробовали ли спроектировать код без необходимости синхронизации?
  2. Выполняли ли анализ производительности, поиск и оптимизацию узких мест?
  3. Можно ли ограничиться горизонтальным масштабированием?

Использование lock-free алгоритмов оправдано, когда никаких других решений просто не осталось.


Надеюсь, чтение этого материала было так же полезно, как его перевод и публикация.

Автор
0 комментариев
Для комментирования необходимо авторизоваться