8.5.3. Параллельная реализация std::partial_sum

Алгоритм std::partial_sum вычисляет частичные суммы по диапазону, то есть каждый элемент заменяется суммой всех элементов от начала диапазона до него самого включительно. Таким образом, последовательность 1, 2, 3, 4, 5 преобразуется в 1, (1+2)=3, (1+2+3)=6, (1+2+3+4)=10, (1+2+3+4+5)=15. С точки зрения распараллеливания этот алгоритм интересен тем, что невозможно разбить диапазон на части и обрабатывать каждый блок независимо. Действительно, значение первого элемента необходимо складывать с каждым из остальных элементов, так что независимой обработки не получается.

Один из возможных подходов — вычислить частичные суммы отдельных блоков, а затем прибавить полученное значение последнего элемента в первом блоке ко всем элементам в следующем блоке и так далее. Например, если исходную последовательность 1, 2, 3, 4, 5, 6, 7, 8, 9 разбить на три равных блока, то после первого прохода получатся блоки {1, 3, 6}, {4, 9, 15}, {7, 15, 24}. Если теперь прибавить 6 (значение последнего элемента в первом блоке) ко всем элементам второго блока, то получится {1, 3, 6}, {10, 15, 21}, {7, 15, 24}. Далее прибавляем последний элемент второго блока (21) ко всем элементам третьего блока и получаем окончательный результат: {1, 3, 6}, {10, 15, 21}, {28, 36, 55}.

Процедуру прибавления частичной суммы предыдущего блока ко всем элементам следующего также можно распараллелить. Если сначала изменить последний элемент блока, то оставшиеся элементы того же блока могут модифицироваться одним потоком, тогда как другой поток одновременно будет модифицировать следующий блок. И так далее. Такое решение хорошо работает, если количество элементов в списке намного превышает количество процессорных ядер, поскольку в этом случае у каждого ядра будет достаточно элементов для обработки на каждом этапе.

Если же процессорных ядер очень много (столько же, сколько элементов в списке, или больше), то описанный подход оказывается не столь эффективен. Если разбить всю работу между процессорами, то на первом шаге каждый процессор будет работать всего с двумя элементами. Но тогда на этапе распространения результатов большинство процессоров будут ждать, и хорошо бы их чем-то запять. В таком случае можно подойти к задаче по-другому. Вместо того чтобы сначала вычислять частичные суммы всех блоков, а затем распространять их от предыдущего к следующему, мы можем распространять суммы по частям. Сначала, как и раньше, вычисляем суммы соседних элементов. На следующем шаге каждое вычисленное значение прибавляется к элементу, отстоящему от него на расстояние 2. Затем новые значения прибавляются к элементам, отстоящим на расстояние 4, и так далее. Если начать с тех же самых девяти элементов, то после первого шага мы получим последовательность 1, 3, 5, 7, 9, 11, 13, 15, 17, в которой правильно вычислены первые два элемента. После второго шага получается последовательность 1, 3, 6, 10, 14, 18, 22, 26, 30, в которой правильны первые четыре элемента. После третьего мы получим последовательность 1, 3, 6, 10, 15, 21, 28, 36, 44, в которой правильны первые восемь элементов. И после четвертого шага получаем окончательный результат 1, 3, 6, 10, 15, 21, 28, 36, 45. Общее количество шагов здесь больше, чем в первом варианте, зато и возможности для распараллеливания на большое число процессоров шире — на каждом шаге каждый процессор может обновлять одно число.

Всего во втором варианте требуется выполнить log2(N) шагов по N операций на каждом шаге (по одной на процессор), где N — число элементов в списке. В первом же варианте каждый поток производит N/k операций для вычисления частичной суммы своего блока и еще N/k операций для распространения сумм (здесь k — число потоков). Следовательно, вычислительная сложность первого варианта в терминах количества операций составляет O(N), а второго — O(N log(N)). Однако, если процессоров столько же, сколько элементов в списке, то во втором варианте требуется произвести только log(N) операций на каждом процессоре, тогда как в первом при большом k операции из-за распространения частичных сумм вперед фактически сериализуются. Таким образом, если количество процессоров невелико, то первый алгоритм завершится быстрее, тогда как в массивно параллельных системах победителем окажется второй алгоритм. Это крайнее проявление феномена, обсуждавшегося в разделе 8.2.1.

