7.2.5. Применение модели памяти к свободному от блокировок стеку

Прежде чем менять схему упорядочения, нужно исследовать все операции и определить, какие между ними должны быть отношения. Затем можно будет вернуться и найти минимальное упорядочение, которое эти отношения обеспечивает. Чтобы это сделать, потребуется взглянуть на ситуацию с точки зрения потоков в нескольких разных сценариях. Простейший сценарий возникает, когда один поток помещает элемент данных в стек, а другой через некоторое время извлекает его оттуда, с него и начнем.

В этом сценарии есть три существенных участника. Во-первых, структура counted_node_ptr, используемая для передачи данных — узла head. Во-вторых, структура node, на которую head ссылается. И, в-третьих, сами данные, на которые указывает узел.

Поток, выполняющий push(), сначала конструирует элемент данных и объект node, затем устанавливает head. Поток, выполняющий pop(), сначала загружает значение head, затем в цикле сравнения с обменом увеличивает хранящийся в нем счетчик ссылок, после чего читает структуру node, чтобы извлечь из нее значение next. Из этой последовательности можно вывести требуемое отношение; значение next — простой неатомарный объект, поэтому для его безопасного чтения должно существовать отношение происходит-раньше между операциями сохранения (в заталкивающем потоке) и загрузки (в выталкивающем потоке). Поскольку в push() имеется единственная атомарная операция — compare_exchange_weak(), а для существования отношения происходит-раньше между потоками нам нужна операция освобождения (release), то для функции compare_exchange_weak() необходимо задать упорядочение std::memory_order_release или более сильное. Если compare_exchange_weak() вернула false, то ничего не было изменено, и мы можем продолжить цикл, следовательно в этом случае нужна только семантика std::memory_order_relaxed:

void push(T const& data) {

 counted_node_ptr new_node;

 new_node.ptr = new node(data);

 new_node.external_count = 1;

 new_node.ptr->next = head.load(std::memory_order_relaxed);

 while (!head.compare_exchange_weak(

  new_node.ptr->next, new_node,

  std::memory_order_release, std::memory_order_relaxed));

}

А что можно сказать о коде pop()? Чтобы получить желаемое отношение происходит-раньше, перед доступом к next необходима операция с семантикой std::memory_order_acquire или более сильной. Указатель, который разыменовывается для доступа к полю next, — это прежнее значение, прочитанное операцией compare_exchange_strong() в increase_head_count(), поэтому указанная семантика нужна в случае успеха. Как и в push(), если обмен закончился неудачно, мы просто повторяем цикл, поэтому для отказа можно задать ослабленное упорядочение:

void increase_head_count(counted_node_ptr& old_counter) {

 counted_node_ptr new_counter;

 do {

  new_counter = old_counter;

  ++new_counter.external_count;

 }

 while (!head.compare_exchange_strong(

  old_counter, new_counter,

  std::memory_order_acquire, std::memory_order_relaxed));

 old_counter.external_count = new_counter.external_count;

}

Если вызов compare_exchange_strong() завершается успешно, то мы знаем, что раньше в поле ptr прочитанного значения находилось то, что теперь хранится в переменной old_counter. Поскольку сохранение в push() было операцией освобождения, а данный вызов compare_exchange_strong() — операция захвата, то сохранение синхронизируется-с загрузкой, и мы имеем отношение происходит-раньше. Следовательно, сохранение в поле ptr в push() происходит-раньше доступа к ptr->next в pop(), и мы в безопасности.

Отметим, что для этого анализа порядок доступа к памяти в начальном вызове head.load() не имел значения, поэтому в нем безопасно задать семантику std::memory_order_relaxed.

Далее на очереди операция compare_exchange_strong(), которая записывает в head значение old_head.ptr->next. Нужно ли наложить на нее какие-нибудь ограничения, чтобы гарантировать целостность данных в этом потоке? Если обмен завершается успешно, то мы обращаемся к ptr->data, поэтому должны быть уверены, что сохранение ptr->data в потоке, выполняющем push(), происходит-раньше загрузки. Но такая уверенность уже есть: операция захвата в increase_head_count() гарантирует, что существует отношение синхронизируется-с между сохранением в потоке, выполняющем push(), и операцией сравнения с обменом. Поскольку сохранение данных в потоке, выполняющем push(), расположено перед сохранением head, а вызов increase_head_count() расположен перед загрузкой ptr->data, то отношение происходит-раньше имеет место, и всё будет хорошо, даже если для операции сравнения с обменом в pop() задана семантика std::memory_order_relaxed. Есть еще всего одно место, где изменяется ptr->data — тот самый вызов swap(), на который вы сейчас смотрите, и ни один другой поток не может оперировать тем же узлом — в этом и заключается смысл сравнения с обменом.

Если compare_exchange_strong() завершается неудачно, то к новому значению old_head не будет обращений до следующей итерации цикла, и, поскольку мы уже решили, что семантики std::memory_order_acquire хватало в increase_head_count(), то здесь будет достаточно std::memory_order_relaxed.

