7.2.4. Нахождение используемых узлов с помощью подсчета ссылок

В разделе 7.2.2 мы видели, что проблема удаления узлов сводится к задаче нахождения узлов, к которым еще обращаются потоки-читатели. Если бы можно было точно узнать, на какие узлы есть ссылки и когда количество ссылок обращается в нуль, то узлы можно было бы удалить. Указатели опасности решают эту проблему путем хранения списка используемых узлов, а механизм подсчета ссылок — путем хранения числа потоков, обращающихся к каждому узлу.

Выглядит просто и элегантно, но реализовать на практике очень трудно. Сразу приходит в голову мысль, что для такой задачи подошло бы что-то вроде std::shared_ptr<> — ведь это и есть указатель с подсчетом ссылок. Увы, хотя некоторые операции над std::shared_ptr<> атомарны, не гарантируется, что они свободны от блокировок. Сам по себе класс std::shared_ptr<> ничем не хуже прочих с точки зрения операций над атомарными типами, но он рассчитан на применение в самых разных контекстах, и попытка сделать атомарные операции над ним свободными от блокировок, скорее всего, привела бы к увеличению накладных расходов при любом его использовании. Если на вашей платформе функция std::atomic_is_lock_free(&some_shared_ptr) возвращает true, то проблему освобождения памяти можно считать полностью решенной. Достаточно хранить в списке объекты std::shared_ptr<node>, как показано в следующем листинге.

Листинг 7.9. Свободный от блокировок стек на основе свободной от блокировок реализации std::shared_ptr<>

template<typename T>

class lock_free_stack {

private:

 struct node {

  std::shared_ptr<T> data;

  std::shared_ptr<node> next;

  node(T const& data_) :

   data(std::make_shared<T>(data_)) {}

 };

 std::shared_ptr<node> head;

public:

 void push(T const& data) {

  std::shared_ptr<node> const new_node =

   std::make_shared<node>(data);

  new_node->next = head.load();

  while (!std::atomic_compare_exchange_weak(

   &head, &new_node->next, new_node));

 }

 std::shared_ptr<T> pop() {

  std::shared_ptr<node> old_head = std::atomic_load(&head);

  while(old_head && !std::atomic_compare_exchange_weak(

   &head, &old_head, old_head->next));

  return old_head ? old_head->data : std::shared_ptr<T>();

 }

};

В том весьма вероятном случае, когда реализация std::shared_ptr<> не свободна от блокировок, управлять подсчетом ссылок придется самостоятельно.

В одном из возможных способов используется не один, а два счетчика ссылок — внутренний и внешний. Их сумма равна общему количеству ссылок на узел. Внешний счетчик хранится вместе с указателем на узел и увеличивается при каждом чтении этого указателя. Когда читатель закапчивает работу с узлом, он уменьшает его внутренний счетчик. Таким образом, по завершении простой операции чтения указателя внешний счетчик увеличится на единицу, а внутренний уменьшится на единицу.

Когда необходимость в связи между внешним счетчиком и указателем отпадает (то есть узел невозможно получить из области памяти, доступной другим потокам), внутренний счетчик увеличивается на величину внешнего минус 1, а внешний отбрасывается. Если внутренний счетчик обратился в нуль, значит, никаких ссылок на узел извне не осталось и его можно удалять. Для обновления разделяемых данных по-прежнему необходимо применять атомарные операции. Теперь рассмотрим реализацию свободного от блокировок стека, в которой эта техника используется для гарантии того, что узлы освобождаются, только когда это безопасно.

В листинге ниже показала внутренняя структура данных и реализация функции push() — простая и элегантная.

Листинг 7.10. Помещение узла в свободный от блокировок стек с разделённым счётчиком ссылок

template<typename T>

class lock_free_stack {

private:

 struct node;

 struct counted_node_ptr {← (1)

  int external_count;

  node* ptr;

 };

 struct node {

  std::shared_ptr<T> data;← (2)

  std::atomic<int> internal_count;

  counted_node_ptr next;  ← (3)

  node(T const& data_) :

   data(std::make_shared<T>(data_)), internal_count(0) {}

 };

 std::atomic<counted_node_ptr> head;← (4)

public:

 ~lock_free_stack() {

  while(pop());

 }

 void push(T const& data) {← (5)

  counted_node_ptr new_node;

  new_node.ptr = new node(data);

  new_node.external_count = 1;

  new_node.ptr->next = head.load();

  while(

   !head.compare_exchange_weak(new_node.ptr->next, new_node));

 }

};

Обратите внимание, что внешний счетчик хранится вместе с указателем на узел в структуре counted_node_ptr (1). Эта структура затем используется для представления указателя next в структуре node (3), где хранится также внешний счетчик (2). Поскольку counted_node_ptr определена как struct, то ее можно использовать в шаблоне std::atomic<> для представления головы списка head (4).

На платформах, где поддерживается операция сравнения и обмена двойного слова, размер этой структуры достаточно мал для того, чтобы тип данных std::atomic<counted_node_ptr> был свободен от блокировок. Если вы работаете на другой платформе, то лучше пользоваться вариантом std::shared_ptr<>, приведенным в листинге 7.9, потому что в std::atomic<> для гарантирования атомарности используется мьютекс, когда тип слишком велик и с помощью машинных команд обеспечить атомарность невозможно (а, следовательно, ваш алгоритм, якобы «свободный от блокировок», на самом теле таковым не является). Можно поступить и по-другому — если вы готовы ограничить размер счетчика и знаете, что на данной платформе в указателе есть неиспользуемые биты (например, потому что адресное пространство представлено 48 битами, а под указатель отводится 64 бита), то счетчик можно хранить в незанятых битах указателя, и тогда оба поля поместятся в одно машинное слово. Но для таких трюков нужно хорошо знать особенности платформы, так что в этой книге мы их обсуждать не будем.