Но оставим в стороне эффективность и перейдем к коду. В листинге 8.11 приведена реализация первого подхода.

Листинг 8.11. Параллельное вычисление частичных сумм путём разбиения задачи на части

template<typename Iterator>

void parallel_partial_sum(Iterator first, Iterator last) {

 typedef typename Iterator::value_type value_type;

 struct process_chunk { ← (1)

  void operator()(Iterator begin, Iterator last,

   std::future<value_type>* previous_end_value,

   std::promise<value_type>* end_value) {

   try {

    Iterator end = last;

    ++end;

    std::partial_sum(begin, end, begin); ← (2)

    if (previous_end_value) {            ← (3)

     value_type& addend = previous_end_value->get();← (4)

     *last += addend;              ← (5)

     if (end_value) {

      end_value->set_value(*last); ← (6)

     }

     std::for_each(begin, last, [addend](value_type& item) {← (7)

      item += addend;

     });

    } else if (end_value) {

     end_value->set_value(*last); ← (8)

    }

   } catch (...) { ← (9)

    if (end_value) {

     end_value->set_exception(std::current_exception()); ← (10)

    } else {

     throw; ← (11)

    }

   }

  }

 };

 unsigned long const length = std::distance(first, last);

 if (!length)

  return last;

 unsigned long const min_per_thread = 25; ← (12)

 unsigned long const max_threads =

  (length + min_per_thread - 1) / min_per_thread;

 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();

 unsigned long const num_threads =

  std::min(

   hardware_threads!= 0 ? hardware_threads : 2, max_threads);

 unsigned long const block_size = length / num_threads;

 typedef typename Iterator::value_type value_type;

 std::vector<std::thread> threads(num_threads - 1);← (13)

 std::vector<std::promise<value_type> >

  end_values(num_threads - 1);                     ← (14)

 std::vector<std::future<value_type> >

  previous_end_values;                             ← (15)

 previous_end_values.reserve(num_threads - 1);     ← (16)

 join_threads joiner(threads);

 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_last = block_start;

  std::advance(block_last, block_size – 1); ← (17)

  threads[i] = std::thread(process_chunk(), ← (18)

   block_start, block_last,

   (i !=0 ) ? &previous_end_values[i - 1] : 0, &end_values[i]);

  block_start = block_last;

  ++block_start; ←(19)

  previous_end_values.push_back(end_values[i].get_future());← (20)

 }

 Iterator final_element = block_start;

 std::advance(

  final_element, std::distance(block_start, last) - 1);← (21)

  process_chunk()(block_start, final_element, ← (22)

  (num_threads > 1) ? &previous_end_values.back() : 0, 0);

}

Общая структура этого кода не отличается от рассмотренных ранее алгоритмов: разбиение задачи на блоки с указанием минимального размера блока, обрабатываемого одним потоком (12). В данном случае, помимо вектора потоков (13), мы завели вектор обещаний (14), в котором будут храниться значения последних элементов в каждом блоке, и вектор будущих результатов (15), используемый для получения последнего значения из предыдущего блока. Так как мы знаем, сколько всего понадобится будущих результатов, то можем заранее зарезервировать для них место в векторе (16), чтобы избежать перераспределения памяти при запуске потоков.

Главный цикл выглядит так же, как раньше, только на этот раз нам нужен итератор, который указывает на последний элемент в блоке (17), а не на элемент за последним, чтобы можно было распространить последние элементы поддиапазонов. Собственно обработка производится в объекте-функции process_chunk, который мы рассмотрим ниже; в качестве аргументов передаются итераторы, указывающие на начало и конец блока, а также будущий результат, в котором будет храниться последнее значение из предыдущего диапазона (если таковой существует), и объект-обещание для хранения последнего значения в текущем диапазоне (18).

Запустив поток, мы можем обновить итератор, указывающий на начало блока, не забыв продвинуть его на одну позицию вперед (за последний элемент предыдущего блока) (19), и поместить будущий результат, в котором будет храниться последнее значение в текущем блоке, в вектор будущих результатов, откуда он будет извлечён на следующей итерации цикла (20).

Перед тем как обрабатывать последний блок, мы должны получить итератор, указывающий на последний элемент (21), который передается в process_chunk (22). Алгоритм std::partial_sum не возвращает значения, поэтому по завершении обработки последнего блока больше ничего делать не надо. Можно считать, что операция закончилась, когда все потоки завершили выполнение.