Что можно сказать насчёт других потоков? Нужны ли более сильные ограничения, чтобы и другие потоки работали безопасно? Нет, не нужны, потому что head модифицируется только операциями сравнения с обменом. Будучи операциями чтения-модификации-записи, они составляют часть последовательности освобождений, начатой операцией сравнения с обменом в push(). Поэтому compare_exchange_weak() в push() синхронизируется-с операцией compare_exchange_strong() в increase_head_count(), которая прочитает сохраненное значение, даже если в промежутке другие потоки изменят head.

Мы почти закончили, осталось только рассмотреть функции, в которых используются операции fetch_add(), изменяющие счетчик ссылок. Поток, который добрался до возврата данных из узла, может продолжать в твердой уверенности, что никакой другой поток не сможет модифицировать хранящиеся в узле данные. Однако любой поток, который потерпел неудачу при извлечении данных, знает, что какой-то другой поток данные в узле модифицировал; он использовал функцию swap() для извлечения данных. Следовательно, чтобы предотвратить гонку за данными мы должны гарантировать, что swap() происходит-раньше delete. Чтобы добиться этого, проще всего задать семантику std::memory_order_release при вызове fetch_add() в ветви, где мы возвращаем данные, и семантику std::memory_order_acquire — в ветви, где мы возвращаемся в начало цикла. Однако даже это перебор — лишь один поток выполняет delete (тот, что сбросил счетчик в нуль), поэтому только этому потоку нужно выполнить операцию захвата. К счастью, поскольку fetch_add() — операция чтения-модификации-записи, то она составляет часть последовательности освобождений, поэтому для достижения цели нам достаточно дополнительной операции load(). Если в ветви, где происходит возврат в начало цикла, счетчик ссылок уменьшается до нуля, то здесь же можно перезагрузить счетчик ссылок с семантикой std::memory_order_acquire, чтобы обеспечить требуемое отношение синхронизируется-с, а в самой операции fetch_add() достаточно задать std::memory_order_relaxed. Окончательная реализация стека с новой версией pop() приведена ниже.

Листинг 7.12. Свободный от блокировок стек с подсчётом ссылок и ослабленными атомарными операциями

template<typename T>

class lock_free_stack {

private:

 struct node;

 struct counted_node_ptr {

  int external_count;

  node* ptr;

 };

 struct node {

  std::shared_ptr<T> data;

  std::atomic<int> internal_count;

  counted_node_ptr next;

  node(T const& data_):

  data(std::make_shared<T>(data_)), internal_count(0) {}

 };

 std::atomic<counted_node_ptr> head;

 void increase_head_count(counted_node_ptr& old_counter) {

  counted_node_ptr new_counter;

  do {

   new_counter = old_counter;

   ++new_counter.external_count;

  }

  while (!head.compare_exchange_strong(old_counter, new_counter,

         std::memory_order_acquire,

         std::memory_order_relaxed));

  old_counter.external_count = new_counter.external_count;

 }

public:

 ~lock_free_stack() {

  while(pop());

 }

 void push(T const& data) {

  counted_node_ptr new_node;

  new_node.ptr = new node(data);

  new_node.external_count = 1;

  new_node.ptr->next = head.load(std::memory_order_relaxed);

  while (!head.compare_exchange_weak(

           new_node.ptr->next, new_node,

           std::memory_order_release,

           std::memory_order_relaxed));

 }

 std::shared_ptr<T> pop() {

  counted_node_ptr old_head =

   head.load(std::memory_order_relaxed);

  for (;;) {

   increase_head_count(old_head);

   node* const ptr = old_head.ptr;

   if (!ptr) {

    return std::shared_ptr<T>();

   }

   if (head.compare_exchange_strong(old_head, ptr->next,

       std::memory_order_relaxed)) {

    std::shared_ptr<T> res;

    res.swap(ptr->data);

    int const count_increase = old_head.external_count — 2;

    if (ptr->internal_count.fetch_add(count_increase,

        std::memory_order_release) == -count_increase) {

     delete ptr;

    }

    return res;

   }

   else if (ptr->internal_count.fetch_add(-1,

            std::memory_order_relaxed) == 1) {

    ptr->internal_count.load(std::memory_order_acquire);

    delete ptr;

   }

  }

 }

};

Мы немало потрудились, но наконец-то дошли до конца, и стек теперь стал куда лучше. За счет тщательно продуманного применения ослабленных операций нам удалось повысить производительность, не жертвуя корректностью. Как видите, реализация pop() теперь насчитывает 37 строк вместо 8 в эквивалентной реализации pop() для стека с блокировками (листинг 7.1) и 7 строк для простого свободного от блокировок стека без управления памятью (листинг 7.2). При рассмотрении свободной от блокировок очереди мы встретимся с аналогичной ситуацией: сложность кода в значительной степени обусловлена именно управлением памятью.