6.2.2. Потокобезопасная очередь с блокировками и условными переменными

В листинге 6.2 воспроизведен код потокобезопасной очереди из главы 4. Если стек построен по образцу std::stack<>, то очередь — по образцу std::queue<>. Но ее интерфейс также отличается от стандартного адаптера контейнера, потому что запись в структуру данных должна быть безопасной относительно одновременного доступа из нескольких потоков.

Листинг 6.2. Потокобезопасная очередь с блокировками и условными переменными

template<typename T>

class threadsafe_queue {

private:

 mutable std::mutex mut;

 std::queue<T> data_queue;

 std::condition_variable data_cond;

public:

 threadsafe_queue() {}

 void push(T new_value) {

  std::lock_guard<std::mutex> lk(mut);

  data_queue.push(std::move(data));

  data_cond.notify_one(); ← (1)

 }

 void wait_and_pop(T& value) { ← (2)

  std::unique_lock<std::mutex> lk(mut);

  data_cond.wait(lk, [this]{return !data_queue.empty();});

  value = std::move(data_queue.front());

  data_queue.pop();

 }

 std::shared_ptr<T> wait_and_pop() ← (3)

  std::unique_lock<std::mutex> lk(mut);

  data_cond.wait(lk, [this] {return !data_queue.empty();});← (4)

  std::shared_ptr<T> res(

   std::make_shared<T>(std::move(data_queue.front())));

  data_queue.pop();

  return res;

 }

 bool try_pop(T& value) {

  std::lock_guard<std::mutex> lk(mut);

  if (data_queue.empty())

   return false;

  value = std::move(data_queue.front());

  data_queue.pop();

  return true;

 }

 std::shared_ptr<T> try_pop() {

  std::lock_guard<std::mutex> lk(mut);

  if (data_queue.empty())

   return std::shared_ptr<T>(); ← (5)

  std::shared_ptr<T> res(

   std::make_shared<T>(std::move(data_queue.front())));

  data_queue.pop();

  return res;

 }

 bool empty() const {

  std::lock_guard<std::mutex> lk(mut);

  return data_queue.empty();

 }

};

Структурно очередь в листинге 6.2 реализована аналогично стеку в листинге 6.1, отличие только в обращениях к функции data_cond.notify_one() в push() (1) и в наличии двух вариантов функции wait_and_pop() (2), (3). Оба перегруженных варианта try_pop() почти идентичны функциям pop() в листинге 6.1 с тем отличием, что не возбуждают исключение, если очередь пуста. Вместо этого одна функция возвращает булевское значение, показывающее, были ли извлечены данные, а вторая — возвращающая указатель на данные (5) — указатель NULL. Точно так же можно было бы поступить и в случае стека. Таким образом, если оставить в стороне функции wait_and_pop(), то применим тот же анализ, который мы провели для стека.

Новые функции wait_and_pop() решают проблему ожидания значения в очереди, с которой мы столкнулись при обсуждении стека; вместо того чтобы раз за разом вызывать empty(), ожидающий поток может просто вызвать wait_and_pop(), а структура данных обслужит этот вызов с помощью условной переменной. Обращение к data_cond.wait() не вернет управление, пока во внутренней очереди не появится хотя бы один элемент, так что мы можем не беспокоиться но поводу того, что в этом месте кода возможна пустая очередь. При этом данные по-прежнему защищаются мьютексом. Таким образом, функции wait_and_pop() не вводят новых состояний гонки, не создают возможности взаимоблокировок и не нарушают никаких инвариантов.

