7.2.3. Обнаружение узлов, не подлежащих освобождению, с помощью указателей опасности

Термин указатели опасности откосится к технике, предложенной Магедом Майклом (Maged Michael)[12]. Они называются так потому, что удаление узла, на который все еще могут ссылаться другие потоки, — опасное предприятие. Если действительно существует поток, ссылающийся на данный узел, и этот поток попытается обратиться к нему по ссылке, то мы получим неопределенное поведение. Основная идея заключается в том, что поток, собирающийся получить доступ к объекту, который другой поток может захотеть удалить, сначала устанавливает указатель опасности, ссылающийся на этот объект, информируя тем самым другой поток, что удалять этот объект действительно опасно. После того как объект перестает быть нужным, указатель опасности очищается. Если вам доводилось наблюдать гребную регату между командами Оксфорда и Кембриджа, то вы могли видеть аналогичный механизм, применяемый в начале заезда: рулевой в лодке может поднять руку, сообщая, что экипаж еще не готов. Пока хотя бы один рулевой держит руку поднятой, судья не может дать старт заезду. После того как оба рулевых опустят руки, судья может давать старт, однако рулевой вправе снова поднять руку, если заезд еще не начался, а ситуация, на его взгляд, изменилась.

Собираясь удалить объект, поток должен сначала проверить указатели опасности в других имеющихся в системе потоках. Если ни один из указателей опасности не ссылается на данный объект, то его можно спокойно удалять. В противном случае удаление следует отложить. Периодически поток просматривает список отложенных объектов в поисках тех, которые уже можно удалить.

Высокоуровневое описание выглядит достаточно простым, но как это сделать на С++?

Прежде всего, необходимо место для хранения указателя на интересующий нас объект — сам указатель опасности. Это место должно быть видно из всех потоков, причем указатель опасности должен существовать в каждом потоке, который может получить доступ к структуре данных. Корректное и эффективное выделение такого места — непростая задача, поэтому отложим ее на потом, а пока предположим, что существует функция get_hazard_pointer_for_current_thread(), которая возвращает ссылку на указатель опасности. Затем нужно установить указатель опасности перед чтением указателя, который мы намерены разыменовать, — в данном случае указателя head на начало списка:

std::shared_ptr<T> pop() {

 std::atomic<void*>& hp =

  get_hazard_pointer_for_current_thread();

 node* old_head = head.load();← (1)

 node* temp;

 do {

  temp = old_head;

  hp.store(old_head);         ← (2)

  old_head = head.load();

 } while (old_head != temp);  ← (3)

 // ...

}

Это необходимо делать в цикле while, чтобы узел node случайно не был удалён между чтением старого указателя head (1) и установкой указателя опасности (2). В течение этого промежутка времени ни один поток не знает, что мы собираемся обратиться к этому узлу. К счастью, если кто-то собирается удалить старый узел head, то сам указатель head должен был быть изменен, так что мы можем это проверить и не выходить из цикла, пока не будем твердо уверены, что указатель head по-прежнему имеет то же значение, которое было записано в указатель опасности (3). Такое использование указателей опасности опирается на тот факт, что можно безопасно использовать значение указателя даже после того, как объект, на который он указывает, уже удалён. Технически для стандартных реализаций new и delete это считается неопределенным поведением, поэтому либо убедитесь, что ваша реализация стандартной библиотеки допускает такое использование, либо реализуйте собственный распределитель.

Установив указатель опасности, мы можем продолжить выполнение pop(), будучи уверены, что ни один другой поток не попытается «вытащить» из-под нас узлы. Ну почти уверены: при каждом перечитывании old_head необходимо обновлять указатель опасности перед тем, как разыменовывать вновь прочитанное значение указателя. После того как узел извлечён из списка, мы можем очистить наш собственный указатель опасности. Если на наш узел не ссылаются другие указатели опасности, то его можно удалять; в противном случае его следует поместить в список узлов, ожидающих удаления. В листинге ниже приведен полный код функции pop(), реализованной по такой схеме.

Листинг 7.6. Реализация функции pop() с помощью указателей опасности

std::shared_ptr<T> pop() {

 std::atomic<void*>& hp =

  get_hazard_pointer_for_current_thread();

 node* old_head = head.load();

 do {

  node* temp; (1) Цикл, пока указатель

  do         ←┤ опасности не установлен

  {           │ на head

   temp = old_head;

   hp.store(old_head);

   old_head = head.load();

  } while (old_head != temp);

 }

 while (old_head &&

  !head.compare_exchange_strong(old_head, old_head->next))

  hp.store(nullptr);← (2) Закончив, очищаем указатель опасности

 std::shared_ptr<T> res;

 if (old_head) {

  res.swap(old_head->data);

  if (outstanding_hazard_pointers_for(old_head))←┐ Прежде чем

  {                                              ├ (3) удалять узел,

   reclaim_later(old_head);                      │ проверяем,(4)

  }                                              │ нет ли ссы-

  else                                           │ лающихся на

  {                                              │ него указате-

                                                 │ лей опасности

   delete old_head; ← (5)

  }

  delete_nodes_with_no_hazards();← (6)

 }

 return res;

}

