7.2.6. Потокобезопасная очередь без блокировок
Очередь отличается от стека прежде всего тем, что операции push() и pop() обращаются к разным частям структуры данных, тогда как в стеке та и другая работают с головным узлом списка. Следовательно, и проблемы синхронизации тоже другие. Требуется сделать так, чтобы изменения, произведенные на одном конце, были видны при доступе с другого конца. Однако структура функции try_pop() в листинге 6.6 не так уж сильно отличается от структуры pop() в простом свободном от блокировок стеке в листинге 7.2, поэтому можно с достаточными основаниями предположить, что и весь свободный от блокировок код будет схожим. Посмотрим, так ли это.
Если взять листинг 6.6 за основу, то нам понадобятся два указателя на node: один для головы списка (head), второй — для хвоста (tail). Поскольку мы собираемся обращаться к ним из нескольких потоков, то надо бы сделать эти указатели атомарными и расстаться с соответствующими мьютексами. Начнём с этого небольшого изменения и посмотрим, куда оно нас приведет. Результат показан в листинге ниже.
Листинг 7.13. Свободная от блокировок очередь с одним производителем и одним потребителем
template<typename T>
class lock_free_queue {
private:
struct node {
std::shared_ptr<T> data;
node* next;
node():
next(nullptr) {}
};
std::atomic<node*> head;
std::atomic<node*> tail;
node* pop_head() {
node* const old_head = head.load();
if (old_head == tail.load()) {← (1)
return nullptr;
}
head.store(old_head->next);
return old_head;
}
public:
lock_free_queue():
head(new node), tail(head.load()) {}
lock_free_queue(const lock_free_queue& other) = delete;
lock_free_queue& operator=(
const lock_free_queue& other) = delete;
~lock_free_queue() {
while(node* const old_head = head.load()) {
head.store(old_head->next);
delete old_head;
}
}
std::shared_ptr<T> pop() {
node* old_head = pop_head();
if (!old_head) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> const res(old_head->data);← (2)
delete old_head;
return res;
}
void push(T new_value) {
std::shared_ptr<T> new_data(std::make_shared<T>(new_value));
node* p = new node; ← (3)
node* const old_tail = tail.load(); ← (4)
old_tail->data.swap(new_data); ← (5)
old_tail->next = p; ← (6)
tail.store(p); ← (7)
}
};
На первый взгляд, неплохо, и если в каждый момент времени существует только один поток, вызывающий push(), и только один поток, вызывающий pop(), то вообще всё прекрасно. Важно отметить, что в этом случае существует отношение происходит-раньше между push() и pop(), благодаря которому извлечение данных безопасно. Сохранение tail (7) синхронизируется-с загрузкой tail (1), сохранение указателя на data в предыдущем узле (5) расположено перед сохранением tail, а загрузка tail расположена перед загрузкой указателя на data (2), поэтому сохранение data происходит раньше его загрузки, и всё замечательно. Таким образом, мы получили корректно обслуживаемую очередь с одним производителем и одним потребителем.
Проблемы начинаются, когда несколько потоков вызывают push() или pop() одновременно. Сначала рассмотрим push(). Если два потока одновременно вызывают push(), то оба выделяют память для нового фиктивного узла (3), оба читают одно и то же значение tail (4) и, следовательно, оба изменяют данные-члены data и next одного и того же узла (5), (6). А это уже гонка за данными!
Аналогичные проблемы возникают в pop_head(). Если два потока вызывают эту функцию одновременно, то оба читают одно и то же значение head, и оба перезаписывают старое значение одним и тем же указателем next. Оба потока теперь думают, что получили один и тот же узел, — прямой путь к катастрофе. Мы должны не только сделать так, чтобы лишь один поток извлекал данный элемент, но и позаботиться о том, чтобы другие потоки могли безопасно обращаться к члену next узла, который прочитали из head. Это точно та же проблема, с которой мы сталкивались при написании pop() для свободного от блокировок стека, поэтому и любое из предложенных тогда решений можно применить здесь.
Итак, проблему pop() можно считать решенной, но как быть с push()? Здесь трудность заключается в том, что для получения требуемого отношения происходит-раньше между push() и pop() мы должны заполнить поля фиктивного узла до обновления tail. Но это означает, что одновременные вызовы push() конкурируют за те же самые данные, так как был прочитал один и тот же указатель tail.
Решение проблемы нескольких потоков в push()
Один из способов — добавить фиктивный узел между реальными. Тогда единственной частью текущего узла tail, нуждающейся в обновлении, будет указатель next, который, следовательно, можно было бы сделать атомарным. Если потоку удалось записать в next указатель на свой новый узел вместо nullptr, то он успешно добавил узел; в противном случае ему придется начать сначала и снова прочитать tail. Это потребует небольшого изменения в pop() — нужно будет игнорировать узлы с нулевым указателем на данные и возвращаться в начало цикла. Недостаток этого решения в том, что при каждом вызове pop() придется как правило исключать из списка два узла и производить в два раза больше операций выделения памяти.
Второй способ — сделать указатель data атомарным и устанавливать его с помощью операции сравнения с обменом. Если она завершится успешно, то мы получили свой хвостовой узел и можем безопасно записать в next указатель на наш новый узел, а затем обновить tail. Если же сравнение с обменом завершается неудачно, потому что другой поток успел сохранить данные, мы возвращаемся в начало цикла, заново читаем tail и пробуем снова. Если атомарные операции над std::shared_ptr<> свободны от блокировок, то дело сделано. Если нет, нужна альтернатива. Можно, например, заставить pop() возвращать std::unique_ptr<> (в конце концов, это ведь единственная ссылка на объект) и сохранять данные в очереди в виде простого указателя. Тогда его можно было бы хранить как std::atomic<T*> и впоследствии обновлять с помощью compare_exchange_strong(). Если воспользоваться для поддержки нескольких потоков в pop() схемой подсчета ссылок из листинга 7.11, то push() будет выглядеть следующим образом.
Листинг 7.14. Первая (неудачная) попытка переработки push()
void push(T new_value) {
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
for (;;) {
node* const old_tail = tail.load();← (1)
T* old_data = nullptr;
if (old_tail->data.compare_exchange_strong(
old_data, new_data.get())) { ← (2)
old_tail->next = new_next;
tail.store(new_next.ptr); ← (3)
new_data.release();
break;
}
}
}
Применение схемы подсчета ссылок устраняет эту конкретную гонку, но в push() имеются и другие гонки. Взглянув на переработанную версию push() в листинге 7.14, вы обнаружите ту же ситуацию, что уже встречалась нам в стеке: загрузка атомарного указателя (1) и разыменование этого указателя (2). В промежутке между этими двумя операциями другой поток может изменить указатель (3), что в конечном итоге приведет к освобождению памяти, запятой узлом (в pop()). Если это произойдет раньше, чем мы разыменовываем указатель, то получится неопределенное поведение. Ой! Возникает искушение добавить в tail внешний счетчик, как мы уже поступили для head, однако на каждый узел уже имеется внешний счетчик в указателе next в предыдущем узле очереди. Если хранить два внешних счетчика для одного узла, то потребуется модифицировать схему подсчета ссылок, чтобы не удалить узел преждевременно. Проблему можно решить, подсчитывая число внешних счетчиков в структуре node и уменьшая это число при уничтожении внешнего счетчика (одновременно с прибавлением значения внешнего счетчика к значению внутреннего). Если внутренний счетчик равен нулю, а внешних не осталось, то узел можно удалять. Эту технику я впервые встретил в проекте Джо Сейга (Joe Seigh) Atomic Ptr Plus[16]. В следующем листинге показано, как выглядит push() при использовании такой схемы.
Листинг 7.15. Реализация push() для очереди без блокировок с подсчётом ссылок на tail
template<typename T>
class lock_free_queue {
private:
struct node;
struct counted_node_ptr {
int external_count;
node* ptr;
};
std::atomic<counted_node_ptr> head;
std::atomic<counted_node_ptr> tail;← (1)
struct node_counter {
unsigned internal_count:30;
unsigned external_counters:2;← (2)
};
struct node {
std::atomic<T*> data;
std::atomic<node_counter> count;← (3)
counted_node_ptr next;
node() {
node_counter new_count;
new_count.internal_count = 0;
new_count.external_counters = 2;← (4)
count.store(new_count);
next.ptr = nullptr;
next.external_count = 0;
}
};
public:
void push(T new_value) {
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail.load();
for (;;) {
increase_external_count(tail, old_tail); ← (5)
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong(← (6)
old_data, new_data.get())) {
old_tail.ptr->next = new_next;
old_tail = tail.exchange(new_next);
free_external_counter(old_tail); ← (7)
new_data.release();
break;
}
old_tail.ptr->release_ref();
}
}
};
В листинге 7.15 tail теперь имеет такой же тип atomic<counted_node_ptr>, как и head (1), а в структуру node добавлен член count вместо прежнего internal_count (3). Член count сам представляет собой структуру с двумя полями: internal_count и external_counters (2). Под поле external_counters отведено только 2 бита, потому что внешних счетчиков может быть не более двух. Воспользовавшись битовыми полями и отведя под internal_count 30 бит, мы ограничили длину поля счетчика 32 битами. В результате мы убиваем сразу двух зайцев: и значение внутреннего счетчика может быть достаточно велико, и вся структура помещается в машинное слово на 32- и 64-разрядных машинах. Очень важно изменять счетчики как единое целое, чтобы избежать гонки. Как это делается, мы покажем чуть ниже. На многих платформах хранение структуры в одном машинном слове повышает шансы на то, что атомарные операции окажутся свободными от блокировок.
При инициализации структуры node в поле internal_count записывается 0, а в поле external_counters — 2 (4), потому что сразу после добавления нового узла в очередь на него есть две ссылки: из tail и из указателя next в предыдущем узле. Код самой функции push() похож на приведенный в листинге 7.14 с тем отличием, что перед тем как разыменовывать загруженное из tail значение, чтобы вызвать compare_exchange_strong() для члена узла data (6), мы вызываем новую функцию increase_external_count() которая увеличивает счетчик (5), а затем функцию free_external_counter() для старого хвоста old_tail (7).
Разобравшись с push(), обратим наши взоры на pop(). В ее коде (см. листинг 7.16) логика подсчета ссылок из реализации pop() в листинге 7.11 комбинируется с логикой извлечения из очереди в листинге 7.13.
Листинг 7.16. Извлечение узла из очереди без блокировок с подсчётом ссылок на tail
template<typename T>
class lock_free_queue {
private:
struct node {
void release_ref();
};
public:
std::unique_ptr<T> pop() {
counted_node_ptr old_head =
head.load(std::memory_order_relaxed); ← (1)
for (;;) {
increase_external_count(head, old_head);← (2)
node* const ptr = old_head.ptr;
if (ptr == tail.load().ptr) {
ptr->release_ref();← (3)
return std::unique_ptr<T>();
}
if (head.compare_exchange_strong(old_head, ptr->next)) {← (4)
T* const res = ptr->data.exchange(nullptr);
free_external_counter(old_head);← (5)
return std::unique_ptr<T>(res);
}
ptr->release_ref();
}
}
};
Все начинается с загрузки значения old_head перед входом в цикл (1) и до увеличения внешнего счетчика в загруженном значении (2). Если узел head совпадает с tail, то можно освободить ссылку (3) и вернуть нулевой указатель, потому что очередь пуста. Если же в очереди есть данные, то мы пытаемся заявить на них свои права с помощью compare_exchange_strong() (4). Как и в случае стека в листинге 7.11, мы при этом сравниваем внешний счетчик и указатель как единое целое; если хотя бы один из них изменился, то мы должны вернуться в начало цикла, освободив предварительно ссылку 6. Если обмен завершился удачно, то мы получили в свое распоряжение данные в узле, поэтому можем вернуть их вызывающей программе, освободив предварительно внешний счетчик ссылок на извлеченный узел (5). После того как оба внешних счетчика освобождены, а внутренний счетчик обратился в нуль, сам узел можно удалять. Вспомогательные функции подсчета ссылок приведены в листингах 7.17, 7.18 и 7.19.
Листинг 7.17. Освобождение ссылки на узел в очереди без блокировок
template<typename T>
class lock_free_queue {
private:
struct node {
void release_ref() {
node_counter old_counter =
count.load(std::memory_order_relaxed);
node_counter new_counter;
do {
new_counter = old_counter;
--new_counter.internal_count; ← (1)
}
while (!count.compare_exchange_strong(← (2)
old_counter, new_counter,
std::memory_order_acquire, std::memory_order_relaxed));
if (
!new_counter.internal_count &&
!new_counter.external_counters) {
delete this; ← (3)
}
}
};
};
Реализация node::release_ref() лишь немногим отличается от аналогичного кода в lock_free_stack::pop() (см. листинг 7.11). Там мы работали с единственным внешним счетчиком, поэтому достаточно было вызвать fetch_sub. Здесь же необходимо атомарно обновить всю структуру count, хотя в действительности мы хотим модифицировать только поле internal_count (1). Поэтому никуда не деться от цикла сравнения с обменом (2). Если после уменьшения internal_count оказалось, что и внутренний, и внешний счетчик равны нулю, то это была последняя ссылка, и мы можем удалять узел (3).
Листинг 7.18. Получение новой ссылки на узел в очереди без блокировок
template<typename T>
class lock_free_queue {
private:
static void increase_external_count(
std::atomic<counted_node_ptr>& counter,
counted_node_ptr& old_counter) {
counted_node_ptr new_counter;
do {
new_counter = old_counter;
++new_counter.external_count;
}
while (!counter.compare_exchange_strong(
old_counter, new_counter,
std::memory_order_acquire, std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
}
};
Листинг 7.18 завершает картину. На этот раз мы не освобождаем ссылку, а получаем новую и увеличиваем внешний счетчик. Функция increase_external_count() аналогична increase_head_count() из листинга 7.12, отличаясь от нее тем, что преобразована в статическую функцию-член, которая принимает подлежащий обновлению внешний счетчик извне, а не оперирует внутренним членом класса.
Листинг 7.19. Освобождение счётчика внешних ссылок на узел в очереди без блокировок
template<typename T>
class lock_free_queue {
private:
static void free_external_counter(
counted_node_ptr &old_node_ptr) {
node* const ptr = old_node_ptr.ptr;
int const count_increase = old_node_ptr.external_count — 2;
node_counter old_counter =
ptr->count.load(std::memory_order_relaxed);
node_counter new_counter;
do {
new_counter = old_counter;
--new_counter.external_counters; ← (1)
new_counter.internal_count += count_increase;← (2)
}
while (!ptr->count.compare_exchange_strong( ← (3)
old_counter, new_counter,
std::memory_order_acquire, std::memory_order_relaxed));
if (!new_counter.internal_count &&
!new_counter.external_counters) {
delete ptr;← (4)
}
}
};
Функция free_external_counter() дополняет increase_external_count(). Она аналогична эквивалентной функции из реализации lock_free_stack::pop() в листинге 7.11, но модифицировала с учетом появления поля external_counters. Она обновляет оба счетчика в одном вызове compare_exchange_strong() для всей структуры count (3) — точно так же мы поступали при уменьшении internal_count в release_ref(). Значение internal_count обновляется, как в листинге 7.11 (2), a external_counters уменьшается на единицу (1). Если теперь оба значения равны нулю, значит, ссылок на узел не осталось, поэтому его можно удалять (4). Оба обновления необходимо выполнять в одном действии (потому и нужен цикл сравнения с обменом), чтобы избежать гонки. Если бы счетчики обновлялись порознь, то два разных потока могли бы решить, что владеют последней ссылкой на узел, и удалить его, что привело бы к неопределенному поведению.
Хотя теперь функция работает и свободна от блокировок, осталась еще одна проблема, касающаяся производительности. После того как один поток начал операцию push(), успешно выполнив compare_exchange_strong() от имени old_tail.ptr->data (точка (5) в листинге 7.15), никакой другой войти в push() не может. Попытавшись это сделать, поток увидит новое значение, отличное от nullptr, в результате чего вызов compare_exchange_strong() вернет false, и потоку придется начать цикл заново. Это активное ожидание, которое только потребляет время процессора, не продвигаясь вперед ни на йоту. По сути дела, это блокировка. Первый удачный вызов push() блокирует все остальные потоки, пока не завершится, так что этот код более не свободен от блокировок. Хуже того — обычно операционная система может отдать приоритет потоку, удерживающему мьютекс, если существуют заблокированные потоки, но только не в данном случае, поэтому остальные потоки так и будут пожирать процессорное время, пока первый не закончит. И тут мы вытащим на свет очередной припасенный для освобождения от блокировок трюк: ожидающий поток может помочь потоку, который выполняет push().
Освобождение от блокировок одного потока с помощью другого
Чтобы вновь сделать код свободным от блокировок, нам нужно придумать, как ожидающий поток может продвигаться вперед, даже если поток, находящийся в push(), застрял. Один из способов — помочь застрявшему потоку, выполнив за него часть работы.
В данном случае мы точно знаем, что нужно сделать: указатель next в хвостовом узле требуется установить на новый фиктивный узел, и тогда сам указатель tail можно будет обновить. Все фиктивные узлы эквивалентны, поэтому не имеет значения, какой из них использовать — созданный потоком, который успешно поместил в очередь данные, или потоком, ожидающим входа в push(). Если сделать указатель next в узле атомарным, то для его установки можно будет применить compare_exchange_strong(). После того как указатель next установлен, в цикле по compare_exchange_weak() можно будет установить tail, проверяя при этом, указывает ли он по-прежнему на тот же самый исходный узел. Если это не так, значит, узел обновил какой-то другой поток, так что можно прекратить попытки и перейти в начало цикла. Реализация этой идеи потребует также небольшого изменения pop(), где нужно будет загрузить указатель next; эта модификация показана в листинге ниже.
Листинг 7.20. Модификация pop() с целью помочь при выполнении push()
template<typename T>
class lock_free_queue {
private:
struct node {
std::atomic<T*> data;
std::atomic<node_counter> count;
std::atomic<counted_node_ptr> next;← (1)
};
public:
std::unique_ptr<T> pop() {
counted_node_ptr old_head =
head.load(std::memory_order_relaxed);
for (;;) {
increase_external_count(head, old_head);
node* const ptr = old_head.ptr;
if (ptr == tail.load().ptr) {
return std::unique_ptr<T>();
}
counted_node_ptr next = ptr->next.load();← (2)
if (head.compare_exchange_strong(old_head, next)) {
T* const res = ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr<T>(res);
}
ptr->release_ref();
}
}
};
Как я и говорил, изменения тривиальны: указатель next теперь атомарный (1), поэтому операция load в точке (2) атомарна. В данном примере мы используем упорядочение по умолчанию memory_order_seq_cst, поэтому явное обращение к load() можно было бы опустить, полагаясь на операцию загрузки в операторе неявного преобразования к типу counted_node_ptr, но явное обращение будет напоминать нам, куда впоследствии добавить явное задание порядка обращения к памяти.
Листинг 7.21. Пример реализации функции push(), освобождаемой от блокировок благодаря помощи извне
template<typename T>
class lock_free_queue {
private:
void set_new_tail(counted_node_ptr &old_tail, ← (1)
counted_node_ptr const &new_tail) {
node* const current_tail_ptr = old_tail.ptr;
while (!tail.compare_exchange_weak(old_tail, new_tail) &&← (2)
old_tail.ptr == current_tail_ptr);
if (old_tail.ptr == current_tail_ptr)← (3)
free_external_counter(old_tail); ← (4)
else
current_tail_ptr->release_ref(); ← (5)
}
public:
void push(T new_value) {
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail.load();
for (;;) {
increase_external_count(tail, old_tail);
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong( ← (6)
old_data, new_data.get())) {
counted_node_ptr old_next = {0};
if (!old_tail.ptr->next.compare_exchange_strong(← (7)
old_next, new_next)) {
delete new_next.ptr;← (8)
new_next = old_next;← (9)
}
set_new_tail(old_tail, new_next);
new_data.release();
break;
} else { ← (10)
counted_node_ptr old_next = {0};
if (old_tail.ptr->next.compare_exchange_strong(← (11)
old_next, new_next)) {
old_next = new_next; ← (12)
new_next.ptr = new node;← (13)
}
set_new_tail(old_tail, old_next);← (14)
}
}
}
};
В целом похоже на исходную версию push() из листинга 7.15, но есть и несколько принципиальных отличий. Если указатель data действительно установлен (6), то нужно обработать случай, когда нам помог другой поток, и кроме того появилась ветвь else, в которой один поток оказывает помощь другому (10).
Установив указатель data в узле (6), новая версия push() изменяет указатель next, вызывая compare_exchange_strong() (7). Мы используем compare_exchange_strong(), чтобы избежать цикла. Если обмен завершился неудачно, значит, другой поток уже установил указатель next, поэтому нам ни к чему узел, выделенный в начале, и его можно удалить (8). Мы также хотим использовать значение next, установленное другим потоком, для обновления tail (9).
Собственно обновление указателя tail вынесено в отдельную функцию set_new_tail() (1). В ней мы используем цикл по compare_exchange_weak() (2), потому что если другие потоки пытаются поместить в очередь новый узел с помощью push(), то значение external_count может измениться, а нам не хотелось бы его потерять. Однако нужно позаботиться и о том, чтобы не затереть значение, которое другой поток уже успешно изменил, в противном случае в очереди могут возникнуть циклы, а это уже совершенно лишнее. Следовательно, нужно гарантировать, что часть ptr загруженного значения осталась той же самой, если сравнение с обменом не прошло. Если ptr не изменился после выхода из цикла (3), то мы успешно установили tail, поэтому старый внешний счетчик нужно освободить (4). Если значение ptr изменилось, то счетчик уже освобождён другим потоком, поэтому нам нужно только освободить ссылку, которую удерживает наш поток (5).
Если поток, вызвавший push(), не сумел установить указатель data на этой итерации цикла, то он может помочь более удачливому потоку завершить обновление. Сначала мы пытаемся записать в next указатель на новый узел, выделенный в этом потоке (11). Если это получилось, то выделенный нами узел будет использоваться как новый узел tail (12), а нам следует выделить еще один узел, поскольку поместить свои данные в очередь еще только предстоит (13). После этого мы можем попытаться установить узел tail, вызвав set_new_tail до перехода к очередной итерации (14).
Вероятно, вы обратили внимание на чрезмерно большое для такого крохотного фрагмента количество new и delete. Вызвало это тем, что новые узлы создаются в push(), а уничтожаются в pop(). Поэтому быстродействие этого кода существенно зависит от того, насколько эффективно работает распределитель памяти; плохой распределитель может полностью свести на нет свойства масштабируемости, присущие свободному от блокировок контейнеру. Вопрос о выборе и реализации подобных распределителей выходит за рамки данной книги, но имейте в виду, что единственный способ узнать, какой распределитель лучше, — испытывать и замерять производительность. К числу стандартных приемов оптимизации выделения памяти можно отнести создание отдельного распределителя в каждом потоке и использование списка свободных узлов — освободившиеся узлы помещаются в этот список, а не возвращаются распределителю.
Ну и, пожалуй, хватит примеров. Давайте теперь на основе вышеизложенного сформулируем ряд рекомендаций по написанию структур данных, свободных от блокировок.