8.4.1. Безопасность относительно исключений в параллельных алгоритмах

Безопасность относительно исключений — необходимая составная часть любой приличной программы на С++, и параллельные программы — не исключение. На самом деле, при разработке параллельных алгоритмов часто требуется уделять исключениям даже больше внимания. Если какая-то операция в последовательном алгоритме возбуждает исключение, то алгоритм должен лишь позаботиться о предотвращении утечек памяти и нарушения собственных инвариантов, а потом может передать исключение вызывающей программе для обработки. В параллельных же алгоритмах многие операции выполняются в разных потоках. В этом случае исключение невозможно распространить вверх по стеку вызовов, потому что у каждого потока свой стек. Если выход из функции потока производится в результате исключения, то приложение завершается.

В качестве конкретного примера рассмотрим еще раз функцию parallel_accumulate из листинга 2.8, который воспроизведен ниже.

Листинг 8.2. Наивная параллельная организация std::accumulate (из листинга 2.8)

template<typename Iterator, typename T>

struct accumulate_block {

 void operator()(Iterator first, Iterator last, T& result) {

  result = std::accumulate(first, last, result); ← (1)

 }

};

template<typename Iterator, typename T>

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);← (2)

 if (!length)

  return init;

 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length + min_per_thread - 1) / min_per_thread;

 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();

 unsigned long const num_threads =

  std::min(

   hardware_threads != 0 ? hardware_threads : 2, max_threads);

 unsigned long const block_size = length / num_threads;

 std: :vector<T> results (num_threads); ← (3)

 std::vector<std::thread> threads(num_threads - 1); ← (4)

 Iterator block_start = first; ← (5)

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start; ← (6)

  std::advance(block_end, block_size);

  threads[i] = std::thread( ← (7)

   accumulate_block<Iterator, T>(),

   block_start, block_end, std::ref(results[i]));

  block_start = block_end; ← (8)

 }

 accumulate_block()(

  block_start, last, results[num_threads - 1]);← (9)

 std::for_each(threads.begin(), threads.end(),

 std::mem_fn(&std::thread::join));

 return

  std::accumulate(results.begin(), results.end(), init); ← (10)

}

Посмотрим, где могут возникнуть исключения. Вообще говоря, это вызовы библиотечных функций, которые могут возбуждать исключения, а также операции, определенные в пользовательском типе.

Итак, начнем. В точке (2) мы обращаемся к функции distance, которая выполняет операции над пользовательским типом итератора. Поскольку мы еще не начали работу, и обращение к этой функции произведено из вызывающего потока, то тут всё нормально. Далее мы выделяем память для векторов results (3) и threads (4). И эти обращения произведены из вызывающего потока до начала работы и до создания новых потоков, так что и здесь всё хорошо. Разумеется, если конструктор threads возбудит исключение, то нужно будет освободить память, выделенную для results, но об этом позаботится деструктор.

С инициализацией объекта block_start (5) всё чисто по тем же причинам, так что перейдём к операциям в цикле запуска потоков (6), (7), (8). Если после создания первого же потока (7) возникнет исключение, и мы его не перехватим, появится проблема; деструкторы объектов std::thread вызывают std::terminate, что приводит к аварийному завершению программы. Нехорошо.

Обращение к accumulate_block в точке (9) может возбуждать исключения — с точно такими же последствиями: объекты потоков будут уничтожены, а их деструкторы вызовут std::terminate. С другой стороны, исключение, возбуждаемое в последнем обращении к std::accumulate (10), не так опасно, потому что к этому моменту все потоки уже присоединились к вызывающему.

Таким образом, обращения к accumulate_block из новых потоков могут возбуждать исключения в точке (1). Так как блоки catch отсутствуют, то исключение останется необработанным и приведёт к вызову std::terminate() и завершению программы.

Если это еще не очевидно, скажем прямо: этот код не безопасен относительно исключений.

Делаем код безопасным относительно исключений

Итак, мы выявили все возможные точки возбуждения исключений и поняли, к каким печальным последствиям это приведёт. Что с этим можно сделать? Начнем с вопроса об исключениях, возбуждаемых в созданных нами потоках.

В главе 4 мы уже познакомились с подходящим для решения проблемы средством. Для чего нам вообще нужны новые потоки? Для того чтобы вычислить результат и при этом учесть возможность возникновения исключений. Но это именно то, для чего предназначено сочетание std::packaged_task и std::future. В листинге 8.3 показан код, переписанный с использованием std::packaged_task.

Листинг 8.3. Параллельная версия std::accumulate с применением std::packaged_task

template<typename Iterator, typename T>

struct accumulate_block {

 T operator()(Iterator first, Iterator last) {← (1)

  return std::accumulate(first, last, T());   ← (2)

 }

};

