8.5.2. Параллельная реализация std::find

Далее будет полезно рассмотреть алгоритм std::find, потому что это один из нескольких алгоритмов, которые могут завершаться еще до того, как обработаны все элементы. Например, если уже первый элемент в диапазоне отвечает условию поиска, то рассматривать остальные не имеет смысла. Как мы скоро увидим, это свойство существенно для повышения производительности, и от него напрямую зависит структура параллельной реализации. На этом примере мы продемонстрируем, как порядок доступа к данным может оказать влияние на проектирование программы (см. раздел 8.3.2). К той же категории относятся алгоритмы std::equal и std::any_of.

Если вы вместе с женой или другом ищете какую-нибудь старую фотографию в сваленных на чердаке альбомах, то вряд ли захотите, чтобы они продолжали перелистывать страницы, когда вы уже нашли то, что нужно. Наверное, вы просто сообщите им, что искомое найдено (быть может, крикнув «Есть!»), чтобы они могли прекратить поиски и заняться чем-нибудь другим. Но многие алгоритмы по природе своей должны обработать каждый элемент и, стало быть, не имеют эквивалента восклицанию «Есть!». Для алгоритмов типа std::find умение «досрочно» прекращать работу — важное свойство, которое нельзя игнорировать. И, следовательно, его нужно учитывать при проектировании кода — закладывать возможность прерывания других задач, когда ответ уже известен, чтобы программа не ждала, пока прочие рабочие потоки обработают оставшиеся элементы.

Без прерывания других потоков последовательная версия может работать быстрее параллельной, потому что прекратит поиск, как только будет найден нужный элемент. Если, например, система поддерживает четыре параллельных потока, то каждый из них должен будет просмотреть четверть полного диапазона, поэтому при наивном распараллеливании каждый поток потратит на просмотр своих элементов четверть всего времени. Если искомый элемент окажется в первой четверти диапазона, то последовательный алгоритм завершится раньше, так как не должен будет просматривать оставшиеся элементы.

Один из способов прервать другие потоки — воспользоваться атомарной переменной в качестве флага и проверять его после обработки каждого элемента. Если флаг поднят, то один из потоков уже нашел нужный элемент, поэтому можно прекращать обработку. При таком способе прерывания мы не обязаны обрабатывать все элементы, и, значит, параллельная версия чаще будет обгонять последовательную. Недостаток состоит в том, что загрузка атомарной переменной — медленная операция, которая тормозит работу каждого потока.

Мы уже знаем о двух способах возврата значений и распространения исключений. Можно использовать массив будущих результатов и объекты std::packaged_task для передачи значений и исключений, после чего обработать частичные результаты в главном потоке. Или с помощью std::promise устанавливать окончательный результат прямо в рабочем потоке. Все зависит от того, как мы хотим обрабатывать исключения, возникающие в рабочих потоках. Если требуется остановиться при первом возникновении исключения (несмотря на то, что обработаны не все элементы), то можно использовать std::promise для передачи значения и исключения. С другой стороны, если мы хотим, чтобы рабочие потоки продолжали поиск, то используем std::packaged_task, сохраняем все исключения, а затем повторно возбуждаем одно из них, если искомый элемент не найден.

В данном случае я остановился на std::promise, потому что такое поведение больше походит на поведение std::find. Надо только не забыть о случае, когда искомого элемента в указанном диапазоне нет. Поэтому необходимо дождаться завершения всех потоков перед тем, как получать значение из будущего результата. Если просто блокировать исполнение при обращении к get(), то при условии, что искомого элемента нет, мы будем ждать вечно. Получившийся код приведён ниже.

Листинг 8.9. Параллельная реализация алгоритма find()

template<typename Iterator, typename MatchType>

