12.2. Обеспечение потокозащищенности ресурсов

We use cookies. Read the Privacy and Cookie Policy

12.2. Обеспечение потокозащищенности ресурсов

Проблема

В программе используется несколько потоков и требуется гарантировать невозможность модификации ресурса несколькими потоками одновременно. В целом это называется обеспечением потокозащищенности (thread-safe) ресурсов или сериализацией доступа к ним.

Решение

Используйте класс mutex, определенный в boost/thread/mutex.hpp, для синхронизации доступа к потокам. Пример 12.2 показывает, как можно использовать в простых случаях объект mutex для управления параллельным доступом к очереди.

Пример 12.2. Создание потокозащищенного класса

#include <iostream>

#include <boost/thread/thread.hpp>

#include <string>

// Простой класс очереди; в реальной программе вместо него следует

// использовать std::queue

template<typename T>

class Queue {

public:

 Queue() {}

 ~Queue() {}

 void enqueue(const T& x) {

  // Блокировать мьютекс для этой очереди

  boost::mutex::scoped_lock lock(mutex_);

  list_.push_back(x);

  // scoped_lock автоматически уничтожается (и, следовательно, мьютекс

  // разблокируется) при выходе из области видимости

 }

 T dequeue() {

  boost::mutex::scoped_lock lock(mutex_);

  if (list_.empty())

    throw "empty!";      // Это приводит к выходу из текущей области

  T tmp = list_.front(); // видимости, поэтому блокировка освобождается

  list_.pop_front();

  return(tmp);

 } // Снова при выходе из области видимости мьютекс разблокируется

private:

 std::list<T> list_;

 boost::mutex mutex_;

};

Queue<std::string> queueOfStrings;

void sendSomething() {

 std::string s;

 for (int i = 0; i < 10; ++i) {

  queueOfStrings.enqueue("Cyrus");

 }

}

void recvSomething() {

 std::string s;

 for(int i = 0; i < 10; ++i) {

  try {

   s = queueOfStrings.dequeue();

  } catch(...) {}

 }

}

int main() {

 boost::thread thr1(sendSomething);

 boost::thread thr2(recvSomething);

 thr1.join();

 thr2.join();

}

Обсуждение

Обеспечение потокозащищенности классов, функций, блоков программного кода и других объектов является сущностью многопоточного программирования. Если вы проектируете какой-нибудь компонент программного обеспечения с возможностями многопоточной обработки, то можете постараться обеспечить каждый поток своим набором ресурсов, например объектами в стеке и динамической памяти, ресурсами операционной системы и т.д. Однако рано или поздно вам придется обеспечить совместное использование различными потоками каких-либо ресурсов. Это может быть совместная очередь поступающих запросов (как это происходит на многопоточном веб-сервере) или нечто достаточно простое, как поток вывода (например, в файл журнала или даже в cout). Стандартный способ координации безопасного совместного использования ресурсов подразумевает применение мьютекса (mutex), который обеспечивает монопольный доступ к чему-либо.

Остальная часть обсуждения в целом посвящена мьютексам, и в частности методам использования boost::mutex для сериализации доступа к ресурсам. Я использую терминологию подхода «концепция/модель», о котором я говорил кратко во введении настоящей главы. Концепция — это абстрактное (независимое от языка) описание чего-либо, а модель концепции — конкретное ее представление в форме класса С++. Уточнение концепции — это определенная концепция с некоторыми дополнительными возможностями.

Все-таки параллельное программирование представляет собой сложную тему, и в одном рецепте нельзя отразить все применяемые в этой технологии методы. Можно использовать много шаблонов проектирования и разных стратегий, подходящих для различных приложений. Если при проектировании программного обеспечения вы предполагаете, что многопоточная обработка составит значительный объем, или проектируете высокопроизводительные приложения, необходимо прочитать хорошую книгу по шаблонам многопоточной обработки. Многие проблемы, связанные с трудностями отладки многопоточных программ, могут быть успешно преодолены за счет тщательного и продолжительного проектирования.

Использование мьютексов

Концепция мьютекса проста: мьютекс это некий объект, представляющий ресурс; только один поток может его блокировать или разблокировать в данный момент времени. Он является флагом, который используется для координации доступа к ресурсу со стороны нескольких пользователей. В библиотеке Boost Threads моделью концепции мьютекса является класс boost::mutex. В примере 1 2.2 доступ для записи в классе Queue обеспечивается переменной-членом mutex.

boost::mutex mutex_;

mutex_ должен блокироваться какой-нибудь функцией-членом, которая должна изменять состояние очереди обслуживаемых элементов. Сам объект mutex_ ничего не знает о том, что он представляет. Это просто флаг блокировки/разблокировки, используемый всеми пользователями некоторого ресурса.

