6.3.1. Разработка потокобезопасной справочной таблицы с блокировками
Справочная таблица, или словарь позволяет ассоциировать значения одного типа (типа ключа) со значениями того же или другого типа (ассоциированного типа). В общем случае задача такой структуры — запрашивать данные, ассоциированные с данным ключом. В стандартной библиотеке С++ для этой цели предназначены ассоциативные контейнеры: std::map<>, std::multimap<>, std::unordered_map<>, и std::unordered_multimap<>.
Справочная таблица используется иначе, чем стек или очередь. Если большинство операций со стеком и очередью каким-то образом модифицируют структуру данных, то есть либо добавляют, либо удаляют элементы, то справочная таблица изменяется сравнительно редко. Примером может служить простой DNS-кэш из листинга 3.13, предоставляющий интерфейс, сильно урезанный по сравнению с std::map<>. При рассмотрении стека и очереди мы видели, что интерфейсы стандартных контейнеров не годятся для случая, когда к структуре данных одновременно обращаются несколько потоков, так как им внутренне присущи состояния гонки. Поэтому интерфейсы приходится пересматривать и сокращать.
Самая серьезная с точки зрения распараллеливания проблема в интерфейсе контейнера std::map<> — его итераторы. Можно написать итератор так, что доступ к контейнеру с его помощью для чтения и модификации будет безопасен, но это амбициозная задача. Для корректной работы нужно учесть, что другой поток может удалить элемент, на который итератор сейчас ссылается, а это совсем непросто. Поэтому при первом подходе к проектированию интерфейса потокобезопасной справочной таблицы итераторы мы опустим. Памятуя о том, насколько сильно интерфейс std::map<> (и прочих стандартных ассоциативных контейнеров) зависит от итераторов, пожалуй, будет разумнее сразу отказаться от попыток смоделировать их и вместо этого придумать новый интерфейс с нуля.
Справочной таблице нужно всего несколько основных операций:
• добавить новую пару ключ/значение;
• изменить значение, ассоциированное с данным ключом;
• удалить ключ и ассоциированное с ним значение;
• получить значение, ассоциированное с данным ключом, если такой ключ существует.
Есть также несколько полезных операций, относящихся к контейнеру в целом, например: проверить, пуст ли контейнер, получить полный текущий список ключей или полное множество пар ключ/ значение.
Если придерживаться базовых рекомендаций, касающихся потокобезопасности, например, не возвращать ссылки и защищать мьютексом все тело каждой функции-члена, то все эти операции будут безопасны. Угроза возникновения гонки возникает прежде всего при добавлении новой пары ключ/значение; если два потока одновременно добавляют новое значение, то лишь один из них будет первым, а второй, следовательно, получит ошибку. Один из способов решить эту проблему состоит в том, чтобы объединить операции добавления и изменения в одной функции-члене, как мы проделали в DNS-кэше в листинге 3.13.
С точки зрения интерфейса, остается еще лишь один интересный момент: фраза «если такой ключ существует» в описании операции получения ассоциированного значения. Можно, например, предоставить пользователю возможность задать некий результат «по умолчанию», который будет возвращен, если указанного ключа нет.
mapped_type get_value(
key_type const& key, mapped_type default_value);
В таком случае можно использовать сконструированный по умолчанию экземпляр типа mapped_type, если default_value не было задано явно. Эту идею можно развить и возвращать не просто экземпляр mapped_type, а объект типа std::pair<mapped_type, bool>, где bool указывает, было ли найдено значение. Другой вариант — использовать интеллектуальный указатель, ссылающийся на значение; если он равен NULL, то значение не было найдено.
Как уже отмечалось, после определения интерфейса (в предположении, что состояний гонки в нем нет), обеспечить потокобезопасность можно, заведя единственный мьютекс, который дет захватываться в начале каждой функции-члена для защиты внутренней структуры данных. Однако тем самым мы сведем на нет все возможности распараллеливания, которые могли бы открыться в результате наличия отдельных функций для чтения и модификации структуры данных. Отчасти исправить ситуацию можно было бы, воспользовавшись мьютексом, который разрешает нескольким потокам читать данные, но только одному — изменять их, например, классом boost::shared_mutex из листинга 3.13. Это, конечно, несколько улучшило бы общую картину, но при таком решении модифицировать структуру данных по-прежнему может только один поток. В идеале хотелось бы добиться большего.
Проектирование структуры данных для справочной таблицы с мелкогранулярными блокировками
Как и в случае очереди (см. раздел 6.2.3), чтобы ввести мелкогранулярные блокировки, нужно внимательно изучить особенности структуры данных, а не пытаться обернуть уже имеющийся контейнер, например std::map<>. Есть три общепринятых подхода к реализации ассоциативного контейнера, каковым является, в частности, справочная таблица:
• двоичное, например красно-черное, дерево;
• отсортированный массив;
• хеш-таблица.
Двоичное дерево плохо приспособлено для распараллеливания — каждый просмотр или модификация должны начинается с доступа к корневому узлу, который, следовательно, нужно блокировать. Блокировку, конечно, можно освобождать по мере спуска вниз по дереву, но в целом это немногим лучше блокирования всей структуры целиком.
Отсортированный массив еще хуже, потому что заранее невозможно сказать, в каком месте массива может оказаться требуемое значение, поэтому придется заводить одну блокировку на весь массив. Остается только хеш-таблица. В предположении, что число кластеров фиксировано, вопрос о том, в каком кластере находится ключ, является свойством только лишь самого ключа и хеш-функции. А это значит, что мы сможем завести по одной блокировке на каждый кластер. Если еще и использовать мьютекс, который поддерживает несколько читателей и одного писателя, то коэффициент распараллеливания увеличится в N раз, где N — число кластеров. Недостаток в том, что нужна хорошая хеш-функция для ключа. В стандартной библиотеке С++ имеется шаблон std::hash<>, которым можно воспользоваться для этой цели. Он уже специализирован для таких фундаментальных типов, как int, и некоторых библиотечных типов, например std::string, а пользователь может без труда написать специализации и для других типов ключа. Если, следуя примеру стандартных неупорядоченных контейнеров, указать в качестве параметра шаблона тип объекта-функции, которая выполняет хеширование, то пользователь сможет сам решить, специализировать ли ему шаблон std::hash<> для типа своего ключа или предоставить отдельную хеш-функцию.
Теперь обратимся собственно к коду. Как могла бы выглядеть реализация потокобезопасной справочной таблицы? Один из вариантов показан ниже.
Листинг 6.11. Потокобезопасная справочная таблица
template<typename Key, typename Value,
typename Hash = std::hash<Key> >
class threadsafe_lookup_table {
private:
class bucket_type {
private:
typedef std::pair<Key, Value> bucket_value;
typedef std::list<bucket_value> bucket_data;
typedef typename bucket_data::iterator bucket_iterator;
bucket_data data;
mutable boost::shared_mutex mutex;← (1)
bucket_iterator find_entry_for(Key const& key) const {← (2)
return std::find_if(data.begin(), data.end(),
[&](bucket_value const& item)
{ return item.first == key; });
}
public:
Value value_for(
Key const& key, Value const& default_value) const {
boost::shared_lock<boost::shared_mutex> lock(mutex);← (3)
bucket_iterator const found_entry = find_entry_for(key);
return (found_entry==data.end()) ?
default_value : found_entry->second;
}
void add_or_update_mapping(
Key const& key, Value const& value) {
std::unique_lock<boost::shared_mutex> lock(mutex);← (4)
bucket_iterator const found_entry = find_entry_for(key);
if (found_entry == data.end()) {
data.push_back(bucket_value(key, value));
} else {
found_entry->second = value;
}
}
void remove_mapping(Key const& key) {
std::unique_lock<boost::shared_mutex> lock(mutex);← (5)
bucket_iterator const found_entry = find_entry_for(key);
if (found_entry != data.end()) {
data.erase(found_entry);
}
}
};
std::vector<std::unique_ptr<bucket_type>> buckets;← (6)
Hash hasher;
bucket_type& get_bucket(Key const& key) const {← (7)
std::size_t const bucket_index = hasher(key)%buckets.size();
return *buckets[bucket_index];
}
public:
typedef Key key_type;
typedef Value mapped_type;
typedef Hash hash_type;
threadsafe_lookup_table(
unsigned num_buckets = 19,
Hash const& hasher_ = Hash()):
buckets(num_buckets), hasher(hasher_) {
for (unsigned i = 0; i < num_buckets; ++i) {
buckets[i].reset(new bucket_type);
}
}
threadsafe_lookup_table(
threadsafe_lookup_table const& other) = delete;
threadsafe_lookup_table& operator=(
threadsafe_lookup_table const& other) = delete;
Value value_for(Key const& key,
Value const& default_value = Value()) const {
return get_bucket(key).value_for(key, default_value);← (8)
}
void add_or_update_mapping(Key const& key,Value const& value) {
get_bucket(key).add_or_update_mapping(key, value);← (9)
}
void remove_mapping(Key const& key) {
get_bucket(key).remove_mapping(key);← (10)
}
};
В этой реализации для хранения кластеров используется вектор std::vector<std::unique_ptr<bucket_type>> (6), это позволяет задавать число кластеров в конструкторе. По умолчанию оно равно 19 (произвольно выбранное простое число); оптимальные показатели работы хеш-таблиц достигаются, когда имеется простое число кластеров. Каждый кластер защищен мьютексом типа boost::shared_mutex (1), который позволяет нескольким потокам одновременно читать, но только одному обращаться к любой из функций модификации данного кластера.
Поскольку количество кластеров фиксировано, функцию get_bucket() (7) можно вызывать вообще без блокировки (8), (9), (10), а затем захватить мьютекс кластера для совместного (только для чтения) (3) или монопольного (чтение/запись) (4), (5) владения — в зависимости от вызывающей функции.
Во всех трех функциях используется функция-член кластера find_entry_for() (2), которая определяет, есть ли в данном кластере указанный ключ. Каждый кластер содержит всего лишь список std::list<> пар ключ/значение, поэтому добавлять и удалять элементы легко.
Я уже рассматривал это решение с точки зрения распараллеливания, и мы убедились, что все надежно защищено мьютексами. Но как обстоит дело с безопасностью относительно исключений? Функция value_for ничего не изменяет, поэтому с ней проблем нет: если она и возбудит исключение, то на структуру данных это не повлияет.
Функция remove_mapping модифицирует список, обращаясь к его функции-члену erase, которая гарантированно не возбуждает исключений, так что здесь тоже всё безопасно. Остается функция add_or_update_mapping, которая может возбуждать исключения в обеих ветвях if. Функция push_back безопасна относительно исключений, то есть в случае исключения оставляет список в исходном состоянии, так что с этой ветвью всё хорошо. Единственная проблема — присваивание в случае замены существующего значения; если оператор присваивания возбуждает исключение, то мы полагаемся на то, что он оставляет исходный объект неизмененным. Однако это не влияет на структуру данных в целом и является свойством пользовательского типа, так что пускай пользователь и решает эту проблему.
В начале этого раздела я упоминал, что было бы хорошо, если бы можно было получить текущее состояние справочной таблицы, например, в виде объекта std::map<>. Чтобы копия состояния была согласована, потребуется заблокировать контейнер целиком, то есть все кластеры сразу. Поскольку для «обычных» операций захватывается мьютекс только одного кластера, то получение копии состояния будет единственной операцией, блокирующей все кластеры. При условии, что мьютексы всегда захватываются в одном и том же порядке, взаимоблокировка не возникнет. Такая реализация приведена в следующем листинге.
Листинг 6.12. Получение содержимого таблицы threadsafe_lookup_table в виде std::map<>
std::map<Key, Value> threadsafe_lookup_table::get_map() const {
std::vector<std::unique_lock<boost::shared_mutex> > locks;
for (unsigned i = 0; i < buckets.size(); ++i) {
locks.push_back(
std::unique_lock<boost::shared_mutex>(buckets[i].mutex));
}
std::map<Key, Value> res;
for (unsigned i = 0; i < buckets.size(); ++i) {
for (bucket_iterator it = buckets[i].data.begin();
it != buckets[i].data.end(); ++it) {
res.insert(*it);
}
}
return res;
}
Реализация справочной таблицы, показанная в листинге 6.11, увеличивает уровень параллелизма таблицы в целом за счет того, что каждый кластер блокируется отдельно, и при этом используется boost::shared_mutex, который позволяет нескольким потокам одновременно читать кластер. Но нельзя ли добиться большего уровня параллелизма, еще уменьшив гранулярность блокировок? В следующем разделе мы покажем, как это сделать, воспользовавшись потокобезопасным списковым контейнером с поддержкой итераторов.