7.2.1. Потокобезопасный стек без блокировок

Основное свойство стека понятно: элементы извлекаются в порядке, обратном тому, в котором помещались — последним пришёл, первым ушел (LIFO). Поэтому важно убедиться, что после добавления значения в стек оно может быть сразу же безопасно извлечено другим потоком и что только один поток получает данное значение. Простейшая реализация стека основана на связанном списке; указатель head направлен на первый узел (который будет извлечен следующим), и каждый узел указывает на следующий в списке. При такой схеме добавление узла реализуется просто.

1. Создать новый узел.

2. Записать в его указатель next текущее значение head.

3. Записать в head указатель на новый узел.

Все это прекрасно работает в однопоточной программе, но, когда стек могут модифицировать сразу несколько потоков, этого недостаточно. Существенно, что если узлы добавляют два потока, то между шагами 2 и 3 возможна гонка: второй поток может модифицировать значение head после того, как первый прочитает его на шаге 2, но до изменения на шаге 3. В таком случае изменения, произведенные вторым потоком, будут отброшены или случится еще что-нибудь похуже. Прежде чем решать эту проблему, следует отметить, что после того, как указатель head будет изменен и станет указывать на новый узел, этот узел может быть прочитан другим потоком. Поэтому крайне важно, чтобы новый узел был аккуратно подготовлен до того, как на него начнет указывать head; потом изменять узел уже нельзя.

Ну хорошо, а как все-таки быть с этим неприятным состоянием гонки? Ответ таков — использовать атомарную операцию сравнить-и-обменять на шаге 3, гарантирующую, что head не был модифицирован с момента чтения на шаге 2. Если был, то следует вернуться в начало цикла и повторить. В листинге ниже показано, как можно реализовать потокобезопасную функцию push() без блокировок.

Листинг 7.2. Реализация функции push() без блокировок

template<typename T>

class lock_free_stack {

private:

 struct node {

  T data;

  node* next;

  node(T const& data_) : ← (1)

   data(data_) {}

 };

 std::atomic<node*> head;

public:

 void push(T const& data) {

  node* const new_node = new node(data);← (2)

  new_node->next = head.load();         ← (3)

  while (!head.compare_exchange_weak(

          new_node->next, new_node));   ← (4)

 }

};

В этом коде дотошно реализованы все три пункта изложенного выше плана: создать новый узел (2), записать в его поле next текущее значение head (3) и записать в head указатель на новый узел (4). Заполнив данные самой структуры node в конструкторе (1), мы гарантируем, что узел готов к использованию сразу после конструирования, так что легкая проблема решена. Затем мы вызываем функцию compare_exchange_weak(), которая проверяет, что указатель head по-прежнему содержит то значение, которое было сохранено в new_node->next (3), и, если это так, то записывает его в new_node. В этой части программы используется также полезное свойство сравнения с обменом: если функция возвращает false, означающее, что сравнение не прошло (например, потому что значение head было изменено другим потоком), то в переменную, которая передана в первом параметре (new_node->next) записывается текущее значение head. Поэтому нам не нужно перезагружать head на каждой итерации цикла — это сделает за нас компилятор. Кроме того, поскольку мы сразу переходим в начало цикла в случае неудачного сравнения, можно использовать функцию compare_exchange_weak, которая в некоторых архитектурах дает более оптимальный код, чем compare_exchange_strong (см. главу 5).

Итак, операции pop() у нас пока еще нет, но уже можно сверить реализацию push() с рекомендациями. Единственное место, где возможны исключения, — конструирование нового узла (1), но здесь все будет подчищено автоматически, и, поскольку список еще не модифицирован, то опасности нет. Поскольку мы сами строим данные, сохраняемые в узле node, и используем compare_exchange_weak() для обновления указателя head, то проблематичных состояний гонки здесь нет. Если операция сравнения с обменом завершилась успешно, то узел находится в списке, и его можно извлекать. Так как нет никаких блокировок, то нет и возможности взаимоблокировки, и, стало быть, функция push() успешно сдала экзамен.

Теперь, когда у нас есть средства для добавления данных в стек, надо научиться их извлекать обратно. На первый взгляд, тут всё просто.

1. Прочитать текущее значение head.

2. Прочитать head->next.

3. Записать в head значение head->next.

4. Вернуть поле data, хранящееся в извлеченном узле node.

5. Удалить извлеченный узел.

Однако наличие нескольких потоков осложняет дело. Если два потока пытаются удалить элементы из стека, то оба могут прочитать одно и то же значение head на шаге 1. Если затем один поток успеет выполнить все операции вплоть до шага 5, прежде чем другой доберется до шага 2, то второй поток попробует разыменовать висячий указатель. Это одна из самых серьезных проблем при написании кода, свободного от блокировок, поэтому пока мы просто опустим шаг 5, смирившись с утечкой узлов.