Теперь настало время поближе познакомиться с объектом-функцией process_chunk, который собственно и делает всю содержательную работу (1). Сначала вызывается std::partial_sum для всего блока, включая последний элемент (2), но затем нам нужно узнать, первый это блок или нет (3). Если это не первый блок, то должно быть вычислено значение previous_end_value для предыдущего блока, поэтому нам придется его подождать (4). Чтобы добиться максимального распараллеливания, мы затем сразу же обновляем последний элемент (5), так чтобы это значение можно было передать дальше, в следующий блок (если таковой имеется) (6). Сделав это, мы можем с помощью std::for_each и простой лямбда-функции (7) обновить остальные элементы диапазона.

Если previous_end_value не существует, то это первый блок, поэтому достаточно обновить end_value для следующего блока (опять же, если таковой существует, — может случиться, что блок всего один) (8).

Наконец, если какая-то операция возбуждает исключение, мы перехватываем его (9) и сохраняем в объекте-обещании (10), чтобы оно было распространено в следующий блок, когда он попытается получить последнее значение из предыдущего блока (4). В результате все исключения доходят до последнего блока, где и возбуждаются повторно (11), поскольку в этой точке мы гарантированно работаем в главном потоке.

Из-за необходимости синхронизации между потоками этот код не получится переписать под std::async. Каждая задача ждет результатов, которые становятся доступны по ходу выполнения других задач, поэтому все задачи должны работать параллельно.

Разобравшись с решением, основанным на распространении результатов обработки предыдущих блоков, посмотрим на второй из описанных алгоритмов вычисления частичных сумм.

Реализация прогрессивно-попарного алгоритма вычисления частичных сумм

Второй способ вычисления частичных сумм, основанный на сложении элементов, расположенных на все большем расстоянии друг от друга, работает лучше всего, когда процессоры могут выполнять операции сложения синхронно. В таком случае никакой дополнительной синхронизации не требуется, потому что все промежуточные результаты можно сразу распространить на следующий нуждающийся в них процессор. Но на практике такие системы встречаются редко — разве что в случаях, когда один процессор может выполнять одну и ту же команду над несколькими элементами данных одновременно с помощью так называемых SIMD-команд (Single-Instruction/Multiple-Data — одиночный поток команд, множественный поток данных). Поэтому мы должны проектировать программу в расчете на общий случай и производить явную синхронизацию на каждом шаге.

Сделать это можно, например, с помощью барьера — механизма синхронизации, который заставляет потоки ждать, пока указанное количество потоков достигнет барьера. Когда все потоки подошли к барьеру, они разблокируются и могут продолжать выполнение. В библиотеке C++11 Thread Library готовая реализация барьера отсутствует, так что нам придется написать свою собственную.

Представьте себе американские горки в парке аттракционов. Если желающих покататься достаточно, то смотритель не запустит состав, пока не будут заняты все места. Барьер работает точно так же: вы заранее задаете число «мест», и потоки будут ждать, пока все «места» заполнятся. Когда ожидающих потоков собирается достаточно, они все получают возможность продолжить выполнение; барьер при этом сбрасывается и начинает ждать следующую партию потоков. Часто такая конструкция встречается в цикле, где на каждой итерации у барьера собираются одни и те же потоки. Идея в том, чтобы все потоки «шли в ногу» — никто не должен забегать вперед. В рассматриваемом алгоритме такой «поток-торопыга» недопустим, потому что он мог бы модифицировать данные, которые еще используются другими потоками, или, наоборот, сам попытался бы использовать еще не модифицированные должным образом данные.

В следующем листинге приведена простая реализация барьера.

Листинг 8.12. Простой класс барьера

class barrier {

 unsigned const count;

 std::atomic<unsigned> spaces;

 std::atomic<unsigned> generation;

public:

 explicit barrier (unsigned count_) : ← (1)

 count(count_), spaces(count), generation(0) {}

 void wait() {

  unsigned const my_generation = generation; ← (2)

  if (!--spaces) {← (3)

   spaces = count;← (4)

   ++generation;  ← (5)

  } else {

   while (generation == my_generation) ← (6)

   std::this_thread::yield();          ← (7)

  }

 }

};