В примере 12.2, когда какая-нибудь функция-член класса Queue собирается изменить состояние объекта, она сначала должна заблокировать mutex_. Только один поток в конкретный момент времени может его заблокировать, что не позволяет нескольким объектам одновременно модифицировать состояние объекта Queue. Таким образом, мьютекс mutex представляет собой простой сигнальный механизм, но это нечто большее, чем просто bool или int, потому что для mutex необходим сериализованный доступ, который может быть обеспечен только ядром операционной системы. Если вы попытаетесь сделать то же самое с bool, это не сработает, потому что ничто не препятствует одновременной модификации состояния bool несколькими потоками. (В разных операционных системах это осуществляется по-разному, и именно поэтому не просто реализовать переносимую библиотеку потоков.)

Объекты mutex блокируются и разблокируются, используя несколько различных стратегий блокировки, самой простой из которых является блокировка scoped_lock. scoped_lock — это класс, при конструировании объекта которого используется аргумент типа mutex, блокируемый до тех пор, пока не будет уничтожена блокировка lock. Рассмотрим функцию-член enqueue в примере 12.2, которая показывает, как scoped_lock работает совместно с мьютексом mutex_.

void enqueue(const T& x) {

 boost::mutex::scoped_lock lock(mutex_);

 list_.push_back(x);

} // разблокировано!

Когда lock уничтожается, mutex_ разблокируется. Если lock конструируется для объекта mutex, который уже заблокирован другим потоком, текущий поток переходит в состояние ожидания до тех пор, пока lock не окажется доступен.

Такой подход поначалу может показаться немного странным: а почему бы мьютексу mutex не иметь методы lock и unlock? Применение класса scoped_lock, который обеспечивает блокировку при конструировании и разблокировку при уничтожении, на самом деле более удобно и менее подвержено ошибкам. Когда вы создаете блокировку, используя scoped_lock, мьютекс блокируется на весь период существования объекта scoped_lock, т.е. вам не надо ничего разблокировать в явной форме на каждой ветви вычислений. С другой стороны, если вам приходится явно разблокировать захваченный мьютекс, необходимо гарантировать перехват любых исключений, которые могут быть выброшены в вашей функции (или где-нибудь выше ее в стеке вызовов), и гарантировать разблокировку mutex. При использовании scoped_lock, если выбрасывается исключение или функция возвращает управление, объект scoped_lock автоматически уничтожается и mutex разблокируется.

Использование мьютекса позволяет сделать всю работу, однако хочется немного большего. При таком подходе нет различия между чтением и записью, что существенно, так как неэффективно заставлять потоки ждать в очереди доступа к ресурсу, когда многие из них выполняют только операции чтения, для которых не требуется монопольный доступ. Для этого в библиотеке Boost Threads предусмотрен класс read_write_mutex. Пример 12.3 показывает, как можно реализовать пример 12.2, используя read_write_mutex с функцией-членом front, которая позволяет вызывающей программе получить копию первого элемента очереди без его выталкивания.

Пример 12.3. Использование мьютекса чтения/записи

#include <iostream>

#include <boost/thread/thread.hpp>

#include <boost/thread/read_write_mutex.hpp>

#include <string>

template<typename T>

class Queue {

 public:

 Queue() : // Использовать мьютекс чтения/записи и придать ему приоритет

           // записи

 rwMutex_(boost::read_write_scheduling_policy::writer_priority) {}

 ~Queue() {}

 void enqueue(const T& x) {

  // Использовать блокировку чтения/записи, поскольку enqueue

  // обновляет состояние

  boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);

  list_.push_back(x);

 }

 T dequeue() {

  // Снова использовать блокировку для записи

  boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);

  if (list_.empty())

   throw "empty!";

  T tmp = list_.front();

  list_.pop_front();

  return(tmp);

 }

 T getFront() {

  // Это операция чтения, поэтому требуется блокировка только для чтения

  boost::read_write_mutex::scoped_read_lock.readLock(rwMutex_);

  if (list_.empty())

   throw "empty!";

  return(list_.front());

 }

private:

 std::list<T> list_;

 boost::read_write_mutex rwMutex_;

};

Queue<std::string> queueOfStrings;

void sendSomething() {

 std::string s;

 for (int i = 0, i < 10; ++i) {

  queueOfStrings.enqueue("Cyrus");

 }

}

void checkTheFront() {

 std::string s;

 for (int i=0; i < 10; ++i) {

  try {

   s = queueOfStrings.getFront();

  } catch(...) {}

 }

}

