9.1.1. Простейший пул потоков

В простейшем случае пул состоит из фиксированного числа рабочих потоков (обычно равного значению, которое возвращает функция std::thread::hardware_concurrency()). Когда у программы появляется какая-то работа, она вызывает функцию, которая помещает эту работу в очередь. Рабочий поток забирает работу из очереди, выполняет указанную в ней задачу, после чего проверяет, есть ли в очереди другие работы. В этой реализации никакого механизма ожидания завершения задачи не предусмотрело. Если это необходимо, то вы должны будете управлять синхронизацией самостоятельно.

В следующем листинге приведена реализация такого пула потоков.

Листинг 9.1. Простой пул потоков

class thread_pool {

 std::atomic_bool done;

 thread_safe_queue<std::function<void()> > work_queue;← (1)

 std::vector<std::thread> threads; ← (2)

 join_threads joiner; ← (3)

 void worker_thread() {

  while (!done) { ← (4)

   std::function<void()> task;

   if (work_queue.try_pop(task)) { ← (5)

    task(); ← (6)

   } else {

    std::this_thread::yield(); ← (7)

   }

  }

 }

public:

 thread_pool():

  done(false), joiner(threads) {

  unsigned const thread_count =

   std::thread::hardware_concurrency();← (8)

  try {

   for (unsigned i = 0; i < thread_count; ++i) {

    threads.push_back(

     std::thread(&thread_pool::worker_thread, this)); ← (9)

   }

  } catch (...) {

   done = true; ← (10)

   throw;

  }

 }

 ~thread_pool() {

  done = true; ← (11)

 }

 template<typename FunctionType>

 void submit(FunctionType f) {

  work_queue.push(std::function<void()>(f)); ← (12)

 }

};

Здесь мы определили вектор рабочих потоков (2) и используем одну из потокобезопасных очередей из главы 6 (1) для хранения очереди работ. В данном случае пользователь не может ждать завершения задачи, а задача не может возвращать значения, поэтому для инкапсуляции задач можно использовать тип std::function<void()>. Функция submit() обертывает переданную функцию или допускающий вызов объект в объект std::function<void()> и помещает его в очередь (12).

Потоки запускаются в конструкторе; их количество равно значению, возвращаемому функцией std::thread::hardware_concurrency(), то есть мы создаем столько потоков, сколько может поддержать оборудование (8). Все эти потоки исполняют функцию-член нашего класса worker_thread() (9).

Запуск потока может завершиться исключением, поэтому необходимо позаботиться о том, чтобы уже запущенные к этому моменту потоки корректно завершались. Для этого мы включили блок try-catch, который в случае исключения поднимает флаг done (10). Кроме того, мы воспользовались классом join_threads из главы 8 (3), чтобы обеспечить присоединение всех потоков. То же самое происходит в деструкторе: мы просто поднимаем флаг done (11), а объект join_threads гарантирует, что потоки завершатся до уничтожения пула. Отметим, что порядок объявления членов важен: и флаг done и объект worker_queue должны быть объявлены раньше вектора threads, который, в свою очередь, должен быть объявлен раньше joiner. Только тогда деструкторы членов класса будут вызываться в правильном порядке; в частности, нельзя уничтожать очередь раньше, чем остановлены все потоки.

Сама функция worker_thread проста до чрезвычайности: в цикле, который продолжается, пока не поднят флаг done (4), она извлекает задачи из очереди (5) и выполняет их (6). Если в очереди нет задач, функция вызывает std::this_thread::yield() (7), чтобы немного отдохнуть и дать возможность поработать другим потокам.

Часто даже такого простого пула потоков достаточно, особенно если задачи независимы, не возвращают значений и не выполняют блокирующих операций. Но бывает и по-другому: во-первых, у программы могут быть более жесткие требования, а, во-вторых, в таком пуле возможны проблемы, в частности, из-за взаимоблокировок. Кроме того, в простых случаях иногда лучше прибегнуть к функции std::async, как неоднократно демонстрировалось в главе 8. В этой главе мы рассмотрим и более изощренные реализации пула потоков с дополнительными возможностями, которые призваны либо удовлетворить особые потребности пользователя, либо уменьшить количество потенциальных ошибок. Для начала разрешим ожидать завершения переданной пулу задачи.