9.1.5. Занимание работ

We use cookies. Read the Privacy and Cookie Policy

Если мы хотим, чтобы «безработный» поток мог брать работы из очереди другого потока, то эта очередь должна быть доступна занимающему потоку в run_pending_tasks(). Для этого каждый поток должен зарегистрировать свою очередь в пуле или получать очередь от пула. Кроме того, необходимо позаботиться о надлежащей синхронизации и защите очереди работ, чтобы не нарушались инварианты.

Можно написать свободную от блокировок очередь, которая позволит потоку-владельцу помещать и извлекать элементы с одного конца, а другим потокам — занимать элементы с другого конца, однако реализация такой очереди выходит за рамки данной книги. Чтобы продемонстрировать идею, мы поступим проще — воспользуемся мьютексом для защиты данных очереди. Мы надеемся, что занимание работ — редкое событие, поэтому конкуренция за мьютекс будет невелика, и накладные расходы на такую очередь окажутся минимальны. Ниже приведена простая реализация с блокировками.

Листинг 9.7. Очередь с блокировкой, допускающей занимание работ

class work_stealing_queue {

private:

 typedef function_wrapper data_type;

 std::deque<data_type> the_queue; ← (1)

 mutable std::mutex the_mutex;

public:

 work_stealing_queue() {}

 work_stealing_queue(const work_stealing_queue& other)=delete;

 work_stealing_queue& operator=(

  const work_stealing_queue& other)=delete;

 void push(data_type data) { ← (2)

  std::lock_guard<std::mutex> lock(the_mutex);

  the_queue.push_front(std::move(data));

 }

 bool empty() const {

  std::lock_guard<std::mutex> lock(the_mutex);

  return the_queue.empty();

 }

 bool try_pop(data_type& res) { ← (3)

  std::lock_guard<std::mutex> lock(the_mutex);

  if (the_queue.empty()) {

   return false;

  }

  res = std::move(the_queue.front());

  the_queue.pop_front();

  return true;

 }

 bool try_steal(data_type& res) { ← (4)

  std::lock_guard<std::mutex> lock(the_mutex);

  if (the_queue.empty()) {

   return false;

  }

  res = std::move(the_queue.back());

  the_queue.pop_back();

  return true;

 }

};

Этот класс является простой оберткой вокруг std::deque<function_wrapper> (1), которая защищает все операции доступа к очереди с помощью мьютекса. Функции push() (2) и try_ pop() (3) работают с началом очереди, а функция try_steal() — с концом (4).

Получается, что эта «очередь» для потока-владельца на самом деле является стеком, обслуживаемым согласно дисциплине «последним пришёл, первым обслужен», — задача, которая была помещена последней, извлекается первой. С точки зрения кэш-памяти это даже может повысить производительность, так как относящиеся к последней задаче данные с большей вероятностью окажутся в кэше, чем данные, относящиеся к предыдущей задаче. К тому же, такая дисциплина прекрасно подходит для алгоритмов типа Quicksort. В предшествующих реализациях каждое обращение к do_sort() помещает элемент в очередь, а затем ждет его. Обрабатывая последний помещенный в очередь элемент первым, мы гарантируем, что блок, необходимый текущему вызову для завершения работы, будет обработан раньше блоков, нужных другим ветвям, а, значит, уменьшается как количество активных задач, так и занятый размер стека. Функция try_steal() извлекает элементы из противоположного по сравнению с try_pop() конца очереди, чтобы минимизировать конкуренцию; в принципе, можно было бы применить технику, обсуждавшуюся в главах 6 и 7, чтобы поддержать одновременные обращения к try_pop() и try_steal().

Итак, теперь у нас есть замечательная очередь работ, допускающая занимание. Но как воспользоваться ей в пуле потоков? Ниже приведена одна из возможных реализаций.

Листинг 9.8. Пул потоков с использованием занимания работ

class thread_pool {

 typedef function_wrapper task_type;