Начнём с того, что мы перенесли цикл, в котором устанавливается указатель опасности, во внешний цикл, где перечитывается old_head, если операция сравнения с обменом завершается неудачно (1). Здесь мы используем функцию compare_exchange_strong(), потому что фактическая работа делается внутри цикла while: ложный отказ в compare_exchange_weak() привел бы к ненужному сбросу указателя опасности. Таким образом, гарантируется, что указатель опасности установлен перед разыменованием old_head. Заявив свои права на узел, мы можем очистить указатель опасности (2). Получив узел в свое распоряжение, мы должны проверить, не ссылаются ли на него указатели опасности, принадлежащие другим потокам (3). Если это так, то удалять узел пока нельзя, а нужно поместить его в список ожидающих (4); в противном случае узел можно удалять немедленно (5). Наконец, мы добавили вызов функции, в которой проверяется, существуют ли узлы, для которых мы ранее вызывали reclaim_later(). Если не осталось указателей опасности, ссылающихся на эти узлы, то мы можем спокойно удалить их (6). Те же узлы, на которые еще ссылается хотя бы один указатель опасности, остаются в списке и будут проверены следующим потоком, вызвавшим pop().

Разумеется, в новых функциях — get_hazard_pointer_for_current_thread(), reclaim_later(), outstanding_hazard_pointers_for() и delete_nodes_with_no_hazards() — скрыта масса деталей, поэтому отдёрнем занавес и посмотрим, как они работают.

Как именно в функции get_hazard_pointer_for_current_thread() выделяется память для принадлежащих потокам указателей опасности, несущественно для логики программы (хотя, как будет показано ниже, может влиять на эффективность). Поэтому пока ограничимся простой структурой: массивом фиксированного размера, в котором хранятся пары (идентификатор потока, указатель). Функция get_hazard_pointer_for_current_thread() ищет в этом массиве первую свободную позицию и записывает в поле ID идентификатор текущего потока. Когда поток завершается, эта позиция освобождается — в поле ID заносится сконструированное по умолчанию значение std::thread::id(). Этот алгоритм показан в следующем листинге.

Листинг 7.7. Простая реализация функции get_hazard_pointer_for_current_thread

unsigned const max_hazard_pointers = 100;

struct hazard_pointer {

 std::atomic<std::thread::id> id;

 std::atomic<void*> pointer;

};

hazard_pointer hazard_pointers[max_hazard_pointers];

class hp_owner {

 hazard_pointer* hp;

public:

 hp_owner(hp_owner const&) = delete;

 hp_owner operator=(hp_owner const&) = delete;

 hp_owner() :

  hp(nullptr) {

  for (unsigned i = 0; i < max_hazard_pointers; ++i) {

   std::thread::id old_id;

   if (

    hazard_pointers[i].

    id.compare_exchange_strong(         ←┐

     old_id, std::this_thread::get_id()))│ Пытаемся заявить

   {                                     │ права на владе-

    hp = &hazard_pointers[i];            │ ние указателем

    break;                               │ опасности

   }

  }

  if (!hp) {← (1)

   throw std::runtime_error("No hazard pointers available");

  }

 }

 std::atomic<void*>& get_pointer() {

  return hp->pointer;

 }

 ~hp_owner() {← (2)

  hp->pointer.store(nullptr);

  hp->id.store(std::thread::id());

 }

};

std::atomic<void*>& get_hazard_pointer_for_current_thread()← (3)

{                                    (4) У каждого потока

 thread_local static hp_owner hazard;←┘ свой указатель опасности

 return hazard.get_pointer();← (5)

}

Реализация самой функции get_hazard_pointer_for_current_thread() обманчиво проста (3): в ней объявлена переменная типа hp_owner в поточно-локальной памяти (4), в которой хранится принадлежащий данному потоку указатель опасности. Затем она просто возвращает полученный от этого объекта указатель (5). Работает это следующим образом: в первый раз, когда каждый поток вызывает эту функцию, создается новый экземпляр hp_owner. Его конструктор (1) ищет в таблице пар (владелец, указатель) незанятую запись (такую, у которой нет владельца). На каждой итерации цикла он с помощью compare_exchange_strong() атомарно выполняет два действия: проверяет, что у текущей записи нет владельца, и делает владельцем себя (2). Если compare_exchange_strong() возвращает false, значит, записью владеет другой поток, поэтому мы идем дальше. Если же функция вернула true, то мы успешно зарезервировали запись для текущего потока, поэтому можем сохранить ее адрес и прекратить поиск (3). Если мы дошли до конца списка и не обнаружили свободной записи (4), значит, потоков, использующих указатель опасности, слишком много, так что приходится возбуждать исключение.