Iterator parallel_find(Iterator first, Iterator last,

 MatchType match) {

 struct find_element { ← (1)

  void operator()(Iterator begin, Iterator end,

   MatchType match,

   std::promise<Iterator>* result,

   std::atomic<bool>* done_flag) {

   try {

    for(; (begin != end) && !done_flag->load(); ++begin) {← (2)

     if (*begin == match) {

      result->set_value(begin);← (3)

      done_flag->store(true);  ← (4)

      return;

     }

    }

   } catch (...) { ← (5)

    try {

     result->set_exception(std::current_exception());← (6)

     done_flag->store(true);

    } catch (...) ← (7)

    {}

   }

  }

 };

 unsigned long const length = std::distance(first, last);

 if (!length)

  return last;

 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length + min_per_thread — 1) / min_per_thread;

 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();

 unsigned long const num_threads =

  std::min(

   hardware_threads != 0 ? hardware_threads : 2, max_threads);

 unsigned long const block_size = length / num_threads;

 std::promise<Iterator> result;     ← (8)

 std::atomic<bool> done_flag(false);← (9)

 std::vector<std::thread> threads(num_threads — 1); {← (10)

  join_threads joiner(threads);

 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  threads[i] = std::thread(find_element(), ← (11)

   block_start, block_end, match,

   &result, &done_flag);

  block_start = block_end;

 }

 find_element()(

  block_start, last, match, &result, &done_flag);← (12)

 if (!done_flag.load()) { ← (13)

  return last;

 }

 return result.get_future().get() (14)

}

В основе своей код в листинге 8.9 похож на предыдущие примеры. На этот раз вся работа производится в операторе вызова, определенном в локальном классе find_element (1). Здесь мы в цикле обходим элементы из назначенного потоку блока, проверяя флаг на каждой итерации (2). Если искомый элемент найден, то мы записываем окончательный результат в объект-обещание (3) и перед возвратом устанавливаем флаг done_flag (4).

Если было возбуждено исключение, то его перехватит универсальный обработчик (5) и попытается сохранить исключение в обещании (6) перед установкой done_flag. Но установка значения объекта-обещания может возбудить исключение, если значение уже установлено, поэтому мы перехватываем и игнорируем любые возникающие здесь исключения (7).

Это означает, что если поток, вызвавший find_element, найдет искомый элемент или возбудит исключение, то все остальные потоки увидят поднятый флаг done_flag и прекратят работу. Если несколько потоков одновременно найдут искомое или возбудят исключение, то возникнет гонка за установку результата в обещании. Но это безобидная гонка: победивший поток считается «первым», и установленный им результат приемлем.

В самой функции parallel_find мы определяем обещание (8) и флаг прекращения поиска (9), которые передаем новым потокам вместе с диапазоном для просмотра (11). Кроме того, главный поток пользуется классом find_element для поиска среди оставшихся элементов (12). Как уже отмечалось, мы должны дождаться завершения всех потоков перед тем, как проверять результат, потому что искомого элемента может вообще не оказаться. Для этого мы заключаем код запуска и присоединения потоков в блок (10), так что к моменту проверки флага (13) все потоки гарантировано присоединены. Если элемент был найден, то, обратившись к функции get() объекта std::future<Iterator>, мы либо получим результат из обещания, либо возбудим сохраненное исключение.

Как и раньше, в этой реализации предполагается, что мы собираемся использовать все доступные аппаратные потоки или располагаем каким-то механизмом, который позволит заранее определить количество потоков для предварительного разделения между ними работы. И снова мы можем упростить код, воспользовавшись функцией std::async и рекурсивным разбиением данных, если готовы принять автоматический механизм масштабирования, скрытый в стандартной библиотеке С++. Реализация parallel_find с применением std::async приведена в листинге ниже.

Листинг 8.10. Параллельная реализация алгоритма find() с применением std::async

template<typename Iterator, typename MatchType> ← (1)

Iterator parallel_find_impl(Iterator first, Iterator last,

 MatchType match,

 std::atomic<bool>& done) {

 try {

  unsigned long const length = std::distance(first, last);

  unsigned long const min_per_thread = 25;          ← (2)

  if (length < (2 * min_per_thread)) {              ← (3)

   for(; (first != last) && !done.load(); ++first) {← (4)

    if (*first == match) {

     done = true;                                   ← (5)

     return first;

    }

   }

   return last; ← (6)

  } else {

   Iterator const mid_point = first + (length / 2);    ← (7)

   std::future<Iterator> async_result =

    std::async(&parallel_find_impl<Iterator, MatchType>,← (8)

   mid_point, last, match, std::ref(done));

   Iterator const direct_result =

    parallel_find_impl(first, mid_point, match, done);  ← (9)

   return (direct_result == mid_point) ?

    async_result.get() : direct_result; ← (10)

  }

 } catch (...) {

  done = true; ← (11)

  throw;

 }

}

