9.1.3. Задачи, ожидающие других задач
В этой книге я уже неоднократно приводил пример алгоритма Quicksort. Его идея проста — подлежащие сортировке данные разбиваются на две части: до и после опорного элемента (в смысле заданной операции сравнения). Затем обе части рекурсивно сортируются и объединяются для получения полностью отсортированной последовательности. При распараллеливании алгоритма надо позаботиться о том, чтобы рекурсивные вызовы задействовали имеющийся аппаратный параллелизм.
В главе 4, где этот пример впервые был представлен, мы использовали std::async для выполнения одного из рекурсивных вызовов на каждом шаге и оставляли библиотеке решение о том, запускать ли новый поток или сортировать синхронно при обращении к get(). Этот подход неплохо работает — каждая задача либо выполняется в отдельном потоке, либо в тот момент, когда нужны ее результаты.
В главе 8 мы переработали эту реализацию, продемонстрировав альтернативный подход, когда количество потоков фиксировано и определяется уровнем аппаратного параллелизма. В данном случае мы воспользовались стеком ожидающих сортировки блоков. Разбивая на части предложенные для сортировки данные, каждый поток помещал один блок в стек, а второй сортировал непосредственно. Бесхитростное ожидание завершения сортировки второго блока могло бы закончиться взаимоблокировкой, потому что число потоков ограничено, и некоторые из них ждут. Очень легко оказаться в ситуации, когда все потоки ждут завершения сортировки блоков, и ни один ничего не делает. Тогда мы решили эту проблему, заставив поток извлекать блоки из стека и сортировать их, пока тот конкретный блок, которого он ждет, еще не отсортирован.
Точно такая же проблема возникает при использовании одного из рассмотренных выше простых пулов потоков вместо std::async, как в примере из главы 4. Число потоков тоже ограничено, и может случиться так, что все они будут ждать задач, которые еще запланированы, так как нет свободных потоков. И решение должно быть таким же,
как в главе 8: обрабатывать стоящие в очереди блоки во время ожидания завершения сортировки своего блока. Но если мы применяем пул потоков для управления списком задач и их ассоциациями с потоками — а именно в этом и состоит смысл пула потоков, — то доступа к списку задач у нас нет. Поэтому необходимо модифицировать сам пул, чтобы он делал это автоматически.
Проще всего будет добавить в класс thread_pool новую функцию, чтобы исполнять задачу из очереди и управлять циклом самостоятельно. Так мы и поступим. Более развитые реализации пула могли бы включить дополнительную логику в функцию ожидания или добавить другие функции ожидания для обработки этого случая, быть может, даже назначая приоритеты ожидаемым задачам. В листинге ниже приведена новая функция run_pending_task(), а модифицированный алгоритм Quicksort, в котором она используется, показан в листинге 9.5.
Листинг 9.4. Реализация функции run_pending_task()
void thread_pool::run_pending_task() {
function_wrapper task;
if (work_queue.try_pop(task)) {
task();
} else {
std::this_thread::yield();
}
}
Код run_pending_task() вынесен из главного цикла внутри функции worker_thread(), которую теперь можно будет изменить, так чтобы она вызывала run_pending_task(). Функция run_pending_task() пытается получить задачу из очереди и в случае успеха выполняет ее; если очередь пуста, то функция уступает управление ОС, чтобы та могла запланировать другой поток. Показанная ниже реализация Quicksort гораздо проще, чем версия в листинге 8.1, потому что вся логика управления потоками перенесена в пул.
Листинг 9.5. Реализация Quicksort на основе пула потоков
template<typename T>
struct sorter { ← (1)
thread_pool pool; ← (2)
std::list<T> do_sort(std::list<T>& chunk_data) {
if (chunk_data.empty()) {
return chunk_data;
}
std::list<T> result;
result.splice(result.begin(), chunk_data, chunk_data.begin());
T const& partition_val = *result.begin();
typename std::list<T>::iterator divide_point =
std::partition(chunk_data.begin(), chunk_data.end(),
[&](T const& val){ return val < partition_val; });
std::list<T> new_lower_chunk;
new_lower_chunk.splice(new_lower_chunk.end(),
chunk_data, chunk_data.begin(),
divide_point);
std::future<std::list<T> > new_lower = ← (3)
pool.submit(std::bind(&sorter::do_sort, this,
std::move(new_lower_chunk)));
std::list<T> new_higher(do_sort(chunk_data));
result.splice(result.end(), new_higher);
while (!new_lower.wait_for(std::chrono::seconds(0)) ==
std::future_status::timeout) {
pool.run_pending_task(); ← (4)
}
result.splice(result.begin(), new_lower.get());
return result;
}
};
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
if (input.empty()) {
return input;
}
sorter<T> s;
return s.do_sort(input);
}
Как и в листинге 8.1, реальная работа делегируется функции-члену do_sort() шаблона класса sorter (1), хотя в данном случае этот шаблон нужен лишь для обертывания экземпляра thread_pool (2).
Управление потоками и задачами теперь свелось к отправке задачи пулу (3) и исполнению находящихся в очереди задач в цикле ожидания (4). Это гораздо проще, чем в листинге 8.1, где нужно было явно управлять потоками и стеком подлежащих сортировке блоков. При отправке задачи пулу мы используем функцию std::bind(), чтобы связать указатель this с do_sort() и передать подлежащий сортировке блок. В данном случае мы вызываем std::move(), чтобы данные new_lower_chunk перемещались, а не копировались.
Мы решили проблему взаимоблокировки, возникающую из- за того, что одни потоки ждут других, но этот пул все еще далек от идеала. Отметим хотя бы, что все вызовы submit() и run_pending_task() обращаются к одной и той же очереди. В главе 8 мы видели, что модификация одного набора данных из разных потоков может негативно сказаться на производительности, стало быть, с этим нужно что-то делать.