4.1.2. Потокобезопасная очередь на базе условных переменных

Приступая к проектированию обобщенной очереди, стоит потратить некоторое время на обдумывание того, какие понадобятся операции. Именно так мы подходили к разработке потокобезопасного стека в разделе 3.2.3. Возьмем в качестве образца адаптер контейнера std::queue<> из стандартной библиотеки С++, интерфейс которого показан в листинге ниже.

Листинг 4.2. Интерфейс класса std::queue

template <class T, class Container = std::deque<T>>

class queue {

public:

 explicit queue(const Container&);

 explicit queue(Container&& = Container());

 template <class Alloc> explicit queue(const Alloc&);

 template <class Alloc> queue(const Container&, const Alloc&);

 template <class Alloc> queue(Container&&, const Alloc&);

 template <class Alloc> queue(queue&&, const Alloc&);

 void swap(queue& q);

 bool empty() const;

 size_type size() const;

 T& front();

 const T& front() const;

 T& back();

 const T& back() const;

 void push(const T& x);

 void push(T&& x);

 void pop();

 template <class... Args> void emplace(Args&&... args);

};

Если не обращать внимания на конструирование, присваивание и обмен, то останется три группы операций: опрос состояния очереди в целом (empty() и size()), опрос элементов очереди (front() и back()) модификация очереди (push(), pop() и emplace()). Ситуация аналогична той, что мы видели в разделе 3.2.3 для стека, поэтому возникают те же — внутренне присущие интерфейсу — проблемы с гонкой. Следовательно, front() и pop() необходимо объединить в одной функции — точно так же, как мы постудили с top() и pop() в случае стека. Но в коде в листинге 4.1 есть дополнительный нюанс: если очередь используется для передачи данных между потоками, то поток-получатель часто будет ожидать поступления данных. Поэтому включим два варианта pop(): try_pop() пытается извлечь значение из очереди, но сразу возвращает управление (с указанием ошибки), если в очереди ничего не было, a wait_and_pop() ждет, когда появятся данные. Взяв за образец сигнатуры функций из примера стека, представим интерфейс в следующем виде:

Листинг 4.3. Интерфейс класса threadsafe_queue

#include <memory>

template<typename T>

class threadsafe_queue {

public:

 threadsafe_queue();

 threadsafe_queue(const threadsafe_queue&);

 threadsafe_queue& operator=(

  const threadsafe_queue&) = delete; ←┐ Для простоты

 void push(T new_value);              │ запрещаем присваивание

 bool try_pop(T& value);       ← (1)

 std::shared_ptr<T> try_pop(); ← (2)

 void wait_and_pop(T& value);

 std::shared_ptr<T> wait_and_pop();

 bool empty() const;

};

Как и в случае стека, мы для простоты уменьшили число конструкторов и запретили присваивание. И, как и раньше, предлагаем по два варианта функций try_pop() и wait_for_pop(). Первый перегруженный вариант try_pop() (1) сохраняет извлеченное значение в переданной по ссылке переменной, а возвращаемое значение использует для индикации ошибки: оно равно true, если значение получено, и false — в противном случае (см. раздел А.2). Во втором перегруженном варианте (2) так поступить нельзя, потому что возвращаемое значение — это данные, извлеченные из очереди. Однако же можно возвращать указатель NULL, если в очереди ничего не оказалось.

Ну и как же всё это соотносится с листингом 4.1? В следующем листинге показано, как перенести оттуда код в методы push() и wait_and_pop().

Листинг 4.4. Реализация функций push() и wait_and_pop() на основе кода из листинга 4.1

#include <queue>

#include <mutex>

#include <condition_variable>

template<typename T>

class threadsafe_queue {

private:

 std::mutex mut;

 std::queue<T> data_queue;

 std::condition_variable data_cond;

public:

 void push(T new_value) {

  std::lock_guard<std::mutex> lk(mut);

  data_queue.push(new_value);

  data_cond.notify_one();

 }

 void wait_and_pop(T& value) {

  std::unique_lock<std::mutex> lk(mut);

  data_cond.wait(lk, [this]{return !data_queue.empty();});

  value = data_queue.front();

  data_queue.pop();

 }

};

threadsafe_queue<data_chunk> data_queue; ← (1)

void data_preparation_thread() {

 while (more_data_to_prepare()) {

  data_chunk const data = prepare_data();

  data_queue.push(data); ← (2)

 }

}

void data_processing_thread() {

 while (true) {

  data_chunk data;

  data_queue.wait_and_pop(data); ← (3)

  process(data);

  if (is_last_chunk(data))

   break;

 }

}

