7.2.4. Нахождение используемых узлов с помощью подсчета ссылок
В разделе 7.2.2 мы видели, что проблема удаления узлов сводится к задаче нахождения узлов, к которым еще обращаются потоки-читатели. Если бы можно было точно узнать, на какие узлы есть ссылки и когда количество ссылок обращается в нуль, то узлы можно было бы удалить. Указатели опасности решают эту проблему путем хранения списка используемых узлов, а механизм подсчета ссылок — путем хранения числа потоков, обращающихся к каждому узлу.
Выглядит просто и элегантно, но реализовать на практике очень трудно. Сразу приходит в голову мысль, что для такой задачи подошло бы что-то вроде std::shared_ptr<> — ведь это и есть указатель с подсчетом ссылок. Увы, хотя некоторые операции над std::shared_ptr<> атомарны, не гарантируется, что они свободны от блокировок. Сам по себе класс std::shared_ptr<> ничем не хуже прочих с точки зрения операций над атомарными типами, но он рассчитан на применение в самых разных контекстах, и попытка сделать атомарные операции над ним свободными от блокировок, скорее всего, привела бы к увеличению накладных расходов при любом его использовании. Если на вашей платформе функция std::atomic_is_lock_free(&some_shared_ptr) возвращает true, то проблему освобождения памяти можно считать полностью решенной. Достаточно хранить в списке объекты std::shared_ptr<node>, как показано в следующем листинге.
Листинг 7.9. Свободный от блокировок стек на основе свободной от блокировок реализации std::shared_ptr<>
template<typename T>
class lock_free_stack {
private:
struct node {
std::shared_ptr<T> data;
std::shared_ptr<node> next;
node(T const& data_) :
data(std::make_shared<T>(data_)) {}
};
std::shared_ptr<node> head;
public:
void push(T const& data) {
std::shared_ptr<node> const new_node =
std::make_shared<node>(data);
new_node->next = head.load();
while (!std::atomic_compare_exchange_weak(
&head, &new_node->next, new_node));
}
std::shared_ptr<T> pop() {
std::shared_ptr<node> old_head = std::atomic_load(&head);
while(old_head && !std::atomic_compare_exchange_weak(
&head, &old_head, old_head->next));
return old_head ? old_head->data : std::shared_ptr<T>();
}
};
В том весьма вероятном случае, когда реализация std::shared_ptr<> не свободна от блокировок, управлять подсчетом ссылок придется самостоятельно.
В одном из возможных способов используется не один, а два счетчика ссылок — внутренний и внешний. Их сумма равна общему количеству ссылок на узел. Внешний счетчик хранится вместе с указателем на узел и увеличивается при каждом чтении этого указателя. Когда читатель закапчивает работу с узлом, он уменьшает его внутренний счетчик. Таким образом, по завершении простой операции чтения указателя внешний счетчик увеличится на единицу, а внутренний уменьшится на единицу.
Когда необходимость в связи между внешним счетчиком и указателем отпадает (то есть узел невозможно получить из области памяти, доступной другим потокам), внутренний счетчик увеличивается на величину внешнего минус 1, а внешний отбрасывается. Если внутренний счетчик обратился в нуль, значит, никаких ссылок на узел извне не осталось и его можно удалять. Для обновления разделяемых данных по-прежнему необходимо применять атомарные операции. Теперь рассмотрим реализацию свободного от блокировок стека, в которой эта техника используется для гарантии того, что узлы освобождаются, только когда это безопасно.
В листинге ниже показала внутренняя структура данных и реализация функции push() — простая и элегантная.
Листинг 7.10. Помещение узла в свободный от блокировок стек с разделённым счётчиком ссылок
template<typename T>
class lock_free_stack {
private:
struct node;
struct counted_node_ptr {← (1)
int external_count;
node* ptr;
};
struct node {
std::shared_ptr<T> data;← (2)
std::atomic<int> internal_count;
counted_node_ptr next; ← (3)
node(T const& data_) :
data(std::make_shared<T>(data_)), internal_count(0) {}
};
std::atomic<counted_node_ptr> head;← (4)
public:
~lock_free_stack() {
while(pop());
}
void push(T const& data) {← (5)
counted_node_ptr new_node;
new_node.ptr = new node(data);
new_node.external_count = 1;
new_node.ptr->next = head.load();
while(
!head.compare_exchange_weak(new_node.ptr->next, new_node));
}
};
Обратите внимание, что внешний счетчик хранится вместе с указателем на узел в структуре counted_node_ptr (1). Эта структура затем используется для представления указателя next в структуре node (3), где хранится также внешний счетчик (2). Поскольку counted_node_ptr определена как struct, то ее можно использовать в шаблоне std::atomic<> для представления головы списка head (4).
На платформах, где поддерживается операция сравнения и обмена двойного слова, размер этой структуры достаточно мал для того, чтобы тип данных std::atomic<counted_node_ptr> был свободен от блокировок. Если вы работаете на другой платформе, то лучше пользоваться вариантом std::shared_ptr<>, приведенным в листинге 7.9, потому что в std::atomic<> для гарантирования атомарности используется мьютекс, когда тип слишком велик и с помощью машинных команд обеспечить атомарность невозможно (а, следовательно, ваш алгоритм, якобы «свободный от блокировок», на самом теле таковым не является). Можно поступить и по-другому — если вы готовы ограничить размер счетчика и знаете, что на данной платформе в указателе есть неиспользуемые биты (например, потому что адресное пространство представлено 48 битами, а под указатель отводится 64 бита), то счетчик можно хранить в незанятых битах указателя, и тогда оба поля поместятся в одно машинное слово. Но для таких трюков нужно хорошо знать особенности платформы, так что в этой книге мы их обсуждать не будем.
Функция push() относительно проста (5). Мы конструируем объект counted_node_ptr, ссылающийся на только что выделенный узел node с ассоциированными данными, и в поле next узла node записываем текущее значение head. Затем с помощью compare_exchange_weak() устанавливаем новое значение head, как и раньше. Внутренний счетчик internal_count инициализируется нулем, а внешний external_count — единицей. Поскольку узел новый, на него пока существует только одна внешняя ссылка (из самого указателя head).
Как обычно, все сложности сосредоточены в реализации pop(), показанной в следующем листинге.
Листинг 7.11. Выталкивание узла из свободного от блокировок стека с разделённым счётчиком ссылок
template<typename T>
class lock_free_stack {
private:
void increase_head_count(counted_node_ptr& old_counter) {
counted_node_ptr new_counter;
do {
new_counter = old_counter;
++new_counter.external_count;
}
while (
!head.compare_exchange_strong(old_counter, new_counter));← (1)
old_counter.external_count = new_counter.external_count;
}
public:
std::shared_ptr<T> pop() {
counted_node_ptr old_head = head.load();
for (;;) {
increase_head_count(old_head);
node* const ptr = old_head.ptr;← (2)
if (!ptr) {
return std::shared_ptr<T>();
}
if (head.compare_exchange_strong(old_head, ptr->next)) {← (3)
std::shared_ptr<T> res;
res.swap(ptr->data); ← (4)
int const count_increase = old_head.external_count - 2;← (5)
if (ptr->internal_count.fetch_add(count_increase) ==
-count_increase) { ← (6)
delete ptr;
}
return res; ← (7)
} else if (ptr->internal_count.fetch_sub(1) == 1) {
delete ptr; ← (8)
}
}
}
};
Теперь, загрузив значение head, мы должны сначала увеличить счетчик внешних ссылок на узел head, показав, что ссылаемся на него, — только тогда его можно безопасно будет разыменовывать. Если попытаться разыменовать указатель до увеличения счетчика ссылок, то вполне может случиться так, что другой поток освободит узел раньше, чем мы успеем обратиться к нему, и, стало быть, оставит нам висячий указатель. Именно в этом главная причина использования разделенного счетчика ссылок: увеличивая внешний счетчик ссылок, мы гарантируем, что указатель останется действительным в течение всего времени работы с ним. Увеличение производится в цикле по compare_exchange_strong() (1), где устанавливаются все поля структуры, чтобы быть уверенным, что другой поток не изменил в промежутке указатель.
Увеличив счетчик, мы можем без опаски разыменовать поле ptr объекта, загруженного из head, и получить тем самым доступ к адресуемому узлу (2). Если оказалось, что указатель нулевой, то мы находимся в конце списка — больше записей нет. В противном случае мы можем попытаться исключить узел из списка, выполнив compare_exchange_strong() с головным узлом head (3).
Если compare_exchange_strong() возвращает true, то мы приняли на себя владение узлом и можем с помощью функции swap() вытащить из него данные, которые впоследствии вернём (4). Тем самым гарантируется, что данные случайно не изменятся, если вдруг другие обращающиеся к стеку другие потоки удерживают указатели на этот узел. Затем можно прибавить внешний счетчик к внутреннему с помощью атомарной операции fetch_add (6). Если теперь счетчик ссылок стал равен нулю, то предыдущее значение (то, которое возвращает fetch_add) было противоположно только что прибавленному, и тогда узел можно удалять. Важно отметить, что прибавленное значение на самом деле на 2 меньше внешнего счетчика (5); мы исключили узел из списка, вследствие чего значение счетчика уменьшилось на 1, и больше не обращаемся к узлу из данного потока, что дает уменьшение еще на 1. Неважно, удаляется узел или нет, наша работа закончена, и мы можем вернуть данные (7).
Если сравнение с обменом (3) не проходит, значит, другой поток сумел удалить узел раньше нас, либо другой поток добавил в стек новый узел. В любом случае нужно начать с начала — с новым значением head, которое вернула функция compare_exchange_strong(). Но прежде необходимо уменьшить счетчик ссылок на узел, который мы пытались исключить раньше. Этот поток больше не будет к нему обращаться. Если наш поток — последний, удерживавший ссылку на этот узел (потому что другой поток вытолкнул его из стека), то внутренний счетчик ссылок равен 1, так что после вычитания 1 он обратится в нуль. В таком случае мы можем удалить узел прямо здесь, не дожидаясь перехода к следующей итерации цикла (8).
До сих мы задавали для всех атомарных операций упорядочение доступа к памяти std::memory_order_seq_cst. В большинстве систем это самый неэффективный режим с точки зрения времени выполнения и накладных расходов на синхронизацию, причем в ряде случаев разница весьма ощутима. Но теперь, определившись с логикой структуры данных, можно подумать и о том, чтобы ослабить некоторые требования к упорядочению, — все-таки не хотелось бы, чтобы пользователи стека несли лишние расходы. Итак, перед тем как расстаться со стеком и перейти к проектированию свободной от блокировок очереди, еще раз присмотримся к операциям стека и спросим себя, нельзя ли где-нибудь использовать более слабое упорядочение доступа, сохранив тот же уровень безопасности?