template<typename Iterator, typename MatchType>

Iterator parallel_find(

 Iterator first, Iterator last, MatchType match) {

 std::atomic<bool> done(false);

 return parallel_find_impl(first, last, match, done); ← (12)

}

Желание закончить поиск досрочно при обнаружении совпадения заставило нас ввести флаг, разделяемый между всеми потоками. Этот флаг, следовательно, нужно передавать во все рекурсивные вызовы. Проще всего сделать это, делегировав работу отдельной функции (1), которая принимает дополнительный параметр — ссылку на флаг done, передаваемый из главной точки входа (12).

Основная же ветвь кода не таит никаких неожиданностей. Как и во многих предыдущих реализациях, мы задаем минимальное количество элементов, обрабатываемых в одном потоке (2); если размер обеих половин диапазона меньше этой величины, то весь диапазон обрабатывается в текущем потоке (3). Собственно алгоритм сводится к простому циклу — он продолжается, пока не будет достигнут конец заданного диапазона или не установлен флаг done (4). При обнаружении совпадения мы устанавливаем флаг done и выходим из функции (5). Если мы дошли до конца списка или вышли из цикла, потому что другой поток установил флаг done, то возвращаем значение last, означающее, что совпадение не найдено (6).

Если диапазон можно разбивать, то мы сначала находим среднюю точку (7), а потом через std::async запускаем поиск во второй половине диапазона (8), не забыв передать ссылку на флаг done с помощью std::ref. Одновременно мы просматриваем первую половину диапазона, рекурсивно вызывая себя же (9). И асинхронный, и рекурсивный вызов могут разбивать диапазон и дальше, если он достаточно велик.

Если прямой рекурсивный вызов вернул mid_point, значит, он не нашел совпадения, поэтому нужно получить результат асинхронного поиска. Если и в той половине ничего не было найдено, то мы получим last (10). Если «асинхронный» вызов на самом деле был не асинхронным, а отложенным, то выполняться он начнет именно при обращении к get(); в таком случае поиск во второй половине списке вообще не будет производиться, если поиск в первой оказался успешным. Если же асинхронный поиск действительно выполнялся в другом потоке, то деструктор переменной async_result будет ждать завершения этого потока, поэтому утечки потоков не произойдет.

Как и раньше, применение std::async гарантирует безопасность относительно исключений и распространения исключений вверх по стеку вызовов. Если прямой рекурсивный вызов возбудит исключение, то деструктор будущего результата позаботится о том, чтобы поток, в котором работал асинхронный поиск, завершился до возврата из функции. Если исключение возбудит асинхронный вызов, то оно распространится вверх при вызове get() (10). Внешний блок try/catch нужен только для того, чтобы установить флаг done и обеспечить тем самым быстрое завершение всех потоков в случае исключения (11). Программа правильно работала бы и без этого, по продолжала бы сравнивать элементы до естественного завершения всех потоков.

Существенной особенностью обеих реализаций этого алгоритма (характерной и для других рассмотренных выше параллельных алгоритмов) является тот факт, что элементы могут обрабатываться не в том же порядке, что в стандартном алгоритме std::find. Это важный момент при распараллеливании любого алгоритма. Если порядок имеет значение, то обрабатывать элементы параллельно нельзя. В таких алгоритмах, как parallel_for_each, порядок обработки независимых элементов не важен, однако parallel_find может вернуть элемент, найденный где-то в конце диапазона, хотя имеется другой такой же элемент в начале. Это может оказаться неприятной неожиданностью.

Итак, нам удалось распараллелить std::find. В начале этого раздела я говорил, что существуют и другие алгоритмы, которые могут завершаться раньше, чем будут обработаны все элементы. К ним применима такая же техника. В главе 9 мы еще вернёмся к вопросу о прерывании потоков.

В последнем из трех примеров мы направимся в другую сторону и рассмотрим алгоритм std::partial_sum. Он не очень широко известен, но интересен с точки зрения распараллеливания, поскольку позволяет проиллюстрировать некоторые дополнительные проектные решения.