Здесь при конструировании барьера мы указываем количество «мест» (1), которое сохраняется в переменной count. В начале количество мест у барьера spaces равно count. Когда очередной поток выполняет функцию wait(), значение spaces уменьшается на единицу (3). Как только оно обращается в нуль, количество мест возвращается к исходному значению count (4), а переменная generation увеличивается, что служит для других потоков сигналом к продолжению работы (5). Если число свободных мест больше нуля, то поток должен ждать. В этой реализации используется простой спинлок (6), который сравнивает текущее значение generation с тем, что было запомнено в начале wait() (2). Поскольку generation изменяется только после того, как все потоки подошли к барьеру (5), мы можем во время ожидания уступить процессор с помощью yield() (7), чтобы ожидающий поток не потреблял процессорное время.

Эта реализация действительно очень простая — в ней для ожидания используется спинлок, поэтому она не идеальна для случая, когда потоки могут ждать долго, и совсем не годится в ситуации, когда в каждый момент времени может существовать более count потоков, выполнивших wait(). Если требуется, чтобы и в этих случаях барьер работал правильно, то следует использовать более надежную (но и более сложную) реализацию. Кроме того, я ограничился последовательно согласованными операциями над атомарными переменными, потому что они проще для понимания, но вы, возможно, захотите ослабить ограничения на порядок доступа к памяти. Такая глобальная синхронизация дорого обходится в массивно параллельных архитектурах, так как строку кэша, содержащую состояние барьера, приходится передавать всем участвующим процессорам (см. обсуждение перебрасывания кэша в разделе 8.2.2). Поэтому нужно тщательно продумать, является ли это решение оптимальным.

Как бы то ни было, барьер — это именно то, что нам необходимо в данном случае; имеется фиксированное число потоков, которые должны синхронизироваться на каждой итерации цикла. Ну почти фиксированное. Как вы, наверное, помните, элементы, расположенные близко к началу списка, получают окончательные значения всего через пару шагов. Это означает, что либо все потоки должны крутиться в цикле, пока не будет обработан весь диапазон, либо барьер должен поддерживать выпадение потоков, уменьшая значение count. Я предпочитаю второй вариант, чтобы не заставлять потоки выполнять бессмысленную работу в ожидании завершения последнего шага.

Но тогда нужно сделать count атомарной переменной, чтобы ее можно было изменять из нескольких потоков без внешней синхронизации:

std::atomic<unsigned> count;

Инициализация не меняется, но при переустановке spaces теперь нужно явно загружать spaces с помощью операции load():

spaces = count.load();

Больше никаких изменений в wait() вносить не надо, но необходима новая функция-член для уменьшения count. Назовем ее done_waiting(), потому что с ее помощью поток заявляет, что больше ждать не будет.

void done_waiting() {

 --count;               ← (1)

 if (!--spaces) {       ← (2)

  spaces = count.load();← (3)

  ++generation;

 }

}

Прежде всего мы уменьшаем count (1), чтобы при следующей переустановке spaces было отражено новое, меньшее прежнего, количество ожидающих потоков. Затем уменьшаем количество свободных мест spaces (2). Если этого не сделать, то остальные потоки будут ждать вечно, так как при инициализации spaces было задано старое, большее, значение. Если текущий поток является последним в партии, то количество мест нужно переустановить и увеличить generation на единицу (3) — так же, как в wait(). Основное отличие от wait() заключается в том, что поток не должен ждать — он же сам объявляет, что больше ждать не будет!

Теперь мы готовы написать вторую реализацию алгоритма вычисления частичных сумм. На каждом шаге каждый поток вызывает функцию wait() барьера, чтобы все потоки пересекли его вместе, а, закончив работу, поток вызывает функцию done_waiting(), чтобы уменьшить счетчик. Если наряду с исходным диапазоном использовать второй буфер, то барьер обеспечивает необходимую синхронизацию. На каждом шаге потоки либо читают исходный диапазон и записывают новое значение в буфер, либо наоборот — читают буфер и записывают новое значение в исходный диапазон. На следующем шаге исходный диапазон и буфер меняются местами. Тем самым мы гарантируем отсутствие гонки между операциями чтения и записи в разных потоках. Перед выходом из цикла каждый поток должен позаботиться о записи окончательного значения в исходный диапазон. В листинге 8.13 все это собрано воедино.

Листинг 8.13. Параллельная реализация partial_sum методом попарных обновлений