template<typename Iterator, typename T>

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);

 if (!length)

  return init;

 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length + min_per_thread — 1) / min_per_thread;

 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();

 unsigned long const num_threads =

  std::min(

   hardware_threads i = 0 ? hardware_threads : 2, max_threads);

 unsigned long const block_size = length / num_threads;

 std::vector<std::future<T> > futures(num_threads-1);← (3)

 std::vector<std::thread> threads(num_threads — 1);

 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  std::packaged_task<T(Iterator, Iterator)> task( ← (4)

   accumulate_block<Iterator, T>());

  futures[i] = task.get_future(); ← (5)

  threads[i] =

   std::thread(std::move(task), block_start, block_end);← (6)

  block_start = block_end;

 }

 T last_result = accumulate_block()(block_start, last); ← (7)

 std::for_each(threads.begin(), threads.end(),

  std::mem_fn(&std::thread::join));

 T result = init; ← (8)

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  result += futures[i].get(); ← (9)

 }

 result += last_result; ← (10)

 return result;

}

Первое изменение заключается в том, что оператор вызова в accumulate_block теперь возвращает результат по значению, а не принимает ссылку на место, где его требуется сохранить (1). Для обеспечения безопасности относительно исключений мы используем std::packaged_task и std::future, поэтому можем воспользоваться этим и для передачи результата. Правда, для этого требуется при вызове std::accumulate (2) явно передавать сконструированный по умолчанию экземпляр T, а не использовать повторно предоставленное значение result, но это не слишком существенное изменение.

Далее, вместо того заводить вектор результатов, мы создаем вектор futures (3), в котором храним объекты std::future<T> для каждого запущенного потока. В цикле запуска потоков мы сначала создаем задачу для accumulate_block (4). В классе std::packaged_task<T(Iterator, Iterator)> объявлена задача, которая принимает два объекта Iterator и возвращает T, а это именно то, что делает наша функция. Затем мы получаем будущий результат для этой задачи (5) и исполняем ее в новом потоке, передавая начало и конец обрабатываемого блока (6). Результат работы задачи, равно как и исключение, если оно возникнет, запоминается в объекте future.

Поскольку используются будущие результаты, массива results больше нет, поэтому мы должны сохранить результат обработки последнего блока в переменной (7), а не в элементе массива. Кроме того, поскольку мы получаем значения из будущих результатов, проще не вызывать std::accumulate, а написать простой цикл for, в котором к переданному начальному значению (8) будут прибавляться значения, полученные из каждого будущего результата (9). Если какая-то задача возбудит исключение, то оно будет запомнено в будущем результате и повторно возбуждено при обращении к get(). Наконец, перед тем как возвращать окончательный результат вызывающей программе, мы прибавляем результат обработки последнего блока (10).

Таким образом, мы устранили одну из потенциальных проблем: исключения, возбужденные в рабочих потоках, повторно возбуждаются в главном. Если исключения возникнут в нескольких рабочих потоках, то вверх распространится только одно, но это не очень страшно. Если вы считаете, что это все-таки важно, то можете воспользоваться чем-то вроде класса std::nested_exception, чтобы собрать все такие исключения и передать их главному потоку.

Осталось решить проблему утечки потоков в случае, когда исключение возникает между моментом запуска первого потока и присоединением всех запущенных. Для этого проще всего перехватить любое исключение, дождаться присоединения потоков, которые все еще находятся в состоянии joinable(), а потом возбудить исключение повторно:

try {

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  // ... как и раньше

 }

 T last_result = accumulate_block()(block_start, last);

 std::for_each(threads.begin(), threads.end(),

 std::mem_fn(&std::thread::join));

} catch (...) {

 for (unsigned long i = 0; i < (num_thread - 1); ++i) {

 if (threads[i].joinable())

  thread[i].join();

 }

 throw;

}

Теперь все работает. Все потоки будут присоединены вне зависимости от того, как завершилась обработка блока. Однако блоки try-catch выглядят некрасиво, и часть кода дублируется. Мы присоединяем потоки как в «нормальной» ветке, так и в блоке catch. Дублирование кода — вещь почти всегда нежелательная, потому что изменения придётся вносить в несколько мест. Давайте лучше перенесём этот код в деструктор — ведь именно такова идиома очистки ресурсов в С++. Вот как выглядит этот класс:

class join_threads {

 std::vector<std::thread>& threads;

public:

 explicit join_threads(std::vector<std::thread>& threads_):

  threads(threads_) {}

 ~join_threads() {

  for (unsigned long i = 0; i < threads.size(); ++i) {

   if (threads[i].joinable())

    threads[i].join();

  }

 }

};

