9.1.5. Занимание работ

Если мы хотим, чтобы «безработный» поток мог брать работы из очереди другого потока, то эта очередь должна быть доступна занимающему потоку в run_pending_tasks(). Для этого каждый поток должен зарегистрировать свою очередь в пуле или получать очередь от пула. Кроме того, необходимо позаботиться о надлежащей синхронизации и защите очереди работ, чтобы не нарушались инварианты.

Можно написать свободную от блокировок очередь, которая позволит потоку-владельцу помещать и извлекать элементы с одного конца, а другим потокам — занимать элементы с другого конца, однако реализация такой очереди выходит за рамки данной книги. Чтобы продемонстрировать идею, мы поступим проще — воспользуемся мьютексом для защиты данных очереди. Мы надеемся, что занимание работ — редкое событие, поэтому конкуренция за мьютекс будет невелика, и накладные расходы на такую очередь окажутся минимальны. Ниже приведена простая реализация с блокировками.

Листинг 9.7. Очередь с блокировкой, допускающей занимание работ

class work_stealing_queue {

private:

 typedef function_wrapper data_type;

 std::deque<data_type> the_queue; ← (1)

 mutable std::mutex the_mutex;

public:

 work_stealing_queue() {}

 work_stealing_queue(const work_stealing_queue& other)=delete;

 work_stealing_queue& operator=(

  const work_stealing_queue& other)=delete;

 void push(data_type data) { ← (2)

  std::lock_guard<std::mutex> lock(the_mutex);

  the_queue.push_front(std::move(data));

 }

 bool empty() const {

  std::lock_guard<std::mutex> lock(the_mutex);

  return the_queue.empty();

 }

 bool try_pop(data_type& res) { ← (3)

  std::lock_guard<std::mutex> lock(the_mutex);

  if (the_queue.empty()) {

   return false;

  }

  res = std::move(the_queue.front());

  the_queue.pop_front();

  return true;

 }

 bool try_steal(data_type& res) { ← (4)

  std::lock_guard<std::mutex> lock(the_mutex);

  if (the_queue.empty()) {

   return false;

  }

  res = std::move(the_queue.back());

  the_queue.pop_back();

  return true;

 }

};

Этот класс является простой оберткой вокруг std::deque<function_wrapper> (1), которая защищает все операции доступа к очереди с помощью мьютекса. Функции push() (2) и try_ pop() (3) работают с началом очереди, а функция try_steal() — с концом (4).

Получается, что эта «очередь» для потока-владельца на самом деле является стеком, обслуживаемым согласно дисциплине «последним пришёл, первым обслужен», — задача, которая была помещена последней, извлекается первой. С точки зрения кэш-памяти это даже может повысить производительность, так как относящиеся к последней задаче данные с большей вероятностью окажутся в кэше, чем данные, относящиеся к предыдущей задаче. К тому же, такая дисциплина прекрасно подходит для алгоритмов типа Quicksort. В предшествующих реализациях каждое обращение к do_sort() помещает элемент в очередь, а затем ждет его. Обрабатывая последний помещенный в очередь элемент первым, мы гарантируем, что блок, необходимый текущему вызову для завершения работы, будет обработан раньше блоков, нужных другим ветвям, а, значит, уменьшается как количество активных задач, так и занятый размер стека. Функция try_steal() извлекает элементы из противоположного по сравнению с try_pop() конца очереди, чтобы минимизировать конкуренцию; в принципе, можно было бы применить технику, обсуждавшуюся в главах 6 и 7, чтобы поддержать одновременные обращения к try_pop() и try_steal().

Итак, теперь у нас есть замечательная очередь работ, допускающая занимание. Но как воспользоваться ей в пуле потоков? Ниже приведена одна из возможных реализаций.

Листинг 9.8. Пул потоков с использованием занимания работ

class thread_pool {

 typedef function_wrapper task_type;

 std::atomic_bool done;