struct barrier {

 std::atomic<unsigned> count;

 std::atomic<unsigned> spaces;

 std::atomic<unsigned> generation;

 barrier(unsigned count_) :

  count(count_), spaces(count_), generation(0) {}

 void wait() {

  unsigned const gen = generation.load();

  if (!--spaces) {

   spaces = count.load();

   ++generation;

  } else {

   while (generation.load() == gen) {

    std::this_thread::yield();

   }

  }

 }

 void done_waiting() {

  --count;

  if (!--spaces) {

   spaces = count.load();

   ++generation;

  }

 }

};

template<typename Iterator>

void parallel_partial_sum(Iterator first, Iterator last) {

 typedef typename Iterator::value_type value_type;

 struct process_element { ← (1)

  void operator()(Iterator first, Iterator last,

   std::vector<value_type>& buffer,

   unsigned i, barrier& b) {

   value_type& ith_element = *(first + i);

   bool update_source = false;

   for (unsigned step = 0, stride = 1;

        stride <= i; ++step, stride *= 2) {

    value_type const& source = (step % 2) ? ← (2)

     buffer[i] : ith_element;

    value_type& dest = (step % 2) ?

     ith_element:buffer[i];

    value_type const& addend = (step % 2) ? ← (3)

     buffer[i - stride] : *(first + i – stride);

    dest = source + addend; ← (4)

    update_source = !(step % 2);

    b.wait(); ← (5)

   }

   if (update_source) { ← (6)

    ith_element = buffer[i];

   }

   b.done_waiting(); ← (7)

  }

 };

 unsigned long const length = std::distance(first, last);

 if (length <= 1)

  return;

 std::vector<value_type> buffer(length);

 barrier b(length);

 std::vector<std::thread> threads(length - 1); ← (8)

 join_threads joiner(threads);

 Iterator block_start = first;

 for (unsigned long i = 0; i < (length - 1); ++i) {

  threads[i] = std::thread(process_element(), first, last,← (9)

   std::ref(buffer), i, std::ref(b));

 }

 process_element()(first, last, buffer, length - 1, b);   ← (10)

}

Общая структура кода вам, наверное, уже понятна. Имеется класс с оператором вызова (process_element), который выполняет содержательную работу (1) и вызывается из нескольких потоков (9), хранящихся в векторе (8), а также из главного потока (10). Важное отличие заключается в том, что теперь число потоков зависит от числа элементов в списке, а не от результата, возвращаемого функцией std::thread::hardware_concurrency. Я уже говорил, что эта идея не слишком удачна, если только вы не работаете на машине с массивно параллельной архитектурой, где потоки обходятся дешево. Но это неотъемлемая часть самой идеи решения. Можно было бы обойтись и меньшим числом потоков, поручив каждому обработку нескольких значений из исходного диапазона, но тогда при относительно небольшом количестве потоков этот алгоритм оказался бы менее эффективен, чем алгоритм с прямым распространением.

Так или иначе, основная работа выполняется в операторе вызове из класса process_element. На каждом шаге берется либо i-ый элемент из исходного диапазона, либо i-ый элемент из буфера (2) и складывается с предшествующим элементом, отстоящим от него на расстояние stride (3); результат сохраняется в буфере, если мы читали из исходного диапазона, и в исходном диапазоне — если мы читали из буфера (4). Перед тем как переходить к следующему шагу, мы ждем у барьера (5). Работа заканчивается, когда элемент, отстоящий на расстояние stride, оказывается слева от начала диапазона. В этом случае мы должно обновить элемент в исходном диапазоне, если сохраняли окончательный результат в буфере (6). Наконец, мы вызываем функцию done_waiting() (7), сообщая барьеру, что больше ждать не будем.

Отметим, что это решение не безопасно относительно исключений. Если один из рабочих потоков возбудит исключение в process_element, то приложение аварийно завершится. Решить эту проблему можно было бы, воспользовавшись std::promise для запоминания исключения, как в реализации parallel_find из листинга 8.9, или просто с помощью объекта std::exception_ptr, защищенного мьютексом.

Вот и подошли к концу обещанные три примера. Надеюсь, они помогли уложить в сознании соображения, высказанные в разделе 8.1, 8.2, 8.3 и 8.4, и показали, как описанные приемы воплощаются в реальном коде.