Теперь мьютекс и условная переменная находятся в экземпляре threadsafe_queue, поэтому не нужно ни отдельных переменных (1), ни внешней синхронизации при обращении к функции push() (2). Кроме того, wait_and_pop() берет на себя заботу об ожидании условной переменной (3).

Второй перегруженный вариант wait_and_pop() тривиален, а остальные функции можно почти без изменений скопировать из кода стека в листинге 3.5. Ниже приведена окончательная реализация.

Листинг 4.5. Полное определение класса потокобезопасной очереди на базе условных переменных

#include <queue>

#include <memory>

#include <mutex>

#include <condition_variable>

template<typename T>

class threadsafe_queue {

private:

 mutable std::mutex mut;← (1) Мьютекс должен быть изменяемым

 std::queue<T> data_queue;

 std::condition_variable data_cond;

public:

 threadsafe_queue() {}

 threadsafe_queue(threadsafe_queue const& other) {

  std::lock_guard<std::mutex> lk(other.mut);

  data_queue = other.data_queue;

 }

 void push(T new_value) {

  std::lock_guard<std::mutex> lk(mut);

  data_queue.push(new_value);

  data_cond.notify_one();

 }

 void wait_and_pop(T& value) {

  std::unique_lock<std::mutex> lk(mut);

  data_cond.wait(lk, [this]{ return !data_queue.empty(); });

  value = data_queue.front();

  data_queue.pop();

 }

 std::shared_ptr<T> wait_and_pop() {

  std::unique_lock<std::mutex> lk(mut);

  data_cond.wait(lk, [this]{ return !data_queue.empty(); });

  std::shared_ptr<T>

   res(std::make_shared<T>(data_queue.front()));

  data_queue.pop();

  return res;

 }

 bool try_pop(T& value) {

  std::lock_guard<std::mutex> lk(mut);

  if (data_queue.empty())

   return false;

  value = data_queue.front();

  data_queue.pop();

  return true;

 }

 std::shared_ptr<T> try_pop() {

  std::lock_guard<std::mutex> lk(mut);

  if (data_queue.empty())

   return std::shared_ptr<T>();

  std::shared_ptr<T>

   res(std::make_shared<T>(data_queue.front()));

  data_queue.pop();

  return res;

 }

 bool empty() const {

  std::lock_guard<std::mutex> lk(mut);

  return data_queue.empty();

 }

};

Хотя empty() — константная функция-член, а параметр копирующего конструктора — const-ссылка, другие потоки могут хранить неконстантные ссылки на объект и вызывать изменяющие функции-члены, которые захватывают мьютекс. Поэтому захват мьютекса — это изменяющая операция, следовательно, член mut необходимо пометить как mutable (1), чтобы его можно было захватить в функции empty() и в копирующем конструкторе.

Условные переменные полезны и тогда, когда есть несколько потоков, ожидающих одного события. Если потоки используются для разделения работы и, следовательно, на извещение должен реагировать только один поток, то применима точно такая же структура программы, как в листинге 4.1; нужно только запустить несколько потоков обработки данных. При поступлении новых данных функция notify_one() разбудит только один поток, который проверяет условие внутри wait(), и этот единственный поток вернет управление из wait() (в ответ на помещение нового элемента в очередь data_queue). Заранее нельзя сказать, какой поток получит извещение и есть ли вообще ожидающие потоки (не исключено, что все они заняты обработкой ранее поступивших данных).

Альтернативный сценарий — когда несколько потоков ожидают одного события, и отреагировать должны все. Так бывает, например, когда инициализируются разделяемые данные, и все работающие с ними потоки должны ждать, пока инициализация завершится (хотя для этого случая существуют более подходящие механизмы, см. раздел 3.3.1 главы 3), или когда потоки должны ждать обновления разделяемых данных, например, в случае периодической повторной инициализации. В таких ситуациях поток, отвечающий за подготовку данных, может вызвать функцию-член notify_all() условной переменной вместо notify_one(). Эта функция извещает все потоки, ожидающие внутри функции wait(), о том, что они должны проверить ожидаемое условие.

Если ожидающий поток собирается ждать условия только один раз, то есть после того как оно станет истинным, он не вернется к ожиданию той же условной переменной, то лучше применить другой механизм синхронизации. В особенности это относится к случаю, когда ожидаемое условие — доступность каких-то данных. Для такого сценария больше подходят так называемые будущие результаты (future).