7.2.2. Устранение утечек: управление памятью в структурах данных без блокировок

При первом подходе к pop() мы решили смириться с утечкой узлов, чтобы избежать гонки в случае, когда один поток удаляет узел, а другой в это время хранит указатель на него, который собирается разыменовать. Однако в корректной программе на С++ утечка памяти, конечно, недопустима, так что с этим надо что-то делать. Сейчас мы покажем, как эта проблема решается.

Основная трудность состоит в том, что мы хотим освободить занятую узлом память, но не можем сделать это, пока нет уверенности, что никакой другой поток не хранит указателей на нее. Если бы в каждый момент времени только один поток вызывал pop() для данного экземпляра стека, то все было бы прекрасно. Функция push() не обращается к уже добавленному в стек узлу, так что кроме потока, вызвавшего pop(), этот узел больше никого не интересует, и его можно безопасно удалить.

С другой стороны, если мы все-таки хотим, чтобы несколько потоков могли одновременно вызывать pop(), то нужно каким-то образом отслеживать момент, когда удаление узла становится безопасным. По сути дела, это означает, что необходимо написать специализированный сборщик мусора для узлов node. Звучит пугающе, и эта задача действительно не самая простая, но на практике все не так плохо: мы проверяем только указатели на node и только те узлы, к которым обращается pop(). Нас не интересуют узлы внутри push(), потому что они доступны только одному потоку, пока не окажутся в стеке. А вот внутри pop() к одному и тому же узлу могут одновременно иметь доступ несколько потоков.

Если потоков, вызывающих pop(), нет вообще, то можно без опаски удалить все узлы, ожидающие удаления. Поэтому, если после извлечения данных помещать узлы в список «подлежат удалению», то их можно будет удалить одним махом в момент, когда не будет потоков, вызывающих pop(). Но как узнать, что потоков, вызывающих pop(), действительно нет? Очень просто — подсчитывать их. Если увеличивать счетчик при входе и уменьшать при выходе, то удалять узлы из списка «подлежащих удалению» можно будет, когда счетчик становится равным нулю. Разумеется, сам счетчик должен быть атомарным, чтобы к нему можно было безопасно обращаться из нескольких потоков. В листинге 7.4 показала исправленная функция pop(), а в листинге 7.5 — вспомогательные функции, используемые в ее реализации.

Листинг 7.4. Освобождение занятой узлами памяти в момент, когда нет потоков, вызывающих pop()

template<typename T>

class lock_free_stack {

private:

 std::atomic<unsigned> threads_in_pop;←┐ Атомарная

 void try_reclaim(node* old_head);     (1) переменная

public:

 std::shared_ptr<T> pop()

 {                            (2) Увеличить счетчик

  ++threads_in_pop;           ←┤ перед тем, как что-то

  node* old_head = head.load();│ делать

  while (old_head &&

   !head.compare_exchange_weak(old_head, old_head->next));

  std::shared_ptr<T> res;

  if (old_head)

  {                         (3) He копировать

   res.swap(old_head->data);←┤ указатель, а извлечь

  }                          │ данные из узла

  try_reclaim(old_head);←┐ Освободить удаленные

  return res;            (4) узлы, если получится

 }

};

Атомарная переменная threads_in_pop (1) нужна для подсчета потоков, которые в данный момент пытаются извлечь элемент из стека. Она увеличивается на единицу в начале pop() (2) и уменьшается на единицу внутри функции try_reclaim(), которая вызывается после изъятия узла из списка (4). Поскольку мы откладываем удаление самого узла, то можно с помощью swap() переместить из него данные (3), а не просто скопировать указатель; тогда данные будут автоматически удалены, когда в них отпадает необходимость, вместо того, чтобы занимать память только потому, что на них ссылается еще не удаленный узел. В следующем листинге показан код функции try_reclaim().

Листинг 7.5. Механизм освобождения памяти на основе подсчёта ссылок

template<typename T>

class lock_free_stack {

private:

 std::atomic<node*> to_be_deleted;

 static void delete_nodes(node* nodes) {

  while (nodes) {

   node* next = nodes->next;

   delete nodes;

   nodes = next;

  }

 }

 void try_reclaim(node* old_head) {

  if (threads_in_pop == 1) ← (1)

  {          Заявляем права на список подлежащих удалению узлов (2)

   node* nodes_to_delete = to_be_deleted.exchange(nullptr);←┘

   if (!--threads_in_pop)←┐ Я — единственный

   {                      (3) поток в pop()?

    delete_nodes(nodes_to_delete);       ← (4)

   } else if(nodes_to_delete) {          ← (5)

    chain_pending_nodes(nodes_to_delete);← (6)

   }

   delete old_head; ← (7)

  } else {

   chain_pending_node(old_head); ← (8)

   --threads_in_pop;

  }

 }

