8.1.2. Рекурсивное распределение данных

We use cookies. Read the Privacy and Cookie Policy

Алгоритм Quicksort состоит из двух шагов: разбиение данных на две части — до и после опорного элемента в смысле требуемого упорядочения, и рекурсивная сортировка обеих «половин». Невозможно распараллелить этот алгоритм, разбив данные заранее, потому что состав каждой «половины» становится известен только в процессе обработки элементов. Поэтому распараллеливание по необходимости должно быть рекурсивным. На каждом уровне рекурсии производится больше вызовов функции quick_sort, потому что предстоит отсортировать элементы, меньшие опорного, и большие опорного. Эти рекурсивные вызовы не зависят друг от друга, так как обращаются к разным элементам, поэтому являются идеальными кандидатами для параллельного выполнения. На рис. 8.2 изображено такое рекурсивное разбиение.

Рис. 8.2. Рекурсивное разбиение данных

В главе 4 была приведена подобная реализация. Вместо того чтобы просто вызывать функцию рекурсивно для большей и меньшей «половины», мы с помощью std::async() на каждом шаге запускали асинхронную задачу для меньшей половины. Вызывая std::async(), мы просим стандартную библиотеку С++ самостоятельно решить, имеет ли смысл действительно выполнять задачу в новом потоке или лучше сделать это синхронно.

Это существенно: при сортировке большого набора данных запуск нового потока для каждого рекурсивного вызова быстро приводит к образованию чрезмерного количества потоков. Как мы увидим ниже, когда потоков слишком много, производительность может не возрасти, а наоборот упасть. Кроме того, если набор данных очень велик, то потоков может просто не хватить. Сама идея рекурсивного разбиения на задачи хороша, нужно только строже контролировать число запущенных потоков. В простых случаях с этим справляется std::async(), но есть и другие варианты.

Один из них — воспользоваться функцией std::thread::hardware_concurrency() для выбора нужного числа потоков, как мы делали в параллельной версии accumulate() из листинга 2.8. Тогда вместо того чтобы запускать новый поток для каждого рекурсивного вызова, мы можем просто поместить подлежащий сортировке блок данных в потокобезопасный стек типа того, что был описан в главах 6 и 7. Если потоку больше нечего делать — потому что он закончил обработку всех своих блоков или потому что ждет, когда необходимый ему блок будет отсортирован, то он может взять блок из стека и заняться его сортировкой.

В следующем листинге показана реализация этой идеи.

Листинг 8.1. Параллельный алгоритм Quicksort с применением стека блоков, ожидающих сортировки

template<typename T>

struct sorter { ← (1)

 struct chunk_to_sort {

  std::list<T> data;

  std::promise<std::list<T> > promise;

 };

 thread_safe_stack<chunk_to_sort> chunks; ← (2)

 std::vector<std::thread> threads;        ← (3)

 unsigned const max_thread_count;

 std::atomic<bool> end_of_data;

 sorter():

  max_thread_count(std::thread::hardware_concurrency() - 1),

  end_of_data(false) {}

 ~sorter() {          ← (4)

  end_of_data = true; ← (5)

  for (unsigned i = 0; i < threads.size(); ++i) {

   threads[i].join(); ← (6)

  }

 }

 void try_sort_chunk() {

  boost::shared_ptr<chunk_to_sort> chunk = chunks.pop();← (7)

  if (chunk) {

   sort_chunk(chunk); ← (8)

  }

 }

 std::list<T> do_sort(std::list<T>& chunk_data) { ← (9)

  if (chunk_data.empty()) {

   return chunk_data;

  }

  std::list<T> result;

  result.splice(result.begin(), chunk_data, chunk_data.begin());

  T const& partition_val = *result.begin();

  typename std::list<T>::iterator divide_point = ← (10)

   std::partition(chunk_data.begin(), chunk_data.end(),

    [&](T const& val) {return val < partition_val; });

  chunk_to_sort new_lower_chunk;

  new_lower_chunk.data.splice(new_lower_chunk.data.end(),

   chunk_data, chunk_data.begin(),

   divide_point);

  std::future<std::list<T> > new_lower =

   new_lower_chunk.promise.get_future();

  chunks.push(std::move(new_lower_chunk)); ← (11)

  if (threads.size() < max_thread_count) { ← (12)

   threads.push_back(std::thread(&sorter<T>::sort_thread, this));

  }

  std::list<T> new_higher(do_sort(chunk_data));

  result.splice(result.end(), new_higher);

  while (new_lower.wait_for(std::chrono::seconds(0)) !=

   std::future_status::ready) { ← (13)

   try_sort_chunk();            ← (14)

  }

  result.splice(result.begin(), new_lower.get());

  return result;

 }