Функция push() относительно проста (5). Мы конструируем объект counted_node_ptr, ссылающийся на только что выделенный узел node с ассоциированными данными, и в поле next узла node записываем текущее значение head. Затем с помощью compare_exchange_weak() устанавливаем новое значение head, как и раньше. Внутренний счетчик internal_count инициализируется нулем, а внешний external_count — единицей. Поскольку узел новый, на него пока существует только одна внешняя ссылка (из самого указателя head).

Как обычно, все сложности сосредоточены в реализации pop(), показанной в следующем листинге.

Листинг 7.11. Выталкивание узла из свободного от блокировок стека с разделённым счётчиком ссылок

template<typename T>

class lock_free_stack {

private:

 void increase_head_count(counted_node_ptr& old_counter) {

  counted_node_ptr new_counter;

  do {

   new_counter = old_counter;

   ++new_counter.external_count;

  }

  while (

   !head.compare_exchange_strong(old_counter, new_counter));← (1)

  old_counter.external_count = new_counter.external_count;

 }

public:

 std::shared_ptr<T> pop() {

  counted_node_ptr old_head = head.load();

  for (;;) {

   increase_head_count(old_head);

   node* const ptr = old_head.ptr;← (2)

   if (!ptr) {

    return std::shared_ptr<T>();

   }

   if (head.compare_exchange_strong(old_head, ptr->next)) {← (3)

    std::shared_ptr<T> res;

    res.swap(ptr->data);                                   ← (4)

    int const count_increase = old_head.external_count - 2;← (5)

    if (ptr->internal_count.fetch_add(count_increase) ==

        -count_increase) {                                 ← (6)

     delete ptr;

    }

    return res; ← (7)

   } else if (ptr->internal_count.fetch_sub(1) == 1) {

    delete ptr; ← (8)

   }

  }

 }

};

Теперь, загрузив значение head, мы должны сначала увеличить счетчик внешних ссылок на узел head, показав, что ссылаемся на него, — только тогда его можно безопасно будет разыменовывать. Если попытаться разыменовать указатель до увеличения счетчика ссылок, то вполне может случиться так, что другой поток освободит узел раньше, чем мы успеем обратиться к нему, и, стало быть, оставит нам висячий указатель. Именно в этом главная причина использования разделенного счетчика ссылок: увеличивая внешний счетчик ссылок, мы гарантируем, что указатель останется действительным в течение всего времени работы с ним. Увеличение производится в цикле по compare_exchange_strong() (1), где устанавливаются все поля структуры, чтобы быть уверенным, что другой поток не изменил в промежутке указатель.

Увеличив счетчик, мы можем без опаски разыменовать поле ptr объекта, загруженного из head, и получить тем самым доступ к адресуемому узлу (2). Если оказалось, что указатель нулевой, то мы находимся в конце списка — больше записей нет. В противном случае мы можем попытаться исключить узел из списка, выполнив compare_exchange_strong() с головным узлом head (3).

Если compare_exchange_strong() возвращает true, то мы приняли на себя владение узлом и можем с помощью функции swap() вытащить из него данные, которые впоследствии вернём (4). Тем самым гарантируется, что данные случайно не изменятся, если вдруг другие обращающиеся к стеку другие потоки удерживают указатели на этот узел. Затем можно прибавить внешний счетчик к внутреннему с помощью атомарной операции fetch_add (6). Если теперь счетчик ссылок стал равен нулю, то предыдущее значение (то, которое возвращает fetch_add) было противоположно только что прибавленному, и тогда узел можно удалять. Важно отметить, что прибавленное значение на самом деле на 2 меньше внешнего счетчика (5); мы исключили узел из списка, вследствие чего значение счетчика уменьшилось на 1, и больше не обращаемся к узлу из данного потока, что дает уменьшение еще на 1. Неважно, удаляется узел или нет, наша работа закончена, и мы можем вернуть данные (7).

Если сравнение с обменом (3) не проходит, значит, другой поток сумел удалить узел раньше нас, либо другой поток добавил в стек новый узел. В любом случае нужно начать с начала — с новым значением head, которое вернула функция compare_exchange_strong(). Но прежде необходимо уменьшить счетчик ссылок на узел, который мы пытались исключить раньше. Этот поток больше не будет к нему обращаться. Если наш поток — последний, удерживавший ссылку на этот узел (потому что другой поток вытолкнул его из стека), то внутренний счетчик ссылок равен 1, так что после вычитания 1 он обратится в нуль. В таком случае мы можем удалить узел прямо здесь, не дожидаясь перехода к следующей итерации цикла (8).

До сих мы задавали для всех атомарных операций упорядочение доступа к памяти std::memory_order_seq_cst. В большинстве систем это самый неэффективный режим с точки зрения времени выполнения и накладных расходов на синхронизацию, причем в ряде случаев разница весьма ощутима. Но теперь, определившись с логикой структуры данных, можно подумать и о том, чтобы ослабить некоторые требования к упорядочению, — все-таки не хотелось бы, чтобы пользователи стека несли лишние расходы. Итак, перед тем как расстаться со стеком и перейти к проектированию свободной от блокировок очереди, еще раз присмотримся к операциям стека и спросим себя, нельзя ли где-нибудь использовать более слабое упорядочение доступа, сохранив тот же уровень безопасности?