9.1.2. Ожидание задачи, переданной пулу потоков
В примерах из главы 8, где потоки запускались явно, главный поток после распределения работы между потоками всегда ждал завершения запущенных потоков. Тем самым гарантировалось, что вызывающая программа получит управление только после полного завершения задачи. При использовании пула потоков ждать нужно завершения задачи, переданной пулу, а не самих рабочих потоков. Это похоже на то, как мы ждали будущих результатов при работе с std::async в главе 8. В случае простого пула потоков, показанного в листинге 9.1, организовывать ожидание придется вручную, применяя механизмы, описанные в главе 4: условные переменные и будущие результаты. Это усложняет код; намного удобнее было бы ждать задачу напрямую.
За счет переноса сложности в сам пул потоков мы сумеем добиться желаемого. Функция submit() могла бы возвращать некий описатель задачи, по которому затем можно было бы ждать ее завершения. Сам описатель должен был бы инкапсулировать условную переменную или будущий результат. Это упростило бы код, пользующийся пулом потоков.
Частный случай ожидания завершения запущенной задачи возникает, когда главный поток нуждается в вычисленном ей результате. Мы уже встречались с такой ситуацией выше, например, в функции parallel_accumulate() из главы 2. В таком случае путем использования будущих результатов мы можем объединить ожидание с передачей результата. В листинге 9.2 приведен код модифицированного пула потоков, который разрешает ожидать завершения задачи и передает возвращенный ей результат ожидающему потоку. Поскольку экземпляры класса std::packaged_task<> допускают только перемещение, но не копирование, мы больше не можем воспользоваться классом std::function<> для обертывания элементов очереди, потому что std::function<> требует, чтобы в обернутых объектах-функциях был определён копирующий конструктор. Вместо этого мы напишем специальный класс-обертку, умеющий работать с объектами, обладающими только перемещающим конструктором. Это простой маскирующий тип класс (type-erasure class), в котором определён оператор вызова. Нам нужно поддержать функции, которые не принимают параметров и возвращают void, поэтому оператор всего лишь вызывает виртуальный метод call(), который в свою очередь вызывает обернутую функцию.
Листинг 9.2. Пул потоков, ожидающий завершения задачи
class function_wrapper {
struct impl_base {
virtual void call() = 0;
virtual ~impl_base() {}
};
std::unique_ptr<impl_base> impl;
template<typename F>
struct impl_type: impl_base {
F f;
impl_type(F&& f_): f(std::move(f_)) {}
void call() { f(); }
};
public:
template<typename F> function_wrapper(F&& f):
impl(new impl_type<F>(std::move(f))) {}
void operator()() { impl->call(); }
function_wrapper() = default;
function_wrapper(function_wrapper&& other):
impl(std::move(other.impl)) {}
function_wrapper& operator=(function_wrapper&& other) {
impl = std::move(other.impl);
return *this;
}
function_wrapper(const function_wrapper&) = delete;
function_wrapper(function_wrapper&) = delete;
function_wrapper& operator=(const function_wrapper&) = delete;
};
class thread_pool {
thread_safe_queue<function_wrapper> work_queue;←┐
│ Используем
void worker_thread() │ function_
{ │ wrapper
while (!done) │ вместо std::
{ │ function
function_wrapper task; ←┘
if (work_queue.try_pop(task))
task();
else
std::this_thread::yield();
}
}
public:
template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type>← (1)
submit(FunctionType f) {
typedef typename std::result_of<FunctionType()>::type
result_type; ← (2)
std::packaged_task<result_type()> task(std::move(f));← (3)
std::future<result_type> res(task.get_future()); ← (4)
work_queue.push(std::move(task)); ← (5)
return res; ← (6)
}
// остальное, как и раньше
};
Прежде всего отметим, что модифицированная функция submit() (1) возвращает объект std::future<>, который будет содержать возвращенное задачей значение и позволит вызывающей программе ждать ее завершения. Для этого нам необходимо знать тип значения, возвращаемого переданной функцией f, и здесь на помощь приходит шаблон std::result_of<>: std::result_of<FunctionType()>::type — это тип результата, возвращенного вызовом объекта типа FunctionType (например, f) без аргументов. Выражение std::result_of<> мы используем также в определении псевдонима типа result_type (2) внутри функции.
Затем f обертывается объектом std::packaged_task<result_type()> (3), потому что f — функция или допускающий вызов объект, который не принимает параметров и возвращает результат типа result_type. Теперь мы можем получить будущий результат из std::packaged_task<> (4), перед тем как помещать задачу в очередь (5) и возвращать будущий результат (6). Отметим, что при помещении задачи в очередь мы должны использовать функцию std::move(), потому что класс std::packaged_task<> не допускает копирования. Именно поэтому в очереди хранятся объекты function_wrapper, а не объекты типа.
Этот пул позволяет ожидать завершения задач и получать возвращаемые ими результаты. В листинге ниже показано, как выглядит функция parallel_accumulate, работающая с таким пулом потоков.
Листинг 9.3. Функция parallel_accumulate, реализованная с помощью пула потоков, допускающего ожидание задач
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const block_size = 25;
unsigned long const num_blocks =
(length + block_size - 1) / block_size; ← (1)
std::vector<std::future<T> > futures(num_blocks-1);
thread_pool pool;
Iterator block_start = first;
for (unsigned long i = 0; i < (num_blocks - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
futures[i] = pool.submit(accumulate_block<Iterator, T>());← (2)
block_start = block_end;
}
T last_result =
accumulate_block<Iterator, T>()(block_start, last);
T result = init;
for (unsigned long i = 0; i < (num_blocks - 1); ++i) {
result += futures[i].get();
}
result += last_result;
return result;
}
Сравнивая этот код с листингом 8.4, следует обратить внимание на две вещи. Во-первых, мы работаем с количеством блоков (num_blocks (1)), а не потоков. Чтобы в полной мере воспользоваться масштабируемостью пула потоков, мы должны разбить работу на максимально мелкие блоки, с которыми имеет смысл работать параллельно. Если потоков в пуле немного, то каждый поток будет обрабатывать много блоков, но по мере роста числа потоков, поддерживаемых оборудованием, будет расти и количество блоков, обрабатываемых параллельно.
Но, выбирая «максимально мелкие блоки, с которыми имеет смысл работать параллельно», будьте осторожны. Отправка задачи пулу потоков, выбор ее рабочим потоком из очереди и передача возвращенного значения с помощью std::future<> — всё это операции не бесплатные, и для совсем мелких задач они не окупятся. Если размер задачи слишком мал, то программа, в которой используется пул потоков, может работать медленнее, чем однопоточная.
В предположении, что размер блока выбран разумно, вам не надо заботиться об упаковке задач, получении будущих результатов и хранении объектов std::thread, чтобы впоследствии их можно было присоединить; все это пул берет на себя. Вам остается лишь вызвать функцию submit(), передав ей свою задачу (2).
Пул потоков обеспечивает также безопасность относительно исключений. Любое возбужденное задачей исключение передается через будущий результат, возвращенный submit(), а, если выход из функции происходит в результате исключения, то деструктор пула потоков снимет еще работающие задачи и дождется завершения потоков, входящих в пул.
Эта схема работает для простых случаев, когда задачи независимы. Но не годится, когда одни задачи зависят от других, также переданных пулу.