После того как экземпляр hp_owner для данного потока создан, последующие обращения происходят гораздо быстрее, потому что указатель запомнен и просматривать таблицу снова нет нужды.

Когда завершается поток, для которого был создан объект hp_owner, этот объект уничтожается. Прежде чем сохранить в идентификаторе владельца значение std::thread::id(), деструктор записывает в сам указатель значение nullptr, чтобы другие потоки могли повторно использовать эту запись. При такой реализации get_hazard_pointer_for_current_thread() реализация функции outstanding_hazard_pointers_for() совсем проста: требуется только найти переданное значение в таблице указателей опасности:

bool outstanding_hazard_pointers_for(void* p) {

 for (unsigned i = 0; i < max_hazard_pointers; ++i) {

  if (hazard_pointers[i].pointer.load() == p) {

   return true;

  }

 }

 return false;

}

He нужно даже проверять, есть ли у записи владелец, так как в бесхозных записях все равно хранятся нулевые указатели, поэтому сравнение заведомо вернёт false; это еще упрощает код. Теперь функции reclaim_later() и delete_nodes_with_no_hazards() могут работать с простым связанным списком; reclaim_later() добавляет в него узлы, a delete_nodes_with_no_hazards() удаляет узлы, на которые не ссылаются указатели опасности. Реализация обеих функций приведена в следующем листинге.

Листинг 7.8. Простая реализация функций освобождения узлов

template<typename T>

void do_delete(void* p) {

 delete static_cast<T*>(p);

}

struct data_to_reclaim {

 void* data;

 std::function<void(void*)> deleter;

 data_to_reclaim* next;

 template<typename T>

 data_to_reclaim(T* p) : ← (1)

  data(p),

  deleter(&do_delete<T>), next(0) {}

 ~data_to_reclaim() {

  deleter(data); ← (2)

 }

};

std::atomic<data_to_reclaim*> nodes_to_reclaim;

void add_to_reclaim_list(data_to_reclaim* node) {← (3)

 node->next = nodes_to_reclaim.load();

 while (

  !nodes_to_reclaim.compare_exchange_weak(node->next, node));

}

template<typename T>

void reclaim_later(T* data) {                   ← (4)

 add_to_reclaim_list(new data_to_reclaim(data));← (5)

}

void delete_nodes_with_no_hazards() {

 data_to_reclaim* current =

  nodes_to_reclaim.exchange(nullptr);                   ← (6)

 while(current) {

  data_to_reclaim* const next = current->next;

  if (!outstanding_hazard_pointers_for(current->data)) {← (7)

   delete current;                                      ← (8)

  } else {

   add_to_reclaim_list(current);                        ← (9)

  }

  current = next;

 }

}

Полагаю, вы обратили внимание, что reclaim_later() — шаблон функции, а не обычная функция (4). Объясняется это тем, что указатели опасности — это универсальный механизм, поэтому не стоит ограничивать его только узлами стека. Ранее для хранения указателей мы использовали тип std::atomic<void*>. Поэтому мы должны обрабатывать произвольный указательный тип, но просто указать void* нельзя, так как мы собираемся удалять данные по указателю, а оператору delete нужно знать реальный тип указателя. Как мы скоро увидим, конструктор data_to_reclaim прекрасно справляется с этой проблемой: reclaim_later() просто создает новый экземпляр data_to_reclaim для переданного указателя и добавляет его в список отложенного освобождения (5). Сама функция add_to_reclaim_list() (3) — не более чем простой цикл по compare_exchange_weak() для головного элемента списка; мы уже встречались с такой конструкцией раньше.

Но вернёмся к конструктору data_to_reclaim (1), который также является шаблоном. Он сохраняет подлежащие удалению данные в виде указателя void* в члене data, после чего запоминает указатель на подходящую конкретизацию do_delete() — простую функцию, которая приводит тип void* к типу параметризованного указателя, а затем удаляет объект, на который он указывает. Шаблон std::function<> безопасно обертывает этот указатель на функцию, так что впоследствии деструктору data_to_reclaim для удаления данных нужно всего лишь вызвать запомненную функцию (2).

Деструктор data_to_reclaim не вызывается, когда мы добавляем узлы в список; он вызывается только, когда на узел не ссылается ни один указатель опасности. За это отвечает функция delete_nodes_with_no_hazards().