 std::atomic_bool done;

 thread_safe_queue<task_type> pool_work_queue;

 std::vector<std::unique_ptr<work_stealing_queue> > queues;← (1)

 std::vector<std::thread> threads;

 join_threads joiner;

 static thread_local work_stealing_queue* local_work_queue;← (2)

 static thread_local unsigned my_index;

 void worker_thread(unsigned my_index_) {

  my_index = my_index_;

  local_work_queue = queues[my_index].get(); ← (3)

  while (!done) {

   run_pending_task();

  }

 }

 bool pop_task_from_local_queue(task_type& task) {

  return local_work_queue && local_work_queue->try_pop(task);

 }

 bool pop_task_from_pool_queue(task_type& task) {

  return pool_work_queue.try_pop(task);

 }

 bool pop_task_from_other_thread_queue(task_type& task) { ← (4)

  for (unsigned i = 0; i < queues.size(); ++i) {

   unsigned const index = (my_index + i + 1) % queues.size();← (5)

   if (queues[index]->try_steal(task)) {

    return true;

   }

  }

  return false;

 }

public:

 thread_pool() :

  done(false), joiner(threads) {

  unsigned const thread_count =

   std::thread::hardware_concurrency();

  try {

   for (unsigned i = 0; i < thread_count; ++i) {

    queues.push_back(std::unique_ptr<work_stealing_queue> (← (6)

     new work_stealing_queue));

    threads.push_back(

     std::thread(&thread_pool::worker_thread, this, i));

   }

  } catch (...) {

   done = true;

   throw;

  }

 }

 ~thread_pool() {

  done = true;

 }

 template<typename FunctionType>

 std::future<typename std::result_of<FunctionType()>::type> submit(

  FunctionType f) {

  typedef typename std::result_of<FunctionType()>::type

   result_type;

  std::packaged_task<result_type()> task(f);

  std::future<result_type> res(task.get_future());

  if (local_work_queue) {

   local_work_queue->push(std::move(task));

  } else {

   pool_work_queue.push(std::move(task));

  }

  return res;

 }

 void run_pending_task() {

  task_type task;

  if (pop_task_from_local_queue(task) || ← (7)

      pop_task_from_pool_queue (task) || ← (8)

      pop_task_from_other_thread_queue(task)) { ← (9)

   task();

  } else {

   std::this_thread::yield();

  }

 }

};

Этот код очень похож на код из листинга 9.6. Первое отличие состоит в том, что локальная очередь каждого потока — объект класса work_stealing_queue, а не просто std::queue<> (2). Новый поток не выделяет очередь для себя самостоятельно; это делает конструктор пула потоков (6), и он же сохраняет новую очередь в списке очередей для данного пула (1). Индекс очереди в списке передаётся функции потока и используется затем для получения указателя на очередь (3). Это означает, что пул потоков может получить доступ к очереди, когда пытается занять задачу для потока, которому нечего делать. Новая версия run_pending_task() сначала пытается получить задачу из очереди исполняемого потока (7), затем из очереди пула (8) и, наконец, из очереди другого потока (9).

Функция pop_task_from_other_thread_queue() (4) обходит очереди, принадлежащие всем потокам пула, пытаясь занять задачу у каждой. Чтобы не случилось так, что все потоки занимают задачи у первого потока в списке, каждый поток начинает просмотр с позиции, равной его собственному индексу (5).

Теперь у нас имеется пул потоков, пригодный для самых разных целей. Разумеется, есть масса способов улучшить его для работы в конкретной ситуации, но это я оставляю в качестве упражнения для читателя. В частности, мы совсем не исследовали идею динамического изменения размера пула, так чтобы обеспечить оптимальное использование процессоров, даже когда потоки блокированы в ожидании какого-то события, например, завершения ввода/вывода или освобождения мьютекса.

Следующим в нашем списке «продвинутых» приёмов управления потоками стоит прерывание потоков.