7.2.5. Применение модели памяти к свободному от блокировок стеку
Прежде чем менять схему упорядочения, нужно исследовать все операции и определить, какие между ними должны быть отношения. Затем можно будет вернуться и найти минимальное упорядочение, которое эти отношения обеспечивает. Чтобы это сделать, потребуется взглянуть на ситуацию с точки зрения потоков в нескольких разных сценариях. Простейший сценарий возникает, когда один поток помещает элемент данных в стек, а другой через некоторое время извлекает его оттуда, с него и начнем.
В этом сценарии есть три существенных участника. Во-первых, структура counted_node_ptr, используемая для передачи данных — узла head. Во-вторых, структура node, на которую head ссылается. И, в-третьих, сами данные, на которые указывает узел.
Поток, выполняющий push(), сначала конструирует элемент данных и объект node, затем устанавливает head. Поток, выполняющий pop(), сначала загружает значение head, затем в цикле сравнения с обменом увеличивает хранящийся в нем счетчик ссылок, после чего читает структуру node, чтобы извлечь из нее значение next. Из этой последовательности можно вывести требуемое отношение; значение next — простой неатомарный объект, поэтому для его безопасного чтения должно существовать отношение происходит-раньше между операциями сохранения (в заталкивающем потоке) и загрузки (в выталкивающем потоке). Поскольку в push() имеется единственная атомарная операция — compare_exchange_weak(), а для существования отношения происходит-раньше между потоками нам нужна операция освобождения (release), то для функции compare_exchange_weak() необходимо задать упорядочение std::memory_order_release или более сильное. Если compare_exchange_weak() вернула false, то ничего не было изменено, и мы можем продолжить цикл, следовательно в этом случае нужна только семантика std::memory_order_relaxed:
void push(T const& data) {
counted_node_ptr new_node;
new_node.ptr = new node(data);
new_node.external_count = 1;
new_node.ptr->next = head.load(std::memory_order_relaxed);
while (!head.compare_exchange_weak(
new_node.ptr->next, new_node,
std::memory_order_release, std::memory_order_relaxed));
}
А что можно сказать о коде pop()? Чтобы получить желаемое отношение происходит-раньше, перед доступом к next необходима операция с семантикой std::memory_order_acquire или более сильной. Указатель, который разыменовывается для доступа к полю next, — это прежнее значение, прочитанное операцией compare_exchange_strong() в increase_head_count(), поэтому указанная семантика нужна в случае успеха. Как и в push(), если обмен закончился неудачно, мы просто повторяем цикл, поэтому для отказа можно задать ослабленное упорядочение:
void increase_head_count(counted_node_ptr& old_counter) {
counted_node_ptr new_counter;
do {
new_counter = old_counter;
++new_counter.external_count;
}
while (!head.compare_exchange_strong(
old_counter, new_counter,
std::memory_order_acquire, std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
}
Если вызов compare_exchange_strong() завершается успешно, то мы знаем, что раньше в поле ptr прочитанного значения находилось то, что теперь хранится в переменной old_counter. Поскольку сохранение в push() было операцией освобождения, а данный вызов compare_exchange_strong() — операция захвата, то сохранение синхронизируется-с загрузкой, и мы имеем отношение происходит-раньше. Следовательно, сохранение в поле ptr в push() происходит-раньше доступа к ptr->next в pop(), и мы в безопасности.
Отметим, что для этого анализа порядок доступа к памяти в начальном вызове head.load() не имел значения, поэтому в нем безопасно задать семантику std::memory_order_relaxed.
Далее на очереди операция compare_exchange_strong(), которая записывает в head значение old_head.ptr->next. Нужно ли наложить на нее какие-нибудь ограничения, чтобы гарантировать целостность данных в этом потоке? Если обмен завершается успешно, то мы обращаемся к ptr->data, поэтому должны быть уверены, что сохранение ptr->data в потоке, выполняющем push(), происходит-раньше загрузки. Но такая уверенность уже есть: операция захвата в increase_head_count() гарантирует, что существует отношение синхронизируется-с между сохранением в потоке, выполняющем push(), и операцией сравнения с обменом. Поскольку сохранение данных в потоке, выполняющем push(), расположено перед сохранением head, а вызов increase_head_count() расположен перед загрузкой ptr->data, то отношение происходит-раньше имеет место, и всё будет хорошо, даже если для операции сравнения с обменом в pop() задана семантика std::memory_order_relaxed. Есть еще всего одно место, где изменяется ptr->data — тот самый вызов swap(), на который вы сейчас смотрите, и ни один другой поток не может оперировать тем же узлом — в этом и заключается смысл сравнения с обменом.
Если compare_exchange_strong() завершается неудачно, то к новому значению old_head не будет обращений до следующей итерации цикла, и, поскольку мы уже решили, что семантики std::memory_order_acquire хватало в increase_head_count(), то здесь будет достаточно std::memory_order_relaxed.
Что можно сказать насчёт других потоков? Нужны ли более сильные ограничения, чтобы и другие потоки работали безопасно? Нет, не нужны, потому что head модифицируется только операциями сравнения с обменом. Будучи операциями чтения-модификации-записи, они составляют часть последовательности освобождений, начатой операцией сравнения с обменом в push(). Поэтому compare_exchange_weak() в push() синхронизируется-с операцией compare_exchange_strong() в increase_head_count(), которая прочитает сохраненное значение, даже если в промежутке другие потоки изменят head.
Мы почти закончили, осталось только рассмотреть функции, в которых используются операции fetch_add(), изменяющие счетчик ссылок. Поток, который добрался до возврата данных из узла, может продолжать в твердой уверенности, что никакой другой поток не сможет модифицировать хранящиеся в узле данные. Однако любой поток, который потерпел неудачу при извлечении данных, знает, что какой-то другой поток данные в узле модифицировал; он использовал функцию swap() для извлечения данных. Следовательно, чтобы предотвратить гонку за данными мы должны гарантировать, что swap() происходит-раньше delete. Чтобы добиться этого, проще всего задать семантику std::memory_order_release при вызове fetch_add() в ветви, где мы возвращаем данные, и семантику std::memory_order_acquire — в ветви, где мы возвращаемся в начало цикла. Однако даже это перебор — лишь один поток выполняет delete (тот, что сбросил счетчик в нуль), поэтому только этому потоку нужно выполнить операцию захвата. К счастью, поскольку fetch_add() — операция чтения-модификации-записи, то она составляет часть последовательности освобождений, поэтому для достижения цели нам достаточно дополнительной операции load(). Если в ветви, где происходит возврат в начало цикла, счетчик ссылок уменьшается до нуля, то здесь же можно перезагрузить счетчик ссылок с семантикой std::memory_order_acquire, чтобы обеспечить требуемое отношение синхронизируется-с, а в самой операции fetch_add() достаточно задать std::memory_order_relaxed. Окончательная реализация стека с новой версией pop() приведена ниже.
Листинг 7.12. Свободный от блокировок стек с подсчётом ссылок и ослабленными атомарными операциями
template<typename T>
class lock_free_stack {
private:
struct node;
struct counted_node_ptr {
int external_count;
node* ptr;
};
struct node {
std::shared_ptr<T> data;
std::atomic<int> internal_count;
counted_node_ptr next;
node(T const& data_):
data(std::make_shared<T>(data_)), internal_count(0) {}
};
std::atomic<counted_node_ptr> head;
void increase_head_count(counted_node_ptr& old_counter) {
counted_node_ptr new_counter;
do {
new_counter = old_counter;
++new_counter.external_count;
}
while (!head.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
}
public:
~lock_free_stack() {
while(pop());
}
void push(T const& data) {
counted_node_ptr new_node;
new_node.ptr = new node(data);
new_node.external_count = 1;
new_node.ptr->next = head.load(std::memory_order_relaxed);
while (!head.compare_exchange_weak(
new_node.ptr->next, new_node,
std::memory_order_release,
std::memory_order_relaxed));
}
std::shared_ptr<T> pop() {
counted_node_ptr old_head =
head.load(std::memory_order_relaxed);
for (;;) {
increase_head_count(old_head);
node* const ptr = old_head.ptr;
if (!ptr) {
return std::shared_ptr<T>();
}
if (head.compare_exchange_strong(old_head, ptr->next,
std::memory_order_relaxed)) {
std::shared_ptr<T> res;
res.swap(ptr->data);
int const count_increase = old_head.external_count — 2;
if (ptr->internal_count.fetch_add(count_increase,
std::memory_order_release) == -count_increase) {
delete ptr;
}
return res;
}
else if (ptr->internal_count.fetch_add(-1,
std::memory_order_relaxed) == 1) {
ptr->internal_count.load(std::memory_order_acquire);
delete ptr;
}
}
}
};
Мы немало потрудились, но наконец-то дошли до конца, и стек теперь стал куда лучше. За счет тщательно продуманного применения ослабленных операций нам удалось повысить производительность, не жертвуя корректностью. Как видите, реализация pop() теперь насчитывает 37 строк вместо 8 в эквивалентной реализации pop() для стека с блокировками (листинг 7.1) и 7 строк для простого свободного от блокировок стека без управления памятью (листинг 7.2). При рассмотрении свободной от блокировок очереди мы встретимся с аналогичной ситуацией: сложность кода в значительной степени обусловлена именно управлением памятью.