4.1.2. Потокобезопасная очередь на базе условных переменных
Приступая к проектированию обобщенной очереди, стоит потратить некоторое время на обдумывание того, какие понадобятся операции. Именно так мы подходили к разработке потокобезопасного стека в разделе 3.2.3. Возьмем в качестве образца адаптер контейнера std::queue<> из стандартной библиотеки С++, интерфейс которого показан в листинге ниже.
Листинг 4.2. Интерфейс класса std::queue
template <class T, class Container = std::deque<T>>
class queue {
public:
explicit queue(const Container&);
explicit queue(Container&& = Container());
template <class Alloc> explicit queue(const Alloc&);
template <class Alloc> queue(const Container&, const Alloc&);
template <class Alloc> queue(Container&&, const Alloc&);
template <class Alloc> queue(queue&&, const Alloc&);
void swap(queue& q);
bool empty() const;
size_type size() const;
T& front();
const T& front() const;
T& back();
const T& back() const;
void push(const T& x);
void push(T&& x);
void pop();
template <class... Args> void emplace(Args&&... args);
};
Если не обращать внимания на конструирование, присваивание и обмен, то останется три группы операций: опрос состояния очереди в целом (empty() и size()), опрос элементов очереди (front() и back()) модификация очереди (push(), pop() и emplace()). Ситуация аналогична той, что мы видели в разделе 3.2.3 для стека, поэтому возникают те же — внутренне присущие интерфейсу — проблемы с гонкой. Следовательно, front() и pop() необходимо объединить в одной функции — точно так же, как мы постудили с top() и pop() в случае стека. Но в коде в листинге 4.1 есть дополнительный нюанс: если очередь используется для передачи данных между потоками, то поток-получатель часто будет ожидать поступления данных. Поэтому включим два варианта pop(): try_pop() пытается извлечь значение из очереди, но сразу возвращает управление (с указанием ошибки), если в очереди ничего не было, a wait_and_pop() ждет, когда появятся данные. Взяв за образец сигнатуры функций из примера стека, представим интерфейс в следующем виде:
Листинг 4.3. Интерфейс класса threadsafe_queue
#include <memory>
template<typename T>
class threadsafe_queue {
public:
threadsafe_queue();
threadsafe_queue(const threadsafe_queue&);
threadsafe_queue& operator=(
const threadsafe_queue&) = delete; ←┐ Для простоты
void push(T new_value); │ запрещаем присваивание
bool try_pop(T& value); ← (1)
std::shared_ptr<T> try_pop(); ← (2)
void wait_and_pop(T& value);
std::shared_ptr<T> wait_and_pop();
bool empty() const;
};
Как и в случае стека, мы для простоты уменьшили число конструкторов и запретили присваивание. И, как и раньше, предлагаем по два варианта функций try_pop() и wait_for_pop(). Первый перегруженный вариант try_pop() (1) сохраняет извлеченное значение в переданной по ссылке переменной, а возвращаемое значение использует для индикации ошибки: оно равно true, если значение получено, и false — в противном случае (см. раздел А.2). Во втором перегруженном варианте (2) так поступить нельзя, потому что возвращаемое значение — это данные, извлеченные из очереди. Однако же можно возвращать указатель NULL, если в очереди ничего не оказалось.
Ну и как же всё это соотносится с листингом 4.1? В следующем листинге показано, как перенести оттуда код в методы push() и wait_and_pop().
Листинг 4.4. Реализация функций push() и wait_and_pop() на основе кода из листинга 4.1
#include <queue>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue {
private:
std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
void push(T new_value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
value = data_queue.front();
data_queue.pop();
}
};
threadsafe_queue<data_chunk> data_queue; ← (1)
void data_preparation_thread() {
while (more_data_to_prepare()) {
data_chunk const data = prepare_data();
data_queue.push(data); ← (2)
}
}
void data_processing_thread() {
while (true) {
data_chunk data;
data_queue.wait_and_pop(data); ← (3)
process(data);
if (is_last_chunk(data))
break;
}
}
Теперь мьютекс и условная переменная находятся в экземпляре threadsafe_queue, поэтому не нужно ни отдельных переменных (1), ни внешней синхронизации при обращении к функции push() (2). Кроме того, wait_and_pop() берет на себя заботу об ожидании условной переменной (3).
Второй перегруженный вариант wait_and_pop() тривиален, а остальные функции можно почти без изменений скопировать из кода стека в листинге 3.5. Ниже приведена окончательная реализация.
Листинг 4.5. Полное определение класса потокобезопасной очереди на базе условных переменных
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue {
private:
mutable std::mutex mut;← (1) Мьютекс должен быть изменяемым
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() {}
threadsafe_queue(threadsafe_queue const& other) {
std::lock_guard<std::mutex> lk(other.mut);
data_queue = other.data_queue;
}
void push(T new_value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{ return !data_queue.empty(); });
value = data_queue.front();
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{ return !data_queue.empty(); });
std::shared_ptr<T>
res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = data_queue.front();
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T>
res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
Хотя empty() — константная функция-член, а параметр копирующего конструктора — const-ссылка, другие потоки могут хранить неконстантные ссылки на объект и вызывать изменяющие функции-члены, которые захватывают мьютекс. Поэтому захват мьютекса — это изменяющая операция, следовательно, член mut необходимо пометить как mutable (1), чтобы его можно было захватить в функции empty() и в копирующем конструкторе.
Условные переменные полезны и тогда, когда есть несколько потоков, ожидающих одного события. Если потоки используются для разделения работы и, следовательно, на извещение должен реагировать только один поток, то применима точно такая же структура программы, как в листинге 4.1; нужно только запустить несколько потоков обработки данных. При поступлении новых данных функция notify_one() разбудит только один поток, который проверяет условие внутри wait(), и этот единственный поток вернет управление из wait() (в ответ на помещение нового элемента в очередь data_queue). Заранее нельзя сказать, какой поток получит извещение и есть ли вообще ожидающие потоки (не исключено, что все они заняты обработкой ранее поступивших данных).
Альтернативный сценарий — когда несколько потоков ожидают одного события, и отреагировать должны все. Так бывает, например, когда инициализируются разделяемые данные, и все работающие с ними потоки должны ждать, пока инициализация завершится (хотя для этого случая существуют более подходящие механизмы, см. раздел 3.3.1 главы 3), или когда потоки должны ждать обновления разделяемых данных, например, в случае периодической повторной инициализации. В таких ситуациях поток, отвечающий за подготовку данных, может вызвать функцию-член notify_all() условной переменной вместо notify_one(). Эта функция извещает все потоки, ожидающие внутри функции wait(), о том, что они должны проверить ожидаемое условие.
Если ожидающий поток собирается ждать условия только один раз, то есть после того как оно станет истинным, он не вернется к ожиданию той же условной переменной, то лучше применить другой механизм синхронизации. В особенности это относится к случаю, когда ожидаемое условие — доступность каких-то данных. Для такого сценария больше подходят так называемые будущие результаты (future).