4.4.1. Функциональное программирование с применением будущих результатов

Термином функциональное программирование (ФП) называют такой стиль программирования, при котором результат функции зависит только от ее параметров, но не от внешнего состояния. Это напрямую соотносится с понятием функции в математике и означает, что если два раза вызвать функцию с одними и теми же параметрами, то получатся одинаковые результаты. Таким свойством обладают многие математические функции в стандартной библиотеке С++, например sin, cos и sqrt, а также простые операции над примитивными типами, например 3+3, 6*9 или 1.3/4.7. Чистая функция не модифицирует никакое внешнее состояние, она воздействует только на возвращаемое значение.

При таком подходе становится проще рассуждать о функциях, особенно в присутствии параллелизма, поскольку многие связанные с разделяемой памятью проблемы, обсуждавшиеся в главе 3, просто не возникают. Если разделяемые данные не модифицируются, то не может быть никакой гонки и, стало быть, не нужно защищать данные с помощью мьютексов. Это упрощение настолько существенно, что в программировании параллельных систем все более популярны становятся такие языки, как Haskell[9], где все функции чистые по умолчанию. В таком окружении нечистые функции, которые все же модифицируют разделяемые данные, отчетливо выделяются, поэтому становится проще рассуждать о том, как они укладываются в общую структуру приложения.

Но достоинства функционального программирования проявляются не только в языках, где эта парадигма применяется по умолчанию. С++ — мультипарадигменный язык, и на нем, безусловно, можно писать программы в стиле ФП. С появлением в С++11 лямбда-функций (см. приложение А, раздел А.6), включением шаблона std::bind из Boost и TR1 и добавлением автоматического выведения типа переменных (см. приложение А, раздел А.7) это стало даже проще, чем в С++98. Будущие результаты — это последний элемент из тех, что позволяют реализовать на С++ параллелизм в стиле ФП; благодаря передаче будущих результатов результат одного вычисления можно сделать зависящим от результата другого без явного доступа к разделяемым данным.

Быстрая сортировка в духе ФП

Чтобы продемонстрировать использование будущих результатов при написании параллельных программ в духе ФП, рассмотрим простую реализацию алгоритма быстрой сортировки Quicksort. Основная идея алгоритма проста: имея список значений, выбрать некий опорный элемент и разбить список на две части — в одну войдут элементы, меньшие опорного, в другую — большие или равные опорному. Отсортированный список получается путем сортировки обоих частей и объединения трех списков: отсортированного множества элементов, меньших опорного элемента, самого опорного элемента и отсортированного множества элементов, больших или равных опорному элементу. На рис. 4.2 показано, как этот алгоритм сортирует список из 10 целых чисел. В листинге ниже приведена последовательная реализация алгоритма в духе ФП; в ней список принимается и возвращается по значению, а не сортируется по месту в std::sort().

Рис. 4.2. Рекурсивная сортировка в духе ФП

Листинг 4.12. Последовательная реализация Quicksort в духе ФП

template<typename T>

std::list<T> sequential_quick_sort(std::list<T> input) {

 if (input.empty()) {

  return input;

 }

 std::list<T> result;

 result.splice(result.begin(), input, input.begin());← (1)

 T const& pivot = *result.begin();                   ← (2)

 auto divide_point = std::partition(input.begin(), input.end(),

  [&](T const& t) { return t < pivot; });← (3)

 std::list<T> lower_part;

 lower_part.splice(

  lower_part.end(), input, input.begin(), divide_point); ← (4)

 auto new_lower(

  sequential_quick_sort(std::move(lower_part)));         ← (5)

 auto new_higher(

  sequential_quick_sort(std::move(input)));              ← (6)

 result.splice(result.end(), new_higher);  ← (7)

 result.splice(result.begin(), new_lower); ← (8)

 return result;

}

Хотя интерфейс выдержан в духе ФП, прямое применение ФП привело бы к неоправданно большому числу операций копирования, поэтому внутри мы используем «обычный» императивный стиль. В качестве опорного мы выбираем первый элемент и отрезаем его от списка с помощью функции splice() (1). Потенциально это может привести к неоптимальной сортировке (в терминах количества операций сравнения и обмена), но любой другой подход при работе с std::list может существенно увеличить время за счет обхода списка. Мы знаем, что этот элемент должен войти в результат, поэтому можем сразу поместить его в список, где результат будет храниться. Далее мы хотим использовать этот элемент для сравнения, поэтому берем ссылку на него, чтобы избежать копирования (2). Теперь можно с помощью алгоритма std::partition разбить последовательность на две части: меньшие опорного элемента и не меньшие опорного элемента (3). Критерий разбиения проще всего задать с помощью лямбда-функции; мы запоминаем ссылку в замыкании, чтобы не копировать значение pivot (подробнее о лямбда-функциях см. в разделе А.5 приложения А).

Алгоритм std::partition() переупорядочивает список на месте и возвращает итератор, указывающий на первый элемент, который не меньше опорного значения. Полный тип итератора довольно длинный, поэтому мы используем спецификатор типа auto, чтобы компилятор вывел его самостоятельно (см. приложение А, раздел А.7).

Раз уж мы выбрали интерфейс в духе ФП, то для рекурсивной сортировки обеих «половин» нужно создать два списка. Для этого мы снова используем функцию splice(), чтобы переместить значения из списка input до divide_point включительно в новый список lower_part (4). После этого input будет со держать только оставшиеся значения. Далее оба списка можно отсортировать путем рекурсивных вызовов (5), (6). Применяя std::move() для передачи списков, мы избегаем копирования — результат в любом случае неявно перемещается. Наконец, мы еще раз вызываем splice(), чтобы собрать result в правильном порядке. Значения из списка new_higher попадают в конец списка (7), после опорного элемента, а значения из списка new_lower — в начало списка, до опорного элемента (8).