В части безопасности относительно исключений есть мелкая неприятность — если помещения данных в очередь ожидают несколько потоков, то лишь один из них будет разбужен в результате вызова data_cond.notify_one(). Однако если этот поток возбудит исключение в wait_and_pop(), например при конструировании std::shared_ptr<> (4), то ни один из оставшихся потоков разбужен не будет. Если это неприемлемо, то можно заменить notify_one() на data_cond.notify_all(), тогда будут разбужены все потоки, но за это придётся заплатить — большая часть из них сразу же уснет снова, увидев, что очередь по-прежнему пуста. Другой вариант — включить в wait_and_pop() обращение к notify_one() в случае исключения, тогда другой поток сможет попытаться извлечь находящееся в очереди значение. Третий вариант — перенести инициализацию std::shared_ptr<> в push() и сохранять экземпляры std::shared_ptr<>, а не сами значения данных. Тогда при копировании std::shared_ptr<> из внутренней очереди std::queue<> никаких исключений возникнуть не может, и wait_and_pop() становится безопасной. В следующем листинге приведена реализация очереди, переработанная с учетом высказанных соображений.

Листинг 6.3. Потокобезопасная очередь, в которой хранятся объекты std::shared_ptr

template<typename T>

class threadsafe_queue {

private:

 mutable std::mutex mut;

 std::queue<std::shared_ptr<T> > data_queue;

 std::condition_variable data_cond;

public:

 threadsafe_queue() {}

 void wait_and_pop(T& value) {

  std::unique_lock<std::mutex> lk(mut);

  data_cond.wait(lk, [this]{return !data_queue.empty();});

  value = std::move(*data_queue.front()); ← (1)

  data_queue.pop();

 }

 bool try_pop(T& value) {

  std::lock_guard<std::mutex> lk(mut);

  if (data_queue.empty())

   return false;

  value = std::move(*data_queue.front()); ← (2)

  data_queue.pop();

  return true;

 }

 std::shared_ptr<T> wait_and_pop() {

  std::unique_lock<std::mutex> lk(mut);

  data_cond.wait(lk, [this]{return !data_queue.empty();});

  std::shared_ptr<T> res = data_queue.front(); ← (3)

  data_queue.pop();

  return res;

 }

 std::shared_ptr<T> try_pop() {

  std::lock_guard<std::mutex> lk(mut);

  if (data_queue.empty())

   return std::shared_ptr<T>();

  std::shared_ptr<T> res = data_queue.front(); ← (4)

  data_queue.pop();

  return res;

 }

 void push(T new_value) {

  std::shared_ptr<T> data(

   std::make_shared<T>(std::move(new_value))); ← (5)

  std::lock_guard<std::mutex> lk(mut);

  data_queue.push(data);

  data_cond.notify_one();

 }

 bool empty() const {

  std::lock_guard<std::mutex> lk(mut);

  return data_queue.empty();

 }

};

Последствия хранения данных, обернутых в std::shared_ptr<>, понятны: функции pop, которые получают значение из очереди в виде ссылки на переменную, теперь должны разыменовывать указатель (1), (2), а функции pop, которые возвращают std::shared_ptr<>, теперь могут напрямую извлекать его из очереди (3), (4) без дальнейших манипуляций.

У хранения данных в виде std::shared_ptr<> есть и еще одно преимущество: выделение памяти для нового объекта можно производить не под защитой блокировки в push() (5), тогда как в листинге 6.2 это приходилось делать в защищенном участке кода внутри pop(). Поскольку выделение памяти, вообще говоря, дорогая операция, это изменение весьма благотворно скажется на общей производительности очереди, так как уменьшается время удержания мьютекса, а, значит, у остальных потоков остается больше времени на полезную работу.

Как и в примере стека, применение мьютекса для защиты всей структуры данных ограничивает возможности распараллеливания работы с очередью; хотя ожидать доступа к очереди могут несколько потоков, выполняющих разные функции, в каждый момент лишь один совершает какие-то действия. Однако это ограничение отчасти проистекает из того, что мы пользуемся классом std::queue<>, — стандартный контейнер составляет единый элемент данных, который либо защищен, либо нет. Полностью взяв на себя управление деталями реализации структуры данных, мы сможем обеспечить мелкогранулярные блокировки и повысить уровень параллелизма.