8.5.1. Параллельная реализация std::for_each

Идея std::for_each проста — этот алгоритм вызывает предоставленную пользователем функцию для каждого элемента диапазона. Различие между параллельной и последовательной реализацией std::for_each заключается, прежде всего, в порядке вызовов функции. Стандартная версия std::for_each вызывает функцию сначала для первого элемента диапазона, затем для второго и так далее, тогда как параллельная версия не дает гарантий относительно порядка обработки элементов, они даже могут (и хочется надеяться, будут) обрабатываться параллельно.

Для реализации параллельной версии нужно всего лишь разбить диапазон на участки, которые будут обрабатываться каждым потоком. Количество элементов известно заранее, поэтому такое разбиение можно произвести до начала работы (см. раздел 8.1.1). Мы будем предполагать, что это единственная исполняемая параллельная задача, поэтому вычислить количество требуемых потоков можно с помощью функции std::thread::hardware_concurrency(). Мы также знаем, что элементы можно обрабатывать абсолютно независимо, поэтому для предотвращения ложного разделения (см. раздел 8.2.3) имеет смысл использовать соседние блоки.

По своей структуре этот алгоритм похож на параллельную версию std::accumulate, описанную в разделе 8.4.1, только вместо вычисления суммы элементов он применяет к ним заданную функцию. На первый взгляд, это должно бы существенно упростить код, потому что не нужно возвращать никакой результат. Но если мы собираемся передавать исключения вызывающей программе, то все равно придется воспользоваться механизмами std::packaged_task и std::future, чтобы передавать исключения из одного потока в другой. Ниже приведен пример реализации.

Листинг 8.7. Параллельная реализация std::for_each

template<typename Iterator, typename Func>

void parallel_for_each(Iterator first, Iterator last, Func f) {

 unsigned long const length = std::distance(first, last);

 if (!length)

  return;

 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length + min_per_thread - 1) / min_per_thread;

 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();

 unsigned long const num_threads =

  std::min(

   hardware_threads != 0 ? hardware_threads : 2, max_threads);

 unsigned long const block_size = length / num_threads;

 std::vector<std::future<void> > futures(num_threads - 1); ← (1)

 std::vector<std::thread> threads(num_threads – 1);

 join_threads joiner(threads);

 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  std::packaged_task<void(void)> task( ← (2)

   [=]() {

    std::for_each(block_start, block_end, f);

   });

  futures[i] = task.get_future();

  threads[i] = std::thread(std::move(task)); ← (3)

  block_start = block_end;

 }

 std::for_each(block_start, last, f);

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  futures[i].get(); ← (4)

 }

}

Структурно код ничем не отличается от приведенного в листинге 8.4, что и неудивительно. Основное различие состоит в том, что в векторе futures хранятся объекты std::future<void> (1), потому что рабочие потоки не возвращают значение, а в качестве задачи мы используем простую лямбда-функцию, которая вызывает функцию f для элементов из диапазона от block_start до block_end (2). Это позволяет не передавать конструктору потока (3) диапазон. Поскольку рабочие потоки ничего не возвращают, обращения к futures[i].get() (4) служат только для получения исключений, возникших в рабочих потоках; если мы не хотим передавать исключения, то эти обращения можно вообще опустить.

Реализацию parallel_for_each можно упростить, воспользовавшись std::async, — точно так же, как мы делали при распараллеливании std::accumulate.

Листинг 8.8. Параллельная реализация std::for_each с применением std::async

template<typename Iterator, typename Func>

void parallel_for_each(Iterator first, Iterator last, Func f) {

 unsigned long const length = std::distance(first, last);

 if (!length)

  return;

 unsigned long const min_per_thread = 25;

 if (length < (2 * min_per_thread)) {

  std::for_each(first, last, f); ← (1)

 } else {

  Iterator const mid_point = first + length / 2;

  std::future<void> first_half = ← (2)

   std::async(&parallel_for_each<Iterator, Func>,

   first, mid_point, f);

  parallel_for_each(mid_point, last, f); ← (3)

  first_half.get();                      ← (4)

 }

}

Как и в случае реализации parallel_accumulate с помощью std::async в листинге 8.5, мы разбиваем данные рекурсивно в процессе выполнения, а не заранее, потому что не знаем, сколько потоков задействует библиотека. На каждом шаге данные делятся пополам, пока их не останется слишком мало для дальнейшего деления. При этом одна половина обрабатывается асинхронно (2), а вторая — непосредственно (3). Когда дальнейшее деление становится нецелесообразным, вызывается std::for_each (1). И снова использование std::async и функции-члена get() объекта std::future (4) обеспечивает семантику распространения исключения.

Теперь перейдем от алгоритмов, которые выполняют одну и ту же операцию над каждым элементом (к их числу относятся также std::count и std::replace), к чуть более сложному случаю — std::find.