Параллельная реализация Quicksort в духе ФП

Раз уж мы все равно применили функциональный стиль программирования, можно без труда распараллелить этот код с помощью будущих результатов, как показано в листинге ниже. Набор операций тот же, что и раньше, только некоторые из них выполняются параллельно.

Листинг 4.13. Параллельная реализация Quicksort с применением будущих результатов

template<typename T>

std::list<T> parallel_quick_sort(std::list<T> input) {

 if (input.empty()) {

  return input;

 }

 std::list<T> result;

 result.splice(result.begin(), input, input.begin());

 T const& pivot = *result.begin();

 auto divide_point = std::partition(input.begin(), input.end(),

  [&](T const& t) {return t<pivot;});

 std::list<T> lower_part;

 lower_part.splice(

  lower_part.end(), input, input.begin(), divide_point);

 std::future<std::list<T> > new_lower( ← (1)

  std::async(&parallel_quick_sort<T>, std::move(lower_part)));

 auto new_higher(

  parallel_quick_sort(std::move(input))); ← (2)

 result.splice(result.end(), new_higher); ← (3)

 result.splice(result.begin(), new_lower.get()); ← (4)

 return result;

}

Существенное изменение здесь заключается в том, что сортировка нижней части списка производится не в текущем, а в отдельном потоке — с помощью std::async() (1). Верхняя часть списка сортируется путем прямой рекурсии, как и раньше (2). Рекурсивно вызывая parallel_quick_sort(), мы можем задействовать доступный аппаратный параллелизм. Если std::async() создает новый поток при каждом обращении, то после трех уровней рекурсии мы получим восемь работающих потоков, а после 10 уровней (когда в списке примерно 1000 элементов) будет работать 1024 потока, если оборудование позволяет. Если библиотека решит, что запущено слишком много задач (быть может, потому что количество задач превысило уровень аппаратного параллелизма), то может перейти в режим синхронного запуска новых задач. Тогда новая задача будет работать в том же потоке, который обратился к get(), а не в новом, так что мы не будем нести издержки на передачу задачи новому потоку, если это не увеличивает производительность. Стоит отметить, что в соответствии со стандартом реализация std::async вправе как создавать новый поток для каждой задачи (даже при значительном превышении лимита), если явно не задан флаг std::launch::deferred, так и запускать все задачи синхронно, если явно не задан флаг std::launch::async. Рассчитывая, что библиотека сама позаботится об автоматическом масштабировании, изучите, что говорится на эту тему в документации, поставляемой вместе с библиотекой.

Можно не использовать std::async(), а написать свою функцию spawn_task(), которая будет служить оберткой вокруг std::packaged_task и std::thread, как показано в листинге 4.14; нужно создать объект std::packaged_task для хранения результата вызова функции, получить из него будущий результат, запустить задачу в отдельном потоке и вернуть будущий результат. Само по себе это не дает большого преимущества (и, скорее всего, приведёт к значительному превышению лимита), но пролагает дорогу к переходу на более хитроумную реализацию, которая помещает задачу в очередь, обслуживаемую пулом потоков. Рассмотрение пулов потоков мы отложим до главы 9. Но идти по такому пути вместо использования std::async имеет смысл только в том случае, когда вы точно знаете, что делаете, и хотите полностью контролировать, как пул потоков строится и выполняет задачи.

Но вернемся к функции parallel_quick_sort. Поскольку для получения new_higher мы применяли прямую рекурсию, то и срастить (splice) его можно на месте, как и раньше (3). Но new_lower теперь представляет собой не список, а объект std::future<std::list<T>>, поэтому сначала нужно извлечь значение с помощью get(), а только потом вызывать splice() (4). Таким образом, мы дождемся завершения фоновой задачи, а затем переместим результат в параметр splice(); функция get() возвращает ссылку на r-значение — хранимый результат, следовательно, его можно переместить (подробнее о ссылках на r-значения и семантике перемещения см. в разделе А.1.1 приложения А).

Даже в предположении, что std::async() оптимально использует доступный аппаратный параллелизм, приведённая реализация Quicksort все равно не идеальна. Основная проблема в том, что std::partition делает много работы и остается последовательной операцией, но пока остановимся на этом. Если вас интересует максимально быстрая параллельная реализация, обратитесь к научной литературе.

Листинг 4.14. Простая реализация функции spawn_task

template<typename F, typename A>

std::future<std::result_of<F(A&&)>::type>

spawn_task(F&& f, A&& a) {

 typedef std::result_of<F(A&&)>::type result_type;

 std::packaged_task<result_type(A&&)>

 task(std::move(f)));

 std::future<result_type> res(task.get_future());

 std::thread t(std::move(task), std::move(a));

 t.detach();

 return res;

}

Функциональное программирование — не единственная парадигма параллельного программирования, позволяющая избежать модификации разделяемых данных. Альтернативой является парадигма CSP (Communicating Sequential Processes — взаимодействующие последовательные процессы)[10], в которой потоки концептуально рассматриваются как полностью независимые сущности, без каких бы то ни было разделяемых данных, но соединенные коммуникационными каналами, по которым передаются сообщения. Эта парадигма положена в основу языка программирования Erlang (http://www.erlang.org/) и среды MPI (Message Passing Interface) (http://www.mpi-forum.org/), широко используемой для высокопроизводительных вычислений на С и С++. Уверен, что теперь вы не удивитесь, узнав, что и эту парадигму можно поддержать на С++, если соблюдать определенную дисциплину; в следующем разделе показано, как это можно сделать.