6.2.2. Потокобезопасная очередь с блокировками и условными переменными
В листинге 6.2 воспроизведен код потокобезопасной очереди из главы 4. Если стек построен по образцу std::stack<>, то очередь — по образцу std::queue<>. Но ее интерфейс также отличается от стандартного адаптера контейнера, потому что запись в структуру данных должна быть безопасной относительно одновременного доступа из нескольких потоков.
Листинг 6.2. Потокобезопасная очередь с блокировками и условными переменными
template<typename T>
class threadsafe_queue {
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() {}
void push(T new_value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(std::move(data));
data_cond.notify_one(); ← (1)
}
void wait_and_pop(T& value) { ← (2)
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
value = std::move(data_queue.front());
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop() ← (3)
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty();});← (4)
std::shared_ptr<T> res(
std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = std::move(data_queue.front());
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>(); ← (5)
std::shared_ptr<T> res(
std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
Структурно очередь в листинге 6.2 реализована аналогично стеку в листинге 6.1, отличие только в обращениях к функции data_cond.notify_one() в push() (1) и в наличии двух вариантов функции wait_and_pop() (2), (3). Оба перегруженных варианта try_pop() почти идентичны функциям pop() в листинге 6.1 с тем отличием, что не возбуждают исключение, если очередь пуста. Вместо этого одна функция возвращает булевское значение, показывающее, были ли извлечены данные, а вторая — возвращающая указатель на данные (5) — указатель NULL. Точно так же можно было бы поступить и в случае стека. Таким образом, если оставить в стороне функции wait_and_pop(), то применим тот же анализ, который мы провели для стека.
Новые функции wait_and_pop() решают проблему ожидания значения в очереди, с которой мы столкнулись при обсуждении стека; вместо того чтобы раз за разом вызывать empty(), ожидающий поток может просто вызвать wait_and_pop(), а структура данных обслужит этот вызов с помощью условной переменной. Обращение к data_cond.wait() не вернет управление, пока во внутренней очереди не появится хотя бы один элемент, так что мы можем не беспокоиться но поводу того, что в этом месте кода возможна пустая очередь. При этом данные по-прежнему защищаются мьютексом. Таким образом, функции wait_and_pop() не вводят новых состояний гонки, не создают возможности взаимоблокировок и не нарушают никаких инвариантов.
В части безопасности относительно исключений есть мелкая неприятность — если помещения данных в очередь ожидают несколько потоков, то лишь один из них будет разбужен в результате вызова data_cond.notify_one(). Однако если этот поток возбудит исключение в wait_and_pop(), например при конструировании std::shared_ptr<> (4), то ни один из оставшихся потоков разбужен не будет. Если это неприемлемо, то можно заменить notify_one() на data_cond.notify_all(), тогда будут разбужены все потоки, но за это придётся заплатить — большая часть из них сразу же уснет снова, увидев, что очередь по-прежнему пуста. Другой вариант — включить в wait_and_pop() обращение к notify_one() в случае исключения, тогда другой поток сможет попытаться извлечь находящееся в очереди значение. Третий вариант — перенести инициализацию std::shared_ptr<> в push() и сохранять экземпляры std::shared_ptr<>, а не сами значения данных. Тогда при копировании std::shared_ptr<> из внутренней очереди std::queue<> никаких исключений возникнуть не может, и wait_and_pop() становится безопасной. В следующем листинге приведена реализация очереди, переработанная с учетом высказанных соображений.
Листинг 6.3. Потокобезопасная очередь, в которой хранятся объекты std::shared_ptr
template<typename T>
class threadsafe_queue {
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T> > data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() {}
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
value = std::move(*data_queue.front()); ← (1)
data_queue.pop();
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = std::move(*data_queue.front()); ← (2)
data_queue.pop();
return true;
}
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
std::shared_ptr<T> res = data_queue.front(); ← (3)
data_queue.pop();
return res;
}
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res = data_queue.front(); ← (4)
data_queue.pop();
return res;
}
void push(T new_value) {
std::shared_ptr<T> data(
std::make_shared<T>(std::move(new_value))); ← (5)
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
bool empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
Последствия хранения данных, обернутых в std::shared_ptr<>, понятны: функции pop, которые получают значение из очереди в виде ссылки на переменную, теперь должны разыменовывать указатель (1), (2), а функции pop, которые возвращают std::shared_ptr<>, теперь могут напрямую извлекать его из очереди (3), (4) без дальнейших манипуляций.
У хранения данных в виде std::shared_ptr<> есть и еще одно преимущество: выделение памяти для нового объекта можно производить не под защитой блокировки в push() (5), тогда как в листинге 6.2 это приходилось делать в защищенном участке кода внутри pop(). Поскольку выделение памяти, вообще говоря, дорогая операция, это изменение весьма благотворно скажется на общей производительности очереди, так как уменьшается время удержания мьютекса, а, значит, у остальных потоков остается больше времени на полезную работу.
Как и в примере стека, применение мьютекса для защиты всей структуры данных ограничивает возможности распараллеливания работы с очередью; хотя ожидать доступа к очереди могут несколько потоков, выполняющих разные функции, в каждый момент лишь один совершает какие-то действия. Однако это ограничение отчасти проистекает из того, что мы пользуемся классом std::queue<>, — стандартный контейнер составляет единый элемент данных, который либо защищен, либо нет. Полностью взяв на себя управление деталями реализации структуры данных, мы сможем обеспечить мелкогранулярные блокировки и повысить уровень параллелизма.