9.1.3. Задачи, ожидающие других задач

В этой книге я уже неоднократно приводил пример алгоритма Quicksort. Его идея проста — подлежащие сортировке данные разбиваются на две части: до и после опорного элемента (в смысле заданной операции сравнения). Затем обе части рекурсивно сортируются и объединяются для получения полностью отсортированной последовательности. При распараллеливании алгоритма надо позаботиться о том, чтобы рекурсивные вызовы задействовали имеющийся аппаратный параллелизм.

В главе 4, где этот пример впервые был представлен, мы использовали std::async для выполнения одного из рекурсивных вызовов на каждом шаге и оставляли библиотеке решение о том, запускать ли новый поток или сортировать синхронно при обращении к get(). Этот подход неплохо работает — каждая задача либо выполняется в отдельном потоке, либо в тот момент, когда нужны ее результаты.

В главе 8 мы переработали эту реализацию, продемонстрировав альтернативный подход, когда количество потоков фиксировано и определяется уровнем аппаратного параллелизма. В данном случае мы воспользовались стеком ожидающих сортировки блоков. Разбивая на части предложенные для сортировки данные, каждый поток помещал один блок в стек, а второй сортировал непосредственно. Бесхитростное ожидание завершения сортировки второго блока могло бы закончиться взаимоблокировкой, потому что число потоков ограничено, и некоторые из них ждут. Очень легко оказаться в ситуации, когда все потоки ждут завершения сортировки блоков, и ни один ничего не делает. Тогда мы решили эту проблему, заставив поток извлекать блоки из стека и сортировать их, пока тот конкретный блок, которого он ждет, еще не отсортирован.

Точно такая же проблема возникает при использовании одного из рассмотренных выше простых пулов потоков вместо std::async, как в примере из главы 4. Число потоков тоже ограничено, и может случиться так, что все они будут ждать задач, которые еще запланированы, так как нет свободных потоков. И решение должно быть таким же,

как в главе 8: обрабатывать стоящие в очереди блоки во время ожидания завершения сортировки своего блока. Но если мы применяем пул потоков для управления списком задач и их ассоциациями с потоками — а именно в этом и состоит смысл пула потоков, — то доступа к списку задач у нас нет. Поэтому необходимо модифицировать сам пул, чтобы он делал это автоматически.

Проще всего будет добавить в класс thread_pool новую функцию, чтобы исполнять задачу из очереди и управлять циклом самостоятельно. Так мы и поступим. Более развитые реализации пула могли бы включить дополнительную логику в функцию ожидания или добавить другие функции ожидания для обработки этого случая, быть может, даже назначая приоритеты ожидаемым задачам. В листинге ниже приведена новая функция run_pending_task(), а модифицированный алгоритм Quicksort, в котором она используется, показан в листинге 9.5.

Листинг 9.4. Реализация функции run_pending_task()

void thread_pool::run_pending_task() {

 function_wrapper task;

 if (work_queue.try_pop(task)) {

  task();

 } else {

  std::this_thread::yield();

 }

}

Код run_pending_task() вынесен из главного цикла внутри функции worker_thread(), которую теперь можно будет изменить, так чтобы она вызывала run_pending_task(). Функция run_pending_task() пытается получить задачу из очереди и в случае успеха выполняет ее; если очередь пуста, то функция уступает управление ОС, чтобы та могла запланировать другой поток. Показанная ниже реализация Quicksort гораздо проще, чем версия в листинге 8.1, потому что вся логика управления потоками перенесена в пул.

Листинг 9.5. Реализация Quicksort на основе пула потоков

template<typename T>

struct sorter {    ← (1)

 thread_pool pool; ← (2)

 std::list<T> do_sort(std::list<T>& chunk_data) {

  if (chunk_data.empty()) {

   return chunk_data;

  }

  std::list<T> result;

  result.splice(result.begin(), chunk_data, chunk_data.begin());

  T const& partition_val = *result.begin();

  typename std::list<T>::iterator divide_point =

   std::partition(chunk_data.begin(), chunk_data.end(),

    [&](T const& val){ return val < partition_val; });

  std::list<T> new_lower_chunk;

  new_lower_chunk.splice(new_lower_chunk.end(),

   chunk_data, chunk_data.begin(),

   divide_point);

  std::future<std::list<T> > new_lower = ← (3)

  pool.submit(std::bind(&sorter::do_sort, this,

   std::move(new_lower_chunk)));

  std::list<T> new_higher(do_sort(chunk_data));

  result.splice(result.end(), new_higher);

  while (!new_lower.wait_for(std::chrono::seconds(0)) ==

   std::future_status::timeout) {

   pool.run_pending_task(); ← (4)

  }

  result.splice(result.begin(), new_lower.get());

  return result;

 }

};

template<typename T>

std::list<T> parallel_quick_sort(std::list<T> input) {

 if (input.empty()) {

  return input;

 }

 sorter<T> s;

 return s.do_sort(input);

}

Как и в листинге 8.1, реальная работа делегируется функции-члену do_sort() шаблона класса sorter (1), хотя в данном случае этот шаблон нужен лишь для обертывания экземпляра thread_pool (2).

Управление потоками и задачами теперь свелось к отправке задачи пулу (3) и исполнению находящихся в очереди задач в цикле ожидания (4). Это гораздо проще, чем в листинге 8.1, где нужно было явно управлять потоками и стеком подлежащих сортировке блоков. При отправке задачи пулу мы используем функцию std::bind(), чтобы связать указатель this с do_sort() и передать подлежащий сортировке блок. В данном случае мы вызываем std::move(), чтобы данные new_lower_chunk перемещались, а не копировались.

Мы решили проблему взаимоблокировки, возникающую из- за того, что одни потоки ждут других, но этот пул все еще далек от идеала. Отметим хотя бы, что все вызовы submit() и run_pending_task() обращаются к одной и той же очереди. В главе 8 мы видели, что модификация одного набора данных из разных потоков может негативно сказаться на производительности, стало быть, с этим нужно что-то делать.