int main() {

 boost::thread thr1(sendSomething);

 boost::thread_group grp;

 grp.сreate_thread(checkTheFront);

 grp.create_thread(checkTheFront);

 grp.сreate_thread(checkTheFront);

 grp_create_thread(checkTheFront);

 thr1.join();

 grp.join_all();

}

Здесь необходимо отметить несколько моментов. Обратите внимание, что теперь я использую read_write_mutex.

boost::read_write_mutex rwMutex_;

При использовании мьютексов чтения/записи блокировки тоже выполняются иначе. В примере 12.3, когда мне нужно заблокировать Queue для записи, я создаю объект класса scoped_write_lock.

boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);

А когда мне просто требуется прочитать Queue, я использую scoped_read_lock.

boost::read_write_mutex::scoped_read_lock readLock(rwMutex_);

Блокировки чтения/записи удобны, но они не предохраняют вас от серьезных ошибок. На этапе компиляции не делается проверка ресурса, представленного мьютексом rwMutex_, гарантирующая отсутствие изменения ресурса при блокировке только для чтения. Вы сами должны позаботиться о том, чтобы поток мог модифицировать состояние объекта только при блокировке для записи, поскольку компилятор это не будет делать.

Точная последовательность выполнения блокировок определяется политикой их планирования; эту политику вы задаете при конструировании объекта mutex. В библиотеке Boost Threads предусматривается четыре политики.

reader_priority

Потоки, ожидающие выполнения блокировки для чтения, ее получат раньше потоков, ожидающих выполнения блокировки для записи.

writer_priority

Потоки, ожидающие выполнения блокировки для записи, ее получат раньше потоков, ожидающих выполнения блокировки для чтения.

alternating_single_read

Чередуются блокировки для чтения и для записи. Один читающий поток получает возможность блокировки для чтения, когда подходит «очередь» читающих потоков. Эта политика в целом отдает приоритет записывающим потокам. Например, если мьютекс заблокирован для записи и имеется несколько потоков, ожидающих блокировки для чтения, а также один поток, ожидающий блокировки для записи, сначала будет выполнена одна блокировка для чтения, затем блокировка для записи и после нее — все остальные блокировки для чтения. Подразумевается, что за это время не будет новых запросов на блокировку.

alternating_many_reads

Чередуются блокировки для чтения и для записи. Выполняются все блокировки для чтения, когда подходит «очередь» читающих потоков. Другими словами, эта политика приводит к опустошению очереди всех потоков, ожидающих блокировки для чтения, в промежутке между блокировками для записи.

Каждая из этих политик имеет свои достоинства и недостатки, и их влияние будет сказываться по-разному в различных приложениях. Необходимо тщательно подойти к выбору политики, потому что если просто обеспечить приоритет для чтения или записи, это приведет к зависанию, которое я более подробно описываю ниже.

Опасности

При программировании многопоточной обработки возникает три основные проблемы: взаимная блокировка (deadlock), зависание (starvation) и состояния состязания (race conditions — условия гонок). Существуют различные по сложности методы устранения этих проблем, но их рассмотрение выходит за рамки данного рецепта. Я дам описание каждой их этих проблем, чтобы вы знали, чего следует остерегаться, но если вы планируете разработку многопоточного приложения, вам необходимо сначала выполнить некоторое предварительную работу по шаблонам многопоточной обработки.

Взаимная блокировка связана с наличием, по крайней мере, двух потоков и двух ресурсов. Пусть имеется два потока, А и В, и два ресурса, X и Y, причем поток А блокирует ресурс X, а В блокирует Y. Взаимная блокировка возникает в том случае, когда А пытается заблокировать Y, а В пытается заблокировать X. Если при работе потоков не предусмотреть какой-либо способ устранения взаимных блокировок, они будут ждать бесконечно.

Библиотека Boost Threads позволяет избегать взаимных блокировок благодаря уточнению концепций мьютекса и блокировки. Пробный мьютекс (try mutex) — это мьютекс, который используется для определения возможности блокировки путем выполнения пробной блокировки (try lock); она может быть успешной или нет, но не блокирует ресурс, а ждет момента, когда блокировка станет возможной. Применяя модели этих концепций в форме классов try_mutex и scoped_try_lock, вы можете в своей программе идти дальше и выполнять какие-то другие действия, если доступ к требуемому ресурсу заблокирован. Существует еще одно уточнение концепции пробной блокировки, называемое временной блокировкой (timed lock). Я не рассматриваю здесь подробно временные блокировки; детальное их описание вы найдете в документации библиотеки Boost Threads.