 thread_safe_queue<task_type> pool_work_queue;

 std::vector<std::unique_ptr<work_stealing_queue> > queues;← (1)

 std::vector<std::thread> threads;

 join_threads joiner;

 static thread_local work_stealing_queue* local_work_queue;← (2)

 static thread_local unsigned my_index;

 void worker_thread(unsigned my_index_) {

  my_index = my_index_;

  local_work_queue = queues[my_index].get(); ← (3)

  while (!done) {

   run_pending_task();

  }

 }

 bool pop_task_from_local_queue(task_type& task) {

  return local_work_queue && local_work_queue->try_pop(task);

 }

 bool pop_task_from_pool_queue(task_type& task) {

  return pool_work_queue.try_pop(task);

 }

 bool pop_task_from_other_thread_queue(task_type& task) { ← (4)

  for (unsigned i = 0; i < queues.size(); ++i) {

   unsigned const index = (my_index + i + 1) % queues.size();← (5)

   if (queues[index]->try_steal(task)) {

    return true;

   }

  }

  return false;

 }

public:

 thread_pool() :

  done(false), joiner(threads) {

  unsigned const thread_count =

   std::thread::hardware_concurrency();

  try {

   for (unsigned i = 0; i < thread_count; ++i) {

    queues.push_back(std::unique_ptr<work_stealing_queue> (← (6)

     new work_stealing_queue));

    threads.push_back(

     std::thread(&thread_pool::worker_thread, this, i));

   }

  } catch (...) {

   done = true;

   throw;

  }

 }

 ~thread_pool() {

  done = true;

 }

 template<typename FunctionType>

 std::future<typename std::result_of<FunctionType()>::type> submit(

  FunctionType f) {

  typedef typename std::result_of<FunctionType()>::type

   result_type;

  std::packaged_task<result_type()> task(f);

  std::future<result_type> res(task.get_future());

  if (local_work_queue) {

   local_work_queue->push(std::move(task));

  } else {

   pool_work_queue.push(std::move(task));

  }

  return res;

 }

 void run_pending_task() {

  task_type task;

  if (pop_task_from_local_queue(task) || ← (7)

      pop_task_from_pool_queue (task) || ← (8)

      pop_task_from_other_thread_queue(task)) { ← (9)

   task();

  } else {

   std::this_thread::yield();

  }

 }

};

Этот код очень похож на код из листинга 9.6. Первое отличие состоит в том, что локальная очередь каждого потока — объект класса work_stealing_queue, а не просто std::queue<> (2). Новый поток не выделяет очередь для себя самостоятельно; это делает конструктор пула потоков (6), и он же сохраняет новую очередь в списке очередей для данного пула (1). Индекс очереди в списке передаётся функции потока и используется затем для получения указателя на очередь (3). Это означает, что пул потоков может получить доступ к очереди, когда пытается занять задачу для потока, которому нечего делать. Новая версия run_pending_task() сначала пытается получить задачу из очереди исполняемого потока (7), затем из очереди пула (8) и, наконец, из очереди другого потока (9).

Функция pop_task_from_other_thread_queue() (4) обходит очереди, принадлежащие всем потокам пула, пытаясь занять задачу у каждой. Чтобы не случилось так, что все потоки занимают задачи у первого потока в списке, каждый поток начинает просмотр с позиции, равной его собственному индексу (5).

Теперь у нас имеется пул потоков, пригодный для самых разных целей. Разумеется, есть масса способов улучшить его для работы в конкретной ситуации, но это я оставляю в качестве упражнения для читателя. В частности, мы совсем не исследовали идею динамического изменения размера пула, так чтобы обеспечить оптимальное использование процессоров, даже когда потоки блокированы в ожидании какого-то события, например, завершения ввода/вывода или освобождения мьютекса.

Следующим в нашем списке «продвинутых» приёмов управления потоками стоит прерывание потоков.