 void chain_pending_nodes(node* nodes) {

  node* last = nodes;

  while (node* const next =

         last->next) {←┐  По указателям

                       ├ (9) next доходим до

   last = next;        │  конца списка

  }

  chain_pending_nodes(nodes, last);

 }

 void chain_pending_nodes(node* first, node* last) {

  last->next = to_be_deleted;← (10)

  while (

   !to_be_deleted.compare_exchange_weak(←┐    цикл гарантиру-

   last->next, first));                  ├ (11)ет, что last->next

  }                                      │    корректно

 void chain_pending_node(node* n) {

  chain_pending_nodes(n, n);← (12)

 }

};

Если при попытке освободить занятую узлом (1) память счетчик threads_in_pop оказывается равен 1, то данный поток — единственный в pop(), и, значит, можно безопасно удалять только что исключенный из списка узел (7) и, быть может, также узлы, ожидающие удаления. Если счетчик не равен 1, то никакие узлы удалять нельзя, поэтому мы просто добавляем узел в список ожидающих (8).

Предположим, что threads_in_pop равно 1. Тогда нам нужно освободить ожидающие узлы, потому что если этого не сделать, то они так и будут ожидать удаления до момента уничтожения стека. Для этого мы запрашиваем монопольные права на список с помощью атомарной операции exchange (2), после чего уменьшаем на единицу счетчик threads_in_pop (3). Если в результате счетчик оказался равным нулю, значит, больше ни один поток не работает со списком ожидающих удаления узлов. По ходу дела могли появиться новые ожидающие узлы, но сейчас — когда можно безопасно очистить список — нам это безразлично. Мы просто вызываем функцию delete_nodes, которая обходит список и удаляет узлы (4).

Если счетчик после уменьшения не равен нулю, то освобождать узлы небезопасно, поэтому если такие узлы есть (5), то нужно поместить их в начало списка ожидающих (6). Такое может случиться, если к структуре данных одновременно обращаются несколько потоков. Другие потоки могли вызвать pop() в промежутке между первой проверкой threads_in_pop (1) и «заявлением прав» на список (2) и добавить в список узлы, к которым все еще обращается один или несколько потоков. На рис. 7.1 поток С добавляет узел Y в список to_be_deleted, несмотря на то, что поток В все еще ссылается на него по указателю old_head и, значит, будет пробовать читать его указатель next. Поэтому поток А не может удалить узлы без риска вызвать неопределенное поведение в потоке В.

Рис. 7.1. На примере трех потоков, вызывающих pop(), видно, почему необходимо проверять threads_in_pop после заявления прав на список узлов, ожидающих удаления, в try_reclaim()

Чтобы поместить узлы, ожидающие удаления, в список ожидающих, мы используем уже имеющийся указатель next для связывания. Для того чтобы добавить цепочку в список, мы проходим до конца цепочки (9), записываем в указатель next в последнем узле текущее значение to_be_deleted (10) и сохраняем указатель на первый узел цепочки как новый указатель to_be_deleted (11). Здесь необходимо вызывать compare_exchange_weak в цикле, чтобы не допустить утечки узлов, добавленных другим потоком. В результате в next записывается указатель на последний узел цепочки, если он изменился. Добавление единственного узла в список — это особый случай, когда первый узел в добавляемой цепочке совпадает с последним (12).

Этот алгоритм работает вполне приемлемо, если нагрузка невелика, то есть существуют моменты затишья, когда в pop() нет ни одного потока. Но эта ситуация кратковременна; именно поэтому мы должны проверять, что счетчик threads_in_pop после уменьшения обратился в нуль (3), прежде чем освобождать память, и по той же причине проверка стоит до удаления только что изъятого из стека узла (7). Удаление узла может занять относительно много времени, а мы хотим, чтобы окно, в котором другие потоки могут модифицировать список, было как можно короче. Чем больше времени проходит между моментом, когда поток впервые обнаружил, что threads_in_pop равно 1, и попыткой удалить узлы, тем больше шансов, что какой-то другой поток вызовет pop(), после чего threads_in_pop перестанет быть равно 1 и, стало быть, удалять узлы станет нельзя.

Если нагрузка высока, то затишье может не наступить никогда, поскольку новые потоки входят в pop() до того, как пребывавшие там успевают выйти. В таком случае список to_be_deleted будет расти неограниченно, и мы снова сталкиваемся с утечкой памяти. Если периодов затишья не ожидается, то необходим другой механизм освобождения узлов. Главное здесь — определить, когда ни один поток больше не обращается к конкретному узлу, который, следовательно, можно освободить. Из всех возможных механизмов такого рода для понимания проще всего тот, в котором используются указатели опасности (hazard pointers).