 void sort_chunk(boost::shared_ptr<chunk_to_sort> const& chunk) {

  chunk->promise.set_value(do_sort(chunk->data));← (15)

 }

 void sort_thread() {

  while (!end_of_data) {     ← (16)

   try_sort_chunk();         ← (17)

   std::this_thread::yield();← (18)

  }

 }

};

template<typename T>

std::list<T> parallel_quick_sort(std::list<T> input) { ← (19)

 if (input.empty()) {

  return input;

 }

 sorter<T> s;

 return s.do_sort(input); ← (20)

}

Здесь функция parallel_quick_sort (19) делегирует большую часть работы классу sorter (1), который объединяет стек неотсортированных блоков (2) с множеством потоков (3). Основные действия производятся в функции-члене do_sort (9), которая занимается обычным разбиением данных на две части (10). Но на этот раз она не запускает новый поток для каждого блока, а помещает его в стек (11) и запускает новый поток, только если еще есть незанятые процессоры (12). Поскольку меньший блок, возможно, обрабатывается другим потоком, нам необходимо дождаться его готовности (13). Чтобы не простаивать зря (в том случае, когда данный поток единственный или все остальные уже заняты), мы пытаемся обработать блок, находящийся в стеке (14). Функция try_sort_chunk извлекает поток из стека (7), сортирует его (8) и сохраняет результаты в обещании promise, так чтобы их смог получить поток, который поместил в стек данный блок (15).

Запущенные потоки крутятся в цикле, пытаясь отсортировать блоки, находящиеся в стеке (17), ожидая, пока будет установлен флаг end_of_data (16). В промежутке между проверками они уступают процессор другим потокам (18), чтобы у тех был шанс поместить в стек новые блоки. Эта программа полагается на то, что деструктор класса sorter (4) разберется с запущенными потоками. Когда все данные будут отсортированы, do_sort вернет управление (хотя рабочие потоки все еще выполняются), так что главный поток вернется из parallel_quick_sort (20) и, стало быть, уничтожит объект sorter. В этот момент деструктор поднимет флаг end_of_data (5) и дождется завершения рабочих потоков (6). Поднятый флаг является для функции потока указанием выйти из цикла (16).

При таком подходе не возникает проблемы неограниченного количества потоков, с которой мы сталкивались, когда spawn_task каждый раз запускает новый поток. С другой стороны, мы не полагаемся на то, что стандартная библиотека С++ выберет количество рабочих потоков за нас, как то происходит при использовании std::async(). Мы сами ограничиваем число потоков значением std::thread::hardware_concurrency(), чтобы избежать чрезмерно большого количества контекстных переключений. Однако же взамен нас поджидает другая потенциальная проблема: управление потоками и взаимодействие между ними сильно усложняют код. Кроме того, хотя потоки и обрабатывают разные элементы данных, все они обращаются к стеку для добавления новых блоков и извлечения блоков для сортировки. Из-за этой острой конкуренции производительность может снизиться, пусть даже мы используем свободный от блокировок (и, значит, неблокирующий) стек. Почему так происходит, мы увидим чуть ниже.

Этот подход представляет собой специализированную версию пула потоков — существует набор потоков, которые получают задачи из списка ожидающих работ, выполняют их, а затем снова обращаются к списку. Некоторые потенциальные проблемы, свойственные пулам потоков (в том числе конкуренция за список работ), и способы их решения рассматриваются в главе 9. Задача масштабирования приложения на несколько процессоров более детально обсуждается в этой главе ниже (см. раздел 8.2.1).

Как при предварительном, так и при рекурсивном распределении данных мы предполагаем, что сами данные фиксированы заранее, а нам нужно лишь найти удачный способ их разбиения. Но так бывает не всегда; если данные порождаются динамически или поступают из внешнего источника, то такой подход не годится. В этом случае имеет смысл распределять работу по типам задач, а не по данным.