20 типичных ошибок многопоточности в C++
Многопоточное программирование одна из самых сложных тем в программировании, особенно в C++. Трудно избежать при этом ошибок. К счастью большую часть удаётся отловить на этапе проверки кода или тестирования. Но особо коварные проникают в рабочие системы и исправлять их достаточно затруднительно.
В этой статье собраны и переведены самые значимые по мнению автора заметки ошибочные ситуации. Если у вас есть свои любимые ошибки или варианты их решения, оставьте, пожалуйста, их в комментариях.
Все примеры успешно компилируются и исполняются в Ubuntu 16.04 LTS:
g++ -std=c++14 -O2 -Wall -pedantic -pthread main.cpp && ./a.out
#1 Отсутствие join() или detach() перед завершением
Если забыть вызвать
#include <iostream> #include <thread> void foo() { std::cout << "foo" << std::endl; } int main(int argc, char *argv[]) { std::thread t(foo); return 0; }
В конце функции
В зависимости желаемого поведения следует либо подождать завершения потока:
int main(int argc, char *argv[]) { std::thread t(foo); t.join(); return 0; }
либо разорвать с ним связь
int main(int argc, char *argv[]) { std::thread t(foo); t.detach(); return 0; }
#2 Попытка дождаться завершения неподключаемого потока
Для объектов
#include <iostream> #include <thread> void foo() { std::cout << "foo" << std::endl; } int main(int argc, char *argv[]) { std::thread t(foo); t.detach(); // ... какая-то логика ... t.join(); return 0; }
В таких случаях следует проверять, а можно ли в принципе подключить поток, и только потом уже вызывать
if (t.joinable()) { t.join(); }
#3 Вызов join() блокирует вызывающий поток
В реальных приложениях потоки могут обслуживать достаточно длительные операции связанные с сетевым вводом/выводом или реакцией пользователя в пользовательском интерфейсе. Попытка дождаться завершения таких потоков в основном потоке или в потоке отвечающим за интерфейс может привести к "заморозке". Лучше найти другое решение.
Например, для десктопного приложения вспомогательный поток перед своим завершением может послать сообщение в поток отвечающий за интерфейс. Последний как правило организован как обработчик очереди сообщений. Чаще всего это сообщения от элементов интерфейса, но могут быть и нажатия клавиш и даже перемещения мыши. В этом потоке точно так же можно получить сообщение от вспомогательного потока и среагировать на завершение, не дожидаясь его буквально, и не блокируя ожиданием обработку событий.
#4 Не учитывать особенности передачи аргументов в поток
Аргументы в функцию потока перемещаются или копируются по значению. Пример ниже даже не скомпилируется.
#include <iostream> #include <thread> void foo(int &s) { s = 42; } int main(int argc, char *argv[]) { int answer = 0; std::thread t(foo, answer); t.join(); return 0; }
Для успешной компиляции ссылка должна быть передана через
std::thread t(foo, std::ref(answer));
#5 Игнорировать общий доступ к ресурсам
В условиях многопоточности несколько потоков могут одновременно использовать общие ресурсы или данные. Это может приводить не только к труднопрогнозируемому поведению но и к краху программы. В таких случаях необходимо упорядочивать и контролировать доступ.
В качестве примера рассмотрим вывод в консоль в несколько потоков - основной и шесть дополнительных.
#include <iostream> #include <thread> void foo(const std::string &message) { std::cout << "thread " << std::this_thread::get_id() << ", message " << message << std::endl; } int main(int argc, char *argv[]) { std::thread t1(foo, "каждый"); std::thread t2(foo, "охотник"); std::thread t3(foo, "желает"); foo("знать"); std::thread t4(foo, "где"); std::thread t5(foo, "сидит"); std::thread t6(foo, "фазан"); t1.join(); t2.join(); t3.join(); t4.join(); t5.join(); t6.join(); return 0; }
В результате получим мешанину из слов:
thread thread thread 140597982512960, message знать140597965162240 thread 140597939984128, message где thread 140597931591424, message 140597956769536, message thread каждый140597948376832, message желает, message сидит охотник thread 140597923198720, message фазан
Дело в том, что консоль одна, а семь потоков пытаются выводить на неё одновременно. Чтобы сделать вывод более предсказуемым необходимо ограничить одновременный доступ. Сделаем это через
#include <mutex> std::mutex cout_guard; void foo(const std::string &message) { cout_guard.lock(); std::cout << "thread " << std::this_thread::get_id() << ", message " << message << std::endl; cout_guard.unlock(); }
Вывод получится читаемый:
thread 140710040659776, message знать thread 140710023309056, message каждый thread 140710006523648, message желает thread 140709989738240, message сидит thread 140709981345536, message фазан thread 140710014916352, message охотник thread 140709998130944, message где
Мешанины из фрагментов строк уже нет. Порядок захвата нами не управляется, поэтому сами сообщения появляются в произвольном порядке.
#6 Забыть вызвать unlock()
В предыдущем примере для разделения доступа к ресурсу использовался
void foo(const std::string &message) { cout_guard.lock(); std::cout << "thread " << std::this_thread::get_id() << ", message " << message << std::endl; // cout_guard.unlock(); }
После появления первого же сообщения программа зависнет:
thread 140569688782656, message знать
Чтобы защититься от ошибок такого рода воспользуемся
В конструкторе захватывает, в деструкторе освобождает. По какой бы причине мы не покинули область видимости - блокировка будет снята.
void foo(const std::string &message) { std::lock_guard<std::mutex> lock(cout_guard); std::cout << "thread " << std::this_thread::get_id() << ", message " << message << std::endl; }
#7 Пренебрежение размером защищённой секции
Пока мы находимся внутри защищённой секции все остальные потоки рвущиеся её выполнить заблокированы. Старайтесь делать заблокированный участок как можно меньше.
#include <chrono> #include <iostream> #include <mutex> #include <thread> using namespace std::chrono_literals; std::mutex cout_mutex; void foo() { std::lock_guard<std::mutex> lock(cout_mutex); std::this_thread::sleep_for(1s); // на самом деле что-то очень полезное и безопасное std::cout << "foo" << std::endl; } int main(int argc, char *argv[]) { std::thread t1(foo); std::thread t2(foo); t1.join(); t2.join(); return 0; }
Безопасный вариант программы будет исполняться почти две секунды. Если нам необходимо защитить только вывод в консоль, то нет необходимости в секции такого размера. Мы можем переместить блокировки ближе к выводу.
void foo() { std::this_thread::sleep_for(1s); // на самом деле что-то очень полезное и безопасное std::lock_guard<std::mutex> lock(cout_mutex); std::cout << "foo" << std::endl; }
Такой вариант будет выполняться уже около одной секунды и при этом останется безопасным.
#8 Взаимные блокировки
Как правило такие блокировки уже навсегда из-за чего получили название deadlock. Типичная ситуация такой блокировки представлена ниже. Функция
#include <chrono> #include <iostream> #include <mutex> #include <thread> using namespace std::chrono_literals; std::mutex cerr_mutex; std::mutex cout_mutex; void foo() { cerr_mutex.lock(); std::cerr << "use cerr in foo" << std::endl; std::this_thread::sleep_for(1s); cout_mutex.lock(); std::cout << "use cout in foo" << std::endl; cout_mutex.unlock(); cerr_mutex.unlock(); } void bar() { cout_mutex.lock(); std::cout << "use cout in bar" << std::endl; std::this_thread::sleep_for(1s); cerr_mutex.lock(); std::cerr << "use cerr in bar" << std::endl; cerr_mutex.unlock(); cout_mutex.unlock(); } int main(int argc, char *argv[]) { std::thread t1(foo); std::thread t2(bar); t1.join(); t2.join(); return 0; }
Причина зависания кроется в перекрёстной блокировке. Когда оба потока начинают работать каждый из них блокирует свой mutex. То есть t1 захватил cerr_mutex, а t2 - cout_mutex. После вызова
Самое простое решение использовать
void foo() { std::lock(cerr_mutex, cout_mutex); std::cerr << "use cerr in foo" << std::endl; std::this_thread::sleep_for(1s); std::cout << "use cout in foo" << std::endl; cout_mutex.unlock(); cerr_mutex.unlock(); }
Если условия позволяют, можно использовать
void foo() { cerr_mutex.lock(); std::cerr << "use cerr in foo" << std::endl; std::this_thread::sleep_for(1s); if (cout_mutex.try_lock_for(1s)) { std::cout << "use cout in foo" << std::endl; cout_mutex.unlock(); } cerr_mutex.unlock(); }
С одной стороны мы успешно избежали блокировки, но с другой стороны нам пришлось взять на себя обработку этой ситуации.
#9 Повторный захват std::mutex
Эта некоторая вариация на тему перекрёстного захвата с тем лишь отличием, что для такой блокировки достаточно одного std::mutex. Даже дополнительные потоки не нужны. Тут можно было бы привести в качестве примера страшную банальщину типа:
void foo() { std::lock_guard<std::mutex> lock1(cerr_mutex); std::lock_guard<std::mutex> lock2(cerr_mutex); std::cerr << "foo" << std::endl; }
Да, это именно такого рода ошибка, вот только в жизни она встречается в несколько более изощрённой форме. В примере ниже нет ни потоков, ни рекурсии и всего один mutex.
#include <iostream> #include <mutex> std::mutex cerr_mutex; int bar() { std::lock_guard<std::mutex> lock(cerr_mutex); std::cerr << "bar" << std::endl; return 42; } void foo() { std::lock_guard<std::mutex> lock(cerr_mutex); std::cerr << "foo, bar = " << bar() << std::endl; } int main(int argc, char *argv[]) { foo(); return 0; }
Вызов foo приводит к захвату mutex, но в процессе вызова bar возникает необходимость блокировки ресурса повторно. Это такой же deadlock, только теперь основной поток ждёт сам себя.
Существует очевидный способ решить проблему - заменить обычный
void foo() { auto b = bar(); std::lock_guard<std::mutex> lock(cerr_mutex); std::cerr << "foo, bar = " << b << std::endl; }
#10 Излишняя предосторожность
Когда возникает необходимость модифицировать простые типы наподобие
#include <mutex> std::mutex counter_mutex; int counter; void foo() { std::lock_guard<std::mutex> lock(counter_mutex); ++counter; }
Та же самая логика без использования mutex и использованием atomic.
#include <atomic> std::atomic<int> counter; void foo() { ++counter; }
#11 Частое создание потоков без использования пулов
Создание и удаление потоков дорогое удовольствия с точки зрения затрат CPU. Часто создавать и удалять потоки в приложении, которое само по активно занимается вычислениями значит мешать ему. Вместо того, чтобы часто создавать и удалять потоки лучше создать пул предварительно запущенных потоков и распределять между ними задания.
Использование пула потоков позволяет сократить объём кода и количество потенциальных ошибок связанных с запуском и остановкой потоков. Позволит избежать избыточного количества потоков, которое может негативно сказаться на производительности.
Существуют готовые реализации такого рода пулов. Например, TBB
#12 Не обработанные исключения в потоке
Исключения брошенные в одном потоке не могут быть перехвачены другим. Представим себе функцию, которая может бросить исключение. Если мы выполним эту функцию в отдельном потоке, то попытка поймать исключение в основном не сработает.
#include <iostream> #include <stdexcept> #include <thread> void foo() { throw std::runtime_error("foo"); } int main(int argc, char *argv[]) { try { std::thread t(foo); t.join(); } catch (const std::exception &e) { std::cout << "error" << e.what() << std::endl; } return 0; }
Программа аварийно завершится так и не вызвав обработчик исключения. Решением может быть перехват исключения в потоке и передача информации о нём через экземпляр
#include <iostream> #include <stdexcept> #include <thread> static std::exception_ptr eptr = nullptr; void foo() { try { throw std::runtime_error("foo"); } catch (...) { eptr = std::current_exception(); } } int main(int argc, char *argv[]) { std::thread t(foo); t.join(); if (eptr) { try { std::rethrow_exception(eptr); } catch (const std::exception &e) { std::cout << "error" << e.what() << std::endl; } } return 0; }
#13 Имитация асинхронной работы без std::async
Когда нужно выполнить часть кода независимо от основного потока отличным выбором будет использование
Еще одно важно преимущество заключается в возможности получить результат работы функции через
#include <iostream> #include <cmath> #include <future> int main(int argc, char *argv[]) { auto f = std::async(sqrt, 9.0); std::cout << f.get() << std::endl; return 0; }
Использование потоков напрямую делает получение результатов чуть более громоздким. Это может быть:
Передача ссылки на результирующую переменную в поток, в котором необходимо сохранить результат.
#include <iostream> #include <cmath> #include <thread> void foo(double i, double &r) { r = std::sqrt(i); } int main(int argc, char *argv[]) { double result = 0.0; auto t = std::thread(foo, 9, std::ref(result)); t.join(); std::cout << result << std::endl; return 0; }
Сохранение результата в переменной класса функционального объекта и чтение его после завершения работы.
#include <iostream> #include <cmath> #include <thread> template<typename T> class foo { T r; public: T get() { return r; } void operator()(T i) { r = std::sqrt(i); } }; int main(int argc, char *argv[]) { auto f = foo<double>(); auto t = std::thread(std::ref(f), 9); t.join(); std::cout << f.get() << std::endl; return 0; }
Курт Гантерот в своей книге утверждает, что создание потоков в 14 раз дороже использования
Курт Гантерот, «Оптимизация программ на C++» |
Короче говоря, пока не доказано обратное использовать следует
#14 Опускание std::launch::async когда это действительно необходимо
Название
Существует два способа запуска:
std::launch::async . Задача будет немедленно запущена в отдельном потоке.std::launch::deferred . Выполнение задачи будет отложено до вызова.get() или.wait() возвращаемого объектаstd::future . При этом выполнение осуществляется синхронно!
Без явного указания способа запуска предполагается комбинация этих вариантов и фактически предсказать как именно будет запущена задача невозможно. Существуют связанные с этим сложности, например невозможно предсказать корректно ли будет обращение к переменным потока, невозможно предсказать будет ли выполнена функция вообще, если до выполнения функций
Чтобы избежать недоразумений явно указывайте
Непредсказуемо:
auto f = std::async(sqrt, 9.0);
Гарантировано в другом потоке:
auto f = std::async(std::launch::async, sqrt, 9.0);
Скотт Мейерс, «Эффективный и современный С++» |
#15 Использование .get() может привести к ожиданию
#include <chrono> #include <future> #include <iostream> int main(int argc, char *argv[]) { std::future<int> f = std::async(std::launch::async, [](){ std::this_thread::sleep_for(std::chrono::seconds(1)); return 42; }); while (true) { // ... std::cout << f.get() << std::endl; // ... } return 0; }
Несмотря на то, что лямбда явно будет запущена в отдельном потоке вызов метода
Обе проблемы можно решить проверив future на готовность.
if (f.valid()) { std::cout << f.get() << std::endl; }
#16 Исключение из задачи перетечёт во future
Исключение брошенное в асинхронной задаче должно быть обработано так, будто оно возникло в вызываемом потоке. В данном случае пример себя поведёт так, будто исключение никто не обработал.
#include <future> #include <iostream> int main(int argc, char *argv[]) { std::future<int> f = std::async(std::launch::async, [](){ throw std::runtime_error("error"); return 42; }); std::cout << f.get() << std::endl; return 0; }
Программа аварийно завершится. Если в задаче было брошено исключение, оно распространится и на вызов .get(). Если до конца жизни future .get() так и не будет вызван исключение просто проигнорируется.
Для таких задач имеет смысл использование обычной конструкции try/catch.
try { std::cout << f.get() << std::endl; } catch (const std::exception &e) { std::cout << e.what() << std::endl; }
#17 Использование std::async там, где нужен тонкий контроль потоков
В большинстве случаев достаточно использования
Например изменить параметры для планировщика:
#include <iostream> #include <thread> void foo() { std::cout << "foo" <<std::endl; } int main(int argc, char *argv[]) { auto t = std::thread(foo); sched_param sch; int policy; pthread_getschedparam(t.native_handle(), &policy, &sch); sch.sched_priority = 20; pthread_setschedparam(t.native_handle(), SCHED_FIFO, &sch); t.join(); return 0; }
Это возможно благодаря наличию метода
#18 Пренебрежение анализом нагрузки на CPU
В любой момент времени потоки можно разделить на две группы - те которые что-то делают и те, который спят.
Потоки которые что-то делают занимают ядра процессора на которые их отправил планировщик. Для потоков которые занимаются активными вычислениями важно иметь в своём распоряжении свободные ядра, в противном случае большое количество потоков не даст никакого прироста в производительности. Даже скорее наоборот снизит производительность за счёт дополнительных переключений контекста потока.
Потоки которые преимущественно находятся в режиме ожидания в таком внимании процессора не нуждаются и могут находится в системе в гораздо большем количестве. И в случае с вводом/выводом даже помогают увеличить пропускную способность.
Я рассмотрел два крайних варианта, но наш конечно же будет посередине. Да, есть метод
Но это не помогает ответить правильно на вопрос - сколько же потоков можно запустить одновременно? Число ядер помогает понять сколько одновременно потоков, которые непрерывно длительное время активно потребляют процессор.
Если сценарий использования вычислений именно такой,то количество потоков должно быть как можно ближе к количеству ядер, а то и меньше, чтобы исключить распределение на логических ядрах.
Если потоки преимущественно заблокированы мьютексами или вводом/выводом, то ограничивать количество потоков ради экономии процессора не имеет большого смысла.
В остальных случаях необходимо нагрузочное тестирование и мониторинг с регулировкой количества потоков. Иными словами подбирается экспериментально.
#19 Использование квалификатора volatile для синхронизации
Использование этого квалификатора указание компилятору того, что изменения объекта могут происходить без контроля на этапе компиляции. Не в том смысле, что они происходят из разных потоков, а в том, что вообще за пределами кода, грубо говоря самопроизвольно. Это очень низкоуровневое указание и это никак не помогает в одновременном доступе внутри процесса.
Для синхронизации следует использовать
#20 Неоправданное использование lock-free алгоритмов
Программирование без необходимости блокировок звучит очень привлекательно в сравнении с обычными механизмами синхронизации. Возможно, в случае жёсткого ограничения вычислительных ресурсов применение подобных алгоритмов может быть оправдано. В остальных случаях выглядит скорее преждевременной оптимизацией, которая, к тому же, может обернуться сложными ошибками в самое неподходящее время (и без coredump тут не обойтись).
Прежде чем приступить к использованию свободных от блокировок алгоритмов следует подумать над тремя вопросами:
- Пробовали ли спроектировать код без необходимости синхронизации?
- Выполняли ли анализ производительности, поиск и оптимизацию узких мест?
- Можно ли ограничиться горизонтальным масштабированием?
Использование lock-free алгоритмов оправдано, когда никаких других решений просто не осталось.
Надеюсь, чтение этого материала было так же полезно, как его перевод и публикация.