Это похоже на класс thread_guard из листинга 2.3, только на этот раз мы «охраняем» целый вектор потоков. Теперь наш код упрощается.

Листинг 8.4. Безопасная относительно исключений версия std::accumulate

template<typename Iterator, typename T>

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);

 if (!length)

  return init;

 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length + min_per_thread - 1) / min_per_thread;

 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();

 unsigned long const num_threads =

  std::min(

   hardware_threads i = 0 ? hardware_threads : 2, max_threads);

 unsigned long const block_size = length / num_threads;

 std::vector<std::future<T> > futures(num_threads — 1);

 std::vector<std::thread> threads(num_threads - 1);

 join_threads joiner(threads); ← (1)

 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  std::packaged_task<T(Iterator, Iterator)> task(

   accumulate_block<Iterator, T>());

  futures[i] = task.get_future();

  threads[i] =

   std::thread(std:move(task), block_start, block_end);

  block_start = block_end;

 }

 T last_result = accumulate_block()(block_start, last);

 T result = init;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  result + = futures[i].get(); ← (2)

 }

 result += last_result;

 return result;

}

Создав контейнер потоков, мы создаем объект написанного выше класса (1), который присоединяет все завершившиеся потоки. Теперь явный цикл присоединения можно удалить, точно зная, что при выходе из функции потоки будут присоединены. Отметим, что вызовы futures[i].get() (2) приостанавливают выполнение программы до готовности результатов, поэтому в этой точке явно присоединять потоки не нужно. Этим данный код отличается от первоначальной версии в листинге 8.2, где присоединять потоки было необходимо для нрав ильного заполнения вектора results. Мало того что мы получили безопасный относительно исключений код, так еще и функция стала короче, потому что код присоединения вынесен в новый класс (который, кстати, можно использовать и в других программах).

Обеспечение безопасности относительно исключений при работе с std::async()

Теперь, когда мы знаем, что необходимо для обеспечения безопасности относительно исключений в программе, которая явно управляет потоками, посмотрим, как добиться того же результата при использовании std::async(). Мы уже видели, что в этом случае управление потоками берет на себя библиотека, а запущенный ей поток завершается, когда будущий результат готов. В плане безопасности относительно исключений важно то, что если уничтожить будущий результат, не дождавшись его, то деструктор будет ждать завершения потока. Тем самым мы элегантно решаем проблему утечки потоков, которые еще исполняются и хранят ссылки на данные. В следующем листинге показана безопасная относительно исключений реализация с применением std::async().

Листинг 8.5. Безопасная относительно исключений версия std::accumulate с применением std::async

template<typename Iterator, typename T>

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);← (1)

 unsigned long const max_chunk_size = 25;

 if (length <= max_chunk_size) {

  return std::accumulate(first, last, init); ← (2)

 } else {

  Iterator mid_point = first;

  std::advance(mid_point, length / 2); ← (3)

  std::future<T> first_half_result =

   std::async(parallel_accumulate<Iterator, T>, ← (4)

   first, mid_point, init);

  T second_half_result =

   parallel_accumulate(mid_point, last, T());         ← (5)

  return first_half_result.get() + second_half_result;← (6)

 }

}

В этой версии мы распределяем данные рекурсивно, а не вычисляем распределение по блокам заранее, но в целом код намного проще предыдущей версии и при этом безопасен относительно исключений. Как и раньше, сначала мы вычисляем длину последовательности (1), и, если она меньше максимального размера блока, то вызываем std::accumulate напрямую (2). В противном случае находим среднюю точку последовательности (3) и запускаем асинхронную задачу, которая будет обрабатывать левую половину (4). Для обработки правой половины мы вызываем себя рекурсивно (5), а затем складываем результаты обработки обеих половин (6). Библиотека гарантирует, что std::async задействует имеющийся аппаратный параллелизм, не создавая слишком большого количества потоков. Некоторые «асинхронные» вызовы на самом деле будут исполняться синхронно при обращении к get() (6).

Изящество этого решения не только в том, что задействуется аппаратный параллелизм, но и в том, что безопасность относительно исключений обеспечивается тривиальным образом. Если рекурсивный вызов (5) возбудит исключение, то будущий результат, созданный при обращении к std::async (4), будет уничтожен при распространении исключения вверх по стеку. Это в свою очередь приведёт к ожиданию завершения асинхронного потока, так что «висячий поток» не образуется. С другой стороны, если исключение возбуждает асинхронный вызов, то оно будет запомнено в будущем результате и повторно возбуждено при обращении к get() (6).

Какие еще соображения следует принимать во внимание при проектировании параллельного кода? Давайте поговорим о масштабируемости. Насколько увеличится производительность программы, если запустить ее на машине с большим количеством процессоров?