Однако на этом трудности не кончаются. Есть еще одна проблема: если два потока прочитают одно и то же значение head, то они вернут один и тот же узел. Это вступает в противоречие с самой идеей стека, поэтому должно быть предотвращено любой ценой. Решить проблему можно так же, как мы устранили гонку в push(): использовать для обновления head операцию сравнения с обменом. Если она завершается с ошибкой, значит, либо в промежутке был добавлен новый узел, либо другой поток только что извлек узел, который собирались извлечь мы. В любом случае нужно вернуться на шаг 1 (хотя операция сравнения с обменом автоматически перечитывает head).

Если сравнение с обменом завершилось успешно, то мы точно знаем, что больше ни один поток не пытался удалить данный узел из стека, поэтому можем без опаски выполнить шаг 4. Вот первая попытка написать код pop():

template<typename T>

class lock_free_stack {

public:

 void pop(T& result) {

  node* old_head = head.load();

  while (!head.compare_exchange_weak(old_head, old_head->next));

  result = old_head->data;

 }

};

Вроде бы всё красиво и лаконично, но, помимо утечки узлов, осталось еще две проблемы. Во-первых, этот код не работает для пустого списка: если указатель head нулевой, то при попытке прочитать next мы получим неопределённое поведение. Это легко исправить, сравнивая в цикле while значение head с nullptr: если стек оказался пуст, мы можем либо возбудить исключение, либо вернуть булевский индикатор успеха или ошибки.

Вторая проблема касается безопасности относительно исключений. Впервые подступаясь к потокобезопасному стеку в главе 3, мы видели, что простой возврат объекта по значению небезопасен относительно исключений: если исключение возникает во время копирования возвращаемого значения, то значение будет потеряно. Тогда передача ссылки на результат оказалась приемлемым решением, которое гарантировало неизменность стека в случае исключения. К сожалению, сейчас мы лишены такой роскоши; безопасно скопировать данные можно только тогда, когда мы точно знаем, что больше никакой поток не пытается вернуть данный узел, а это означает, что узел уже удален из стека. Следовательно, передача возвращаемого значения по ссылке больше не является преимуществом, с тем же успехом можно было бы вернуть его и по значению. Чтобы безопасно вернуть значение, придется воспользоваться другим вариантом, описанным в главе 3: возвращать интеллектуальный указатель на данные.

Возврат nullptr в качестве значения интеллектуального указателя будет означать, что данных в стеке нет, но беда в том, что теперь приходится выделять память из кучи. Если делать это в pop(), то получится, что мы ровным счетом ничего не выиграли, потому что выделение памяти может возбудить исключение. Вместо этого мы будем выделять память в push(), при помещении данных в стек — память-то для структуры node выделять приходится в любом случае. Возврат std::shared_ptr<> не возбуждает исключений, поэтому pop() теперь безопасна. Собрав все вместе, мы получим код, показанный в следующем листинге.

Листинг 7.3. Свободный от блокировок стек с утечкой узлов

template<typename T>

class lock_free_stack {

private:

 struct node               (1) Теперь данные

 {                         │ удерживаются

  std::shared_ptr<T> data;←┘ указателем

  node* next;

  node(T const& data_) :          (2) Создаем std::shared_ptr

   data(std::make_shared<T>(data))←┤ Для только что выде-

   {}                              │ ленного T

 };

 std::atomic<node*> head;

public:

 void push(T const& data) {

  node* const new_node = new node(data);

  new_node->next = head.load();

  while (!head.compare_exchange_weak(new_node->next, new_node));

 }

 std::shared_ptr<T> pop()

 {                                                        (3) Перед разыменованием

  node* old_head = head.load();│проверяем, что old_head

  while (old_head &&          ←┘ ненулевой указатель

   !head.compare_exchange_weak(old_head, old_head->next));

  return old_head ? old_head->data : std::shared_ptr<T>();← (4)

 }

};

Теперь данные удерживаются указателем (1), поэтому мы должны выделять память для них из кучи в конструкторе узле (2). Кроме того, перед тем как разыменовывать old_head в цикле compare_exchange_weak() (3), следует проверять указатель на нуль. Наконец, мы либо возвращаем ассоциированные с узлом данные, если узел имеется, либо нулевой указатель, если его нет (4). Отметим, что этот алгоритм свободен от блокировок, но не свободен от ожидания, потому что цикл while в push() и pop() теоретически может выполняться бесконечно, если compare_exchange_weak() будет каждый раз возвращать false.

Если бы у нас был сборщик мусора (как в управляемых языках типа С# или Java), то на этом можно было бы ставить точку — старый узел был бы убран и повторно использован после того, как все потоки перестанут к нему обращаться. Но сегодня мало найдётся компиляторов С++ с встроенным сборщиком мусора, поэтому прибираться за собой нужно самостоятельно.