6.3.2. Потокобезопасный список с блокировками

We use cookies. Read the Privacy and Cookie Policy

Список — одна из самых простых структур данных, и, наверное, написать его потокобезопасную версию будет несложно, правда? Все зависит от того, какая вам нужна функциональность и требуется ли поддержка итераторов — та самая, которую я побоялся включать в справочную таблицу на том основании, что это слишком сложно. Основная идея итератора в духе STL состоит в том, что итератор содержит своего рода ссылку на элемент внутренней структуры данных, образующей контейнер. Если контейнер модифицируется из другого потока, то ссылка должна оставаться действительной, а это значит, что итератор должен удерживать блокировку на какую-то часть структуры. Учитывая, что контейнер никак не контролирует время жизни STL-итератора, такой подход абсолютно непродуктивен.

Альтернатива — включить функции итерирования, например for_each, в сам контейнер. Тогда контейнер сможет полностью управлять своим обходом и блокировкой, но зато перестаёт отвечать рекомендациям но предотвращению взаимоблокировок из главы 3. Чтобы функция for_each делала что-то полезное, она должна вызывать предоставленный пользователем код, удерживая блокировку. Хуже того, она должна передавать ссылку на каждый элемент контейнера пользовательскому коду, чтобы тот мог к этому элементу обратиться. Можно было бы вместо ссылки передавать копию элемента, но это обойдется слишком дорого, если размер элементов велик.

Таким образом, мы оставим на совести пользователя заботу о предотвращении взаимоблокировок, предупредив его, что в пользовательских функциях не следует ни захватывать блокировки, ни сохранять ссылки, которые могли бы использоваться для доступа к данным вне защиты блокировок и тем самым привести к гонке. В случае внутреннего списка, используемого в реализации справочной таблицы, это абсолютно безопасно, поскольку мы не собираемся делать ничего противозаконного.

Остается решить, какие операции должен поддерживать список. Вернувшись к листингам 6.11 и 6.12, вы увидите, что именно нам требуется:

• добавлять элемент в список;

• удалять элемент из списка, если он удовлетворяет некоторому условию;

• искать в списке элемент, удовлетворяющий некоторому условию;

• изменить элемент, удовлетворяющий некоторому условию;

• скопировать все элементы списка в другой контейнер.

Чтобы получился приличный списковый контейнер общего назначения, полезно было бы добавить еще кое-какие операции, например вставку в указанную позицию, но для нашей справочной таблицы это излишне, поэтому оставляю реализацию в качестве упражнения для читателя.

Основная идея связанного списка с мелкогранулярными блокировками — завести по одному мьютексу на каждый узел. Если список длинный, то получится целая куча мьютексов! Достоинство заключается в том, что операции над отдельными частями списка полностью распараллеливаются: каждая операция удерживает только блокировки на узлы, которыми манипулирует, и освобождает блокировку при переходе к следующему узлу. В листинге ниже приведена реализация такого списка.

Листинг 6.13. Потокобезопасный список с поддержкой итераторов

template<typename T>

class threadsafe_list {

 struct node { ← (1)

  std::mutex m;

  std::shared_ptr<T> data;

  std::unique_ptr<node> next;

  node() : ← (2)

   next() {}

 node(T const& value) : ← (3)

  data(std::make_shared<T>(value)) {}

 };

 node head;

public:

 threadsafe_list() {}

 ~threadsafe_list() {

  remove_if([](node const&){return true;});

 }

 threadsafe_list(threadsafe_list const& other) = delete;

 threadsafe_list& operator=(

  threadsafe_list const& other) = delete;

 void push_front(T const& value) {

  std::unique_ptr<node> new_node(new node(value));← (4)

  std::lock_guard<std::mutex> lk(head.m);

  new_node->next = std::move(head.next);          ← (5)

  head.next = std::move(new_node);                ← (6)

 }

 template<typename Function>

 void for_each(Function f) {                     ← (7)

  node* current = &head;

  std::unique_lock<std::mutex> lk(head.m);       ← (8)

  while(node* const next = current->next.get()) {← (9)

   std::unique_lock<std::mutex> next_lk(next->m);← (10)

   lk.unlock();            ← (11)

   f(*next->data);         ← (12)

   current = next;

   lk = std::move(next_lk);← (13)

  }

 }

 template<typename Predicate>

 std::shared_ptr<T> find_first_if(Predicate p) {← (14)

  node* current = &head;

  std::unique_lock<std::mutex> lk(head.m);

  while (node* const next=current->next.get()) {

   std::unique_lock<std::mutex> next_lk(next->m);

   lk.unlock();

   if (p(*next->data)) {← (15)

    return next->data;  ← (16)

   }

   current = next;

   lk = std::move(next_lk);

  }

  return std::shared_ptr<T>();

 }

 template<typename Predicate>             ← (17)

