9.1.4. Предотвращение конкуренции за очередь работ

We use cookies. Read the Privacy and Cookie Policy

Всякий раз, как поток вызывает функцию submit() экземпляра пула потоков, он помещает новый элемент в единственную разделяемую очередь работ. А рабочие потоки постоянно извлекают элементы из той же очереди. Следовательно, по мере увеличения числа процессоров будет возрастать конкуренция за очередь работ. Это может ощутимо отразиться на производительности; даже при использовании свободной от блокировок очереди, в которой нет явного ожидания, драгоценное время может тратиться на перебрасывание кэша.

Чтобы избежать перебрасывания кэша, мы можем завести по одной очереди работ на каждый поток. Тогда каждый поток будет помещать новые элементы в свою собственную очередь и брать работы из глобальной очереди работ только тогда, когда в его очереди работ нет. В следующем листинге приведена реализация с использованием переменной типа thread_local, благодаря которой каждый поток обладает собственной очередью работ в дополнение к глобальной.

Листинг 9.6. Пул с очередями в поточно-локальной памяти

class thread_pool {

 thread_safe_queue<function_wrapper> pool_work_queue;

 typedef std::queue<function_wrapper> local_queue_type;← (1)

 static thread_local std::unique_ptr<local_queue_type>

  local_work_queue; ← (2)

 void worker_thread() {

  local_work_queue.reset(new local_queue_type);← (3)

  while (!done) {

   run_pending_task();

  }

 }

public:

 template<typename FunctionType>

 std::future<typename std::result_of<FunctionType()>::type>

 submit(FunctionType f) {

  typedef typename std::result_of<FunctionType()>::type

   result_type;

  std::packaged_task<result_type()> task(f);

  std::future<result_type> res(task.get_future());

  if (local_work_queue) { ← (4)

   local_work_queue->push(std::move(task));

  } else {

   pool_work_queue.push(std::move(task)); ← (5)

  }

  return res;

 }

 void run_pending_task() {

  function_wrapper task;

  if (local_work_queue && !local_work_queue->empty()) {← (6)

   task = std::move(local_work_queue->front());

   local_work_queue->pop();

   task();

  } else if(pool_work_queue.try_pop(task)) { ← (7)

   task();

  } else {

   std::this_thread::yield();

  }

 }

 // остальное, как и раньше

};

Для хранения очереди работ в поточно-локальной памяти мы воспользовались указателем std::unique_ptr<> (2), потому что не хотим, чтобы у потоков, не входящих в пул, была очередь; этот указатель инициализируется в функции worker_thread() до начала цикла обработки (3). Деструктор std::unique_ptr<> позаботится об уничтожении очереди работ по завершении потока.

Затем функция submit() проверяет, есть ли у текущего потока очередь работ (4). Если есть, то это поток из пула, и мы можем поместить задачу в локальную очередь. В противном случае задачу следует помещать в очередь пула, как и раньше (5).

Аналогичная проверка имеется в функции run_pending_task() (6), только на этот раз нужно еще проверить, есть ли что-нибудь в локальной очереди. Если есть, то можно извлечь элемент из начала очереди и обработать его. Обратите внимание, что локальная очередь может быть обычным объектом std::queue<> (1), так как к ней обращается только один поток. Если в локальной очереди задач нет, то мы проверяем очередь пула, как и раньше (7).

Таким образом мы действительно уменьшаем конкуренцию, но если распределение работ неравномерно, то может случиться, что в очереди одного потока скопится много задач, тогда как остальным будет нечем заняться. Например, в случае Quicksort только самый первый блок попадает в очередь пула, а остальные окажутся в локальной очереди того потока, который этот блок обработал. Это сводит на нет всю идею пула потоков.

К счастью, у этой проблемы есть решение: позволить потоку занимать (steal) работы из очередей других потоков, если ни в его собственной, ни в глобальной очереди ничего нет.