12.3. Уведомление одного потока другим

We use cookies. Read the Privacy and Cookie Policy

12.3. Уведомление одного потока другим

Проблема

Используется шаблон, в котором один поток (или группа потоков) выполняет какие-то действия, и требуется сделать так, чтобы об этом узнал другой поток (или группа потоков). Может использоваться главный поток, который передает работу подчиненным потокам, или может использоваться одна группа потоков для пополнения очереди и другая для удаления данных из очереди и выполнения чего-либо полезного.

Решение

Используйте объекты mutex и condition, которые объявлены в boost/thread/mutex.hpp и boost/thread/condition.hpp. Можно создать условие (condition) для каждой ожидаемой потоками ситуации и при возникновении такой ситуации уведомлять все ее ожидающие потоки. Пример 12.4 показывает, как можно обеспечить передачу уведомлений в модели потоков «главный/подчиненные».

Пример 12.4. Передача уведомлений между потоками

#include <iostream>

#include <boost/thread/thread.hpp>

#include <boost/thread/condition.hpp>

#include <boost/thread/mutex.hpp>

#include <list>

#include <string>

class Request { /*...*/ };

// Простой класс очереди заданий; в реальной программе вместо этого класса

// используйте std::queue

template<typename T>

class JobQueue {

public:

 JobQueue() {}

 ~JobQueue() {}

 void submitJob(const T& x) {

  boost::mutex::scoped_lock lock(mutex_);

  list_.push_back(x);

  workToBeDone_.notify_one();

 }

 T getJob() {

  boost::mutex::scoped_lock lock(mutex_);

  workToBeDone_.wait(lock); // Ждать удовлетворения этого условия, затем

                            // блокировать мьютекс

  T tmp = list_.front();

  list_.pop_front();

  return(tmp);

 }

private:

 std::list<T> list_;

 boost::mutex mutex_;

 boost::condition workToBeDone_;

};

JobQueue<Request> myJobQueue;

void boss() {

 for (;;) {

  // Получить откуда-то запрос

  Request req;

  myJobQueue.submitJob(req);

 }

}

void worker() {

 for (;;) {

  Request r(myJobQueue.getJob());

  // Выполнить какие-то действия с заданием...

 }

}

int main() {

 boost::thread thr1(boss);

 boost::thread thr2(worker);

 boost::thread thr3(worker);

 thr1.join();

 thr2.join();

 thr3.join();

}

Обсуждение

Объект условия использует мьютекс mutex и позволяет дождаться ситуации, когда он становится заблокированным. Рассмотрим пример 12.4, в котором представлена модифицированная версии класса Queue из примера 12.2. Я модифицировал очередь Queue, получая более специализированную очередь, а именно JobQueue, объекты которой являются заданиями, поступающими в очередь со стороны главного потока и обрабатываемыми подчиненными потоками.

Самое важное изменение класса JobQueue связано переменной-членом workToBeDone_ типа condition. Эта переменная показывает, имеется или нет задание в очереди. Когда потоку требуется получить элемент из очереди, он вызывает функцию getJob, которая пытается захватить мьютекс и затем дожидаться возникновения новой ситуации, что реализуют следующие строки.

boost::mutex::scoped_lock lock(mutex_);

workToBeDone_.wait(lock);

Первая строка блокирует мьютекс обычным образом. Вторая строка разблокирует мьютекс и переводит его в состояние ожидания или в неактивное состояние до тех пор, пока не будет удовлетворено условие. Разблокирование мьютекса позволяет другим потокам использовать этот мьютекс; один из них должен установить ожидаемое условие, в противном случае другие потоки не смогут блокировать мьютекс, пока один поток ожидает возникновения необходимого условия.

В функции submitJob после помещения задания во внутренний список я добавил следующую строку.

workToBeDone_.notify_one();

В результате «удовлетворяется» условие, в ожидании которого находится getJob. Формально это означает, что если существуют какие-нибудь потоки, вызвавшие функцию wait для этого условия, то один из них перейдет в состояние выполнения. Для функции getJob это означает продолжение работы, приостановленной в следующей строке:

workToBeDone_.wait(lock);

Но это еще не все. Функция wait делает две вещи: она дожидается вызова в каком-нибудь потоке функции notify_one или notify_all для данного условия, затем она пытается блокировать соответствующий мьютекс. Поэтому, когда submitJob вызывает notify_all, фактически происходит следующее: ожидающий поток переходит в состояние выполнения и на следующем шаге пытается блокировать мьютекс, который все еще блокирует функция submitJob, поэтому он вновь переходит в состояние ожидания, пока не завершит работу функция submitJob. Таким образом, condition::wait требует, чтобы мьютекс был блокирован при его вызове, когда он оказывается разблокированным и затем вновь заблокированным при удовлетворении условия.

Для уведомления всех потоков, ожидающих удовлетворения некоторого условия, следует вызывать функцию notify_all. Она работает так же, как notify_one, за исключением того, что в состояние выполнения переходят все потоки, ожидающие это условие. Однако теперь все они будут пытаться выполнить блокировку, поэтому характер последующих действий зависит от типа мьютекса и типа используемой блокировки.

Применение условия позволяет управлять ситуацией более тонко, чем при использовании одних только мьютексов и блокировок. Рассмотрим представленный ранее класс Queue. Потоки, ожидающие получение элемента из очереди, находятся в состоянии ожидания до тех пор, пока они не смогут установить блокировку для записи и затем извлечь элемент из очереди. Может показаться, что это будет хорошо работать без применения какого-либо механизма сигнализации, но так ли на самом деле? А что произойдет, когда очередь окажется пустой? У вас нет большого выбора при реализации функции dequeue, если вы ждете удовлетворения некоторого условия: проверка наличия элементов в очереди и, если они отсутствуют, возврат управления; использование другого мьютекса, который блокируется при пустой очереди и разблокируется, когда очередь содержит данные (не подходящее решение) или возврат специального значения, когда очередь оказывается пустой. Все это проблематично или неэффективно. Если вы просто возвращаете управление, когда очередь пустая, выбрасывая исключение или возвращая специальное значение, то вашим клиентам придется постоянно проверять поступающие значения. Это означает бесполезную трату времени.

Объект condition позволяет пользовательским потокам находиться в неактивном состоянии, поэтому процессор может выполнять что-то другое, когда условие не удовлетворяется. Представим веб-сервер, использующий пул рабочих потоков, обрабатывающих поступающие запросы. Значительно лучше иметь дочерние потоки, находящиеся в состоянии ожидания, когда нет никакой активности, чем заставлять их выполнять бесконечный цикл или «засыпать» и «просыпаться» периодически для проверки очереди.