Эта функция сначала заявляет права на владение всем списком подлежащих освобождению узлов, вызывая exchange() (6). Это простое, но крайне важное действие гарантирует, что данный конкретный набор узлов будет освобождать только один поток. Другие потоки вправе добавлять в список новые узлы и даже пытаться освободить их, никак не затрагивая работу этого потока.

Далее мы по очереди просматриваем все узлы списка, проверяя, ссылаются ли на них не сброшенные указатели опасности (7). Если нет, то запись можно удалить (очистив хранящиеся в ней данные) (8). В противном случае мы возвращаем элемент в список, чтобы освободить его позже (9).

Хотя эта простая реализация справляется с задачей безопасного освобождения удаленных узлов, она заметно увеличивает накладные расходы. Для просмотра массива указателей опасности требуется проверить max_hazard_pointers атомарных переменных, и это делается при каждом вызове pop(). Атомарные операции по необходимости работают медленно — зачастую в 100 раз медленнее эквивалентных обычных операций на настольном ПК, — поэтому pop() оказывается дорогостоящей операцией. Мало того что приходится просматривать список указателей опасности для исключаемого из списка узла, так еще надо просмотреть его для каждого узла в списке ожидающих освобождения. Понятно, что это не слишком удачная идея. В списке может храниться max_hazard_pointers узлов, и каждый из них нужно сравнить с max_hazard_pointers хранимых указателей опасности. Черт! Должно существовать решение получше.

Более быстрые стратегии освобождения с применением указателей опасности

И оно, конечно же, существует. Показанное выше решение — это простая и наивная реализация указателей опасности, которую я привел, только чтобы объяснить идею. Первое, что можно сделать, — пожертвовать памятью ради быстродействия. Вместо того чтобы проверять каждый узел в списке освобождения при каждом обращении к pop(), мы вообще не будем пытаться освобождать узлы, пока их число в списке не превысит max_hazard_pointers. Тогда мы гарантированно сможем освободить хотя бы один узел. Но если просто ждать, пока в списке накопится max_hazard_pointers+1 узлов, то выиграем мы немного. После того как число узлов достигает max_hazard_pointers, мы будем пытаться освобождать их почти при каждом вызове pop(), так что проблема лишь немного отодвинулась во времени. Но если дождаться, пока в списке наберётся 2*max_hazard_pointers узлов, то мы гарантированно сможем освободить по крайней мере max_hazard_pointers узлов, и тогда следующую попытку нужно будет делать не раньше, чем через max_hazard_pointers обращений к pop(). Это уже гораздо лучше. Вместо того чтобы просматривать max_hazard_pointers узлов при каждом вызове pop() (и, возможно, ничего не освободить), мы проверяем 2*max_hazard_pointers через каждые max_hazard_pointers вызовов pop() и освобождаем не менее max_hazard_pointers. Получается, что в среднем мы проверяем два узла при каждом вызове pop(), и один из них точно освобождается.

Но и у этого решения есть недостаток (помимо увеличенного расхода памяти): теперь мы должны подсчитывать узлы в списке освобождения, то есть использовать атомарный счетчик, а, кроме того, за доступ к самому списку конкурируют несколько потоков. Если память позволяет, то можно предложить еще более эффективную схему освобождения: каждый поток хранит собственный список освобождения в поточно-локальной памяти. Тогда ни для счетчика, ни для доступа к списку не понадобятся атомарные переменные. Но в обмен придется выделить память для max_hazard_pointers * max_hazard_pointers узлов. Если поток завершается прежде, чем освобождены все его узлы, то оставшиеся можно перенести в глобальный список, как и раньше, а затем добавить в локальный список следующего потока, пожелавшего выполнить процедуру освобождения.

Еще один недостаток указателей опасности состоит в том, что они защищены патент пой заявкой, поданной IBM[13]. Если вы пишете программное обеспечение, которое будет применяться в стране, где эти патенты признаются, то придется получить соответствующую лицензию. Это проблема, общая для многих методов освобождения памяти без блокировок; поскольку в этой области ведутся активные исследования, крупные компании берут патенты всюду, где могут. Возможно, вы задаетесь вопросом, зачем я посвятил так много страниц описанию техники, которой многие не смогут воспользоваться. Что ж, вопрос не праздный. Во-первых, в некоторых случаях ей можно воспользоваться, не платя лицензионных отчислений. Например, если вы разрабатываете бесплатную программу на условиях лицензии GPL[14], то она может подпадать под заявление IBM об отказе от патентных притязаний[15]. Во-вторых — и это более существенно — объяснение техники помогает высветить вещи, о которых надо помнить при написании кода, свободного от блокировок, например, о плате за атомарные операции.

А существуют ли непатентованные методы освобождения памяти, применимые в программах без блокировок? К счастью, да. Один из них — подсчет ссылок.