9.1.1. Простейший пул потоков
В простейшем случае пул состоит из фиксированного числа рабочих потоков (обычно равного значению, которое возвращает функция std::thread::hardware_concurrency()). Когда у программы появляется какая-то работа, она вызывает функцию, которая помещает эту работу в очередь. Рабочий поток забирает работу из очереди, выполняет указанную в ней задачу, после чего проверяет, есть ли в очереди другие работы. В этой реализации никакого механизма ожидания завершения задачи не предусмотрело. Если это необходимо, то вы должны будете управлять синхронизацией самостоятельно.
В следующем листинге приведена реализация такого пула потоков.
Листинг 9.1. Простой пул потоков
class thread_pool {
std::atomic_bool done;
thread_safe_queue<std::function<void()> > work_queue;← (1)
std::vector<std::thread> threads; ← (2)
join_threads joiner; ← (3)
void worker_thread() {
while (!done) { ← (4)
std::function<void()> task;
if (work_queue.try_pop(task)) { ← (5)
task(); ← (6)
} else {
std::this_thread::yield(); ← (7)
}
}
}
public:
thread_pool():
done(false), joiner(threads) {
unsigned const thread_count =
std::thread::hardware_concurrency();← (8)
try {
for (unsigned i = 0; i < thread_count; ++i) {
threads.push_back(
std::thread(&thread_pool::worker_thread, this)); ← (9)
}
} catch (...) {
done = true; ← (10)
throw;
}
}
~thread_pool() {
done = true; ← (11)
}
template<typename FunctionType>
void submit(FunctionType f) {
work_queue.push(std::function<void()>(f)); ← (12)
}
};
Здесь мы определили вектор рабочих потоков (2) и используем одну из потокобезопасных очередей из главы 6 (1) для хранения очереди работ. В данном случае пользователь не может ждать завершения задачи, а задача не может возвращать значения, поэтому для инкапсуляции задач можно использовать тип std::function<void()>. Функция submit() обертывает переданную функцию или допускающий вызов объект в объект std::function<void()> и помещает его в очередь (12).
Потоки запускаются в конструкторе; их количество равно значению, возвращаемому функцией std::thread::hardware_concurrency(), то есть мы создаем столько потоков, сколько может поддержать оборудование (8). Все эти потоки исполняют функцию-член нашего класса worker_thread() (9).
Запуск потока может завершиться исключением, поэтому необходимо позаботиться о том, чтобы уже запущенные к этому моменту потоки корректно завершались. Для этого мы включили блок try-catch, который в случае исключения поднимает флаг done (10). Кроме того, мы воспользовались классом join_threads из главы 8 (3), чтобы обеспечить присоединение всех потоков. То же самое происходит в деструкторе: мы просто поднимаем флаг done (11), а объект join_threads гарантирует, что потоки завершатся до уничтожения пула. Отметим, что порядок объявления членов важен: и флаг done и объект worker_queue должны быть объявлены раньше вектора threads, который, в свою очередь, должен быть объявлен раньше joiner. Только тогда деструкторы членов класса будут вызываться в правильном порядке; в частности, нельзя уничтожать очередь раньше, чем остановлены все потоки.
Сама функция worker_thread проста до чрезвычайности: в цикле, который продолжается, пока не поднят флаг done (4), она извлекает задачи из очереди (5) и выполняет их (6). Если в очереди нет задач, функция вызывает std::this_thread::yield() (7), чтобы немного отдохнуть и дать возможность поработать другим потокам.
Часто даже такого простого пула потоков достаточно, особенно если задачи независимы, не возвращают значений и не выполняют блокирующих операций. Но бывает и по-другому: во-первых, у программы могут быть более жесткие требования, а, во-вторых, в таком пуле возможны проблемы, в частности, из-за взаимоблокировок. Кроме того, в простых случаях иногда лучше прибегнуть к функции std::async, как неоднократно демонстрировалось в главе 8. В этой главе мы рассмотрим и более изощренные реализации пула потоков с дополнительными возможностями, которые призваны либо удовлетворить особые потребности пользователя, либо уменьшить количество потенциальных ошибок. Для начала разрешим ожидать завершения переданной пулу задачи.