Например, в классе Queue из примера 12.2 требуется использовать мьютекс для пробной блокировки с возвратом функцией dequeue значения типа bool, показывающего, может или не может быть извлечен из очереди первый элемент. В этом случае при применении функции dequeue не приходится ждать блокировки очереди. Ниже показано, как можно переписать функцию dequeue.

bool dequeue(T& x) {

 boost::try_mutex::scoped_try_lock lock(tryMutex_);

 if (!lock.locked())

  return(false);

 else {

  if (list_.empty()) throw "empty!";

  x = list_.front();

  list_.pop_front();

  return(true);

 }

}

private:

 boost::try_mutex tryMutex_;

 // ...

Используемые здесь мьютекс и блокировка отличаются от тех, которые применялись в примере 12.2. Убедитесь, что используемые вами имена классов мьютекса и блокировки правильно квалифицированы, в противном случае вы получите не то, на что рассчитываете.

При сериализации доступа к чему-либо вы заставляете пользователей этого ресурса выстраиваться друг за другом и дожидаться свой очереди. Если положение пользователей ресурса в очереди остается неизменным, каждый из них имеет шанс получения доступа к ресурсу. Однако если некоторым пользователям разрешается сокращать свою очередь, то до находящихся в конце очередь может никогда не дойти. Возникает зависание.

При использовании мьютекса mutex пользователи ресурса, которые находятся в состоянии ожидания, образуют группу, а не последовательность. Нельзя сказать, что существует определенный порядок между потоками, ожидающими возможности выполнения блокировки. Для мьютексов чтения/записи в библиотеке Boost Threads используется четыре политики планирования блокировок, которые были описаны ранее. Поэтому при использовании мьютексов чтения/записи необходимо понимать смысл различных политик планирования и действий ваших потоков. Если вы используете политику writer_priority и у вас много потоков, создающих блокировки для записи, ваши читающие потоки будут зависать; то же самое произойдет при применении политики reader_priority, поскольку эти политики планирования всегда отдают предпочтение одному из двух типов блокировки. Если в ходе тестирования вы понимаете, что один из типов потоков продвигается в очереди недостаточно, рассмотрите возможность перехода на применение политики alternating_many_reads или alternating_single_read. Тип политики задается при конструировании мьютекса чтения/записи.

Наконец, состояние состязания возникает в том случае, когда в программе делается предположение об определенном порядке выполнения блокировок или об их атомарности, что оказывается неверным. Например, рассмотрим пользователя класса Queue, который опрашивает первый элемент очереди и при определенном условии извлекает его из очереди с помощью функции dequeue.

if (q.getFront() == "Cyrus") {

 str = q.dequeue();

 // ...

Этот фрагмент программного кода хорошо работает в однопоточной среде, потому что q не может быть модифицирован в промежутке между первой и второй строкой. Однако в условиях многопоточной обработки, когда практически в любой момент другой поток может модифицировать q, следует исходить из предположения, что совместно используемые объекты модифицируются, когда поток не блокирует доступ к ним. После строки 1 другой поток, работая параллельно, может извлечь следующий элемент из q при помощи функции dequeue, что означает получение в строке 2 чего-то неожиданного или совсем ничего. Как функция getFront, так и функция dequeue блокирует один объект mutex, используемый для модификации q, но между их вызовами мьютекс разблокирован, и, если другой поток находится в ожидании выполнения блокировки, он может это сделать до того, как получит свой шанс строка 2.

Проблема состояния состязания в этом конкретном случае решается путем гарантирования сохранения блокировки на весь период выполнения операции. Создайте функцию-член dequeueIfEquals, которая извлекает следующий объект из очереди, если он равен аргументу. Функция dequeueIfEquals может использовать блокировку, как и всякая другая функция.

T dequeueIfEquals(const T& t) {

 boost::mutex::scoped_lock lock(mutex_);

 if (list_.front() == t)

 // ...

Существуют состояния состязания другого типа, но этот пример должен дать общее представление о том, чего следует остерегаться. По мере увеличения количества потоков и совместно используемых ресурсов состояния состязания оказываются более изощренными и обнаруживать их сложнее. Поэтому следует быть особенно осторожным на этапе проектирования, чтобы не допускать их.

В многопоточной обработке самое сложное — гарантировать сериализованный доступ к ресурсам, потому что если это сделано неправильно, отладка становится кошмаром. Поскольку многопоточная программа по своей сути недетерминирована (так как потоки могут выполняться в различной очередности и с различными квантами времени при каждом новом выполнении программы), очень трудно точно обнаружить место и способ ошибочной модификации чего-либо. Здесь еще в большей степени, чем в однопоточном программировании, надежный проект позволяет минимизировать затраты на отладку и переработку.