 void remove_if(Predicate p) {

  node* current = &head;

  std::unique_lock<std::mutex> lk(head.m);

  while(node* const next = current->next.get()) {

   std::unique_lock<std::mutex> next_lk(next->m);

   if (p(*next->data)) {                  ← (18)

    std::unique_ptr<node> old_next = std::move(current->next);

    current->next = std::move(next->next);← (19)

    next_lk.unlock();

   }            ← (20)

   else {

    lk.unlock();← (21)

    current = next;

    lk = std::move(next_lk);

   }

  }

 }

};

Показанный в листинге 6.13 шаблон threadsafe_list<> — это реализация односвязного списка, в котором каждый элемент является структурой типа node (1). В роли головы head списка выступает сконструированный по умолчанию объект node, в котором указатель next равен NULL (2). Новые узлы добавляются в список функцией push_front(); сначала новый узел конструируется (4), при этом для хранимых в нем данных выделяется память из кучи (3), но указатель next остается равным NULL. Затем мы должны захватить мьютекс для узла head, чтобы получить нужное значение указателя next (5), после чего вставить узел в начало списка, записав в head.next указатель на новый узел (6). Пока всё хорошо: для добавления элемента в список необходимо захватить только один мьютекс, поэтому никакого риска взаимоблокировки нет. Кроме того, медленная операция выделения памяти происходит вне блокировки, так что блокировка защищает только обновление двух указателей — действия, которые не могут привести к ошибке. Переходим к функциям итерирования.

Для начала рассмотрим функцию for_each() (7). Она принимает объект Function, который применяется к каждому элементу списка; следуя примеру большинства библиотечных алгоритмов, этот объект передаётся по значению и может быть как настоящей функцией, так и объектом класса, в котором определена оператор вызова. В данном случае функция должна принимать в качестве единственного параметра значение типа T. Здесь мы производим передачу блокировки. Сначала захватывается мьютекс в головном узле head (8). Теперь можно безопасно получить указатель на следующий узел next (с помощью get(), потому что мы не принимаем на себя владение указателем). Если этот указатель не равен NULL (9), то захватываем мьютекс в соответствующем узле (10), чтобы обработать данные. Получив блокировку для этого узла, мы можем освободить блокировку для предыдущего узла (11) и вызвать указанную функцию (12). По выходе из этой функции мы можем обновить указатель current на только что обработанный узел и с помощью move передать владение блокировкой от next_lk в lk (13). Поскольку for_each передаёт каждый элемент данных напрямую пользовательской функции Function, мы можем обновить данные, скопировать их в другой контейнер и вообще сделать всё, что угодно. Если функция не делает того, чего нельзя, то это безопасно, потому что на всем протяжении вызова удерживается мьютекс узла, содержащего элемент данных.

Функция find_first_if() (14) аналогична for_each(); существенное отличие заключается в том, что переданный предикат Predicate должен вернуть true, если нужный элемент найден, и false в противном случае (15). Если элемент найден, то мы сразу возвращаем хранящиеся в нем данные (16), прерывая поиск. Можно было бы добиться того же результата с помощью for_each(), но тогда мы продолжили бы просмотр списка до конца, хотя после обнаружения искомого элемента в этом уже нет необходимости.

Функция remove_if() (17) несколько отличается, потому что она должна изменить список; for_each() для этой цели непригодна. Если предикат Predicate возвращает true (18), то мы удаляем узел из списка, изменяя значение current->next (19). Покончив с этим, мы можем освободить удерживаемый мьютекс следующего узла. Узел удаляется, когда объект std::unique_ptr<node>, в который мы его переместили, покидает область видимости (20). В данном случае мы не изменяем current, потому что необходимо проверить следующий узел next. Если Predicate возвращает false, то нужно просто продолжить обход списка, как и раньше (21).

А могут ли при таком обилии мьютексов возникнуть взаимоблокировки или состояния гонки? Ответ — твердое нет, при условии, что полученные от пользователя предикаты и функции ведут себя, как положено. Итерирование всегда производится в одном направлении, начиная с узла head, и следующий мьютекс неизменно блокируется до освобождения текущего, поэтому не может случиться так, что в разных потоках порядок захвата мьютексов будет различен. Единственный потенциальный кандидат на возникновение гонки — удаление исключенного из списка узла в функции remove_if() (20), потому что это делается после освобождения мьютекса (уничтожение захваченного мьютекса приводит к неопределённому поведению). Однако, немного поразмыслив, мы придём к выводу, что это безопасно, так как в этот момент все еще удерживается мьютекс предыдущего узла (current), поэтому ни один другой поток не сможет попытаться захватить мьютекс удаляемого узла.

Что можно сказать по поводу распараллеливания? Вся эта возня с мелкогранулярными блокировками затевалась для того, чтобы увеличить уровень параллелизма по сравнению с единственным мьютексом. Так достигли мы своей цели или нет? Да, безусловно — теперь разные потоки могут одновременно работать с разными узлами списка, выполняя разные функции: for_each() для обработки каждого узла, find_first_if() для поиска или remove_if() для удаления элементов. Но, поскольку мьютексы узлов захватываются по порядку, потоки не могут обгонять друг друга. Если какой-то поток тратит много времени на обработку конкретного узла, то, дойдя до этого узла, остальные потоки вынуждены будут ждать.