7.3. Схема производитель-потребитель
7.3. Схема производитель-потребитель
Одна из классических задач на синхронизацию называется задачей производителя и потребителя. Она также известна как задача ограниченного буфера. Один или несколько производителей (потоков или процессов) создают данные, которые обрабатываются одним или несколькими потребителями. Эти данные передаются между производителями и потребителями с помощью одной из форм IPC.
С этой задачей мы регулярно сталкиваемся при использовании каналов Unix. Команда интерпретатора, использующая канал
grep pattern chapters.* | wc -l
является примером такой задачи. Программа grep выступает как производитель (единственный), a wc — как потребитель (тоже единственный). Канал используется как форма IPC. Требуемая синхронизация между производителем и потребителем обеспечивается ядром, обрабатывающим команды write производителя и read покупателя. Если производитель опережает потребителя (канал переполняется), ядро приостанавливает производителя при вызове write, пока в канале не появится место. Если потребитель опережает производителя (канал опустошается), ядро приостанавливает потребителя при вызове read, пока в канале не появятся данные.
Такой тип синхронизации называется неявным; производитель и потребитель не знают о том, что синхронизация вообще осуществляется. Если бы мы использовали очередь сообщений Posix или System V в качестве средства IPC между производителем и потребителем, ядро снова взяло бы на себя обеспечение синхронизации.
При использовании разделяемой памяти как средства IPC производителя и потребителя, однако, требуется использование какого-либо вида явной синхронизации. Мы продемонстрируем это на использовании взаимного исключения. Схема рассматриваемого примера изображена на рис. 7.1.
В одном процессе у нас имеется несколько потоков-производителей и один поток-потребитель. Целочисленный массив buff содержит производимые и потребляемые данные (данные совместного пользования). Для простоты производители просто устанавливают значение buff[0] в 0, buff [1] в 1 и т.д. Потребитель перебирает элементы массива, проверяя правильность записей.
В этом первом примере мы концентрируем внимание на синхронизации между отдельными потоками-производителями. Поток-потребитель не будет запущен, пока все производители не завершат свою работу. В листинге 7.1 приведена функция main нашего примера.
Рис. 7.1. Производители и потребитель
Листинг 7.1.[1] Функция main
//mutex/prodcons2.с
1 #include "unpipc.h"
2 #define MAXNITEMS 1000000
3 #define MAXNTHREADS 100
4 int nitems; /* только для чтения потребителем и производителем */
5 struct {
6 pthread_mutex_t mutex;
7 int buff[MAXNITEMS];
8 int nput;
9 int nval;
10 } shared = {
11 PTHREAD_MUTEX_INITIALIZER
12 };
13 void *produce(void *), *consume(void *);
14 int
15 main(int argc, char **argv)
16 {
17 int i, nthreads, count[MAXNTHREADS];
18 pthread_t tid_produce[MAXNTHREADS], tid_consume;
19 if (argc != 3)
20 err_quit("usage: prodcons2 <#items> <#threads>");
21 nitems = min(atoi(argv[1]), MAXNITEMS);
22 nthreads = min(atoi(argv[2]), MAXNTHREADS);
23 Set_concurrency(nthreads);
24 /* запуск всех потоков-производителей */
25 for (i = 0; i < nthreads; i++) {
26 count[i] = 0;
27 Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
28 }
29 /* ожидание завершения всех производителей */
30 for (i = 0; i < nthreads; i++) {
31 Pthread_join(tid_produce[i], NULL);
32 printf("count[%d] = %d ", i, count[i]);
33 }
34 /* запуск и ожидание завершения потока-потребителя */
35 Pthread_create(&tid_consume, NULL, consume, NULL);
36 Pthread_join(tid_consume, NULL);
37 exit(0);
38 }
Совместное использование глобальных переменных потоками
4-12 Эти переменные совместно используются потоками. Мы объединяем их в структуру с именем shared вместе с взаимным исключением, чтобы подчеркнуть, что доступ к ним можно получить только вместе с ним. Переменная nput хранит индекс следующего элемента массива buff, подлежащего обработке, a nval содержит следующее значение, которое должно быть в него помещено (0, 1, 2 и т.д.). Мы выделяем память под эту структуру и инициализируем взаимное исключение, используемое для синхронизации потоков-производителей.
ПРИМЕЧАНИЕ
Мы всегда будем стараться размещать совместно используемые данные вместе со средствами синхронизации, к ним относящимися (взаимными исключениями, условными переменными, семафорами), в одной структуре, как мы сделали в этом примере. Это хороший стиль программирования. Однако во многих случаях совместно используемые данные являются динамическими, представляя собой, например, связный список. Мы, наверное, сможем поместить в структуру первый элемент списка вместе со средствами синхронизации (как в структуре mq_hdr в листинге 5.16), но оставшаяся часть списка в структуру не попадет. Следовательно, это решение не всегда является идеальным.
Аргументы командной строки
19-22 Первый аргумент командной строки указывает количество элементов, которые будут произведены производителями, а второй — количество запускаемых потоков-производителей.
Установка уровня параллельности
23 Функция set_concurrency (наша собственная) указывает подсистеме потоков количество одновременно выполняемых потоков. В Solaris 2.6 она просто вызывает thr_setconcurrency, причем ее запуск необходим, если мы хотим, чтобы у нескольких процессов-производителей была возможность начать выполняться. Если мы не сделаем этого вызова в системе Solaris, будет запущен только первый поток. В Digital Unix 4.0B наша функция set_concurrency не делает ничего, поскольку в этой системе по умолчанию все потоки процесса имеют равные права на вычислительные ресурсы.
ПРИМЕЧАНИЕ
Unix 98 требует наличия функции pthread_setconcurrency, выполняющей это же действие. Эта функция требуется для тех реализаций, которые мультиплексируют пользовательские потоки (создаваемые функцией pthread_create) на небольшое множество выполняемых потоков ядра. Такие реализации часто называются «многие-к-немногим» (many-to-few), «двухуровневые» (two-level) или «М-на-N» (M-to-N). В разделе 5.6 книги [3] отношения между пользовательскими потоками и потоками ядра рассматриваются более подробно.
Создание процессов-производителей
24-28 Создаются потоки-производители, каждый из которых вызывает функцию produce. Идентификаторы потоков хранятся в массиве tid_produce. Аргументом каждого потока-производителя является указатель на элемент массива count. Счетчики инициализируются значением 0, и каждый поток увеличивает значение своего счетчика на 1 при помещении очередного элемента в буфер. Содержимое массива счетчиков затем выводится на экран, так что мы можем узнать, сколько элементов было помещено в буфер каждым из потоков.
Ожидание завершения работы производителей, запуск потребителя
29-36 Мы ожидаем завершения работы всех потоков-производителей, выводя содержимое счетчика для каждого потока, а затем запускаем единственный процесс-потребитель. Таким образом (на данный момент) мы исключаем необходимость синхронизации между потребителем и производителями. Мы ждем завершения работы потребителя, а затем завершаем работу процесса. В листинге 7.2 приведен текст функций produce и consume.
Листинг 7.2. Функции produce и consume
//mutex/prodcons2.с
39 void *
40 produce(void *arg)
41 {
42 for (;;) {
43 Pthread_mutex_lock(&shared.mutex);
44 if (shared.nput >= nitems) {
45 Pthread_mutex_unlock(&shared.mutex);
46 return(NULL); /* массив полный, готово */
47 }
48 shared.buff[shared.nput] = shared.nval;
49 shared.nput++;
50 shared.nval++;
51 Pthread_mutex_unlock(&shared.mutex);
52 *((int *) arg) += 1;
53 }
54 }
55 void *
56 consume(void *arg)
57 {
58 int i;
59 for (i = 0; i < nitems; i++) {
60 if (shared.buff[i] != i)
61 printf("buff[%d] = %d ", i, shared.buff[i]);
62 }
63 return(NULL);
64 }
Формирование данных
42-53 Критическая область кода производителя состоит из проверки на достижение конца массива (завершение работы)
if (shared.nput >= nitems)
и трех строк, помещающих очередное значение в массив:
shared.buff[shared.nput] = shared.nval;
shared.nput++;
shared.nval++;
Мы защищаем эту область с помощью взаимного исключения, не забыв разблокировать его после завершения работы. Обратите внимание, что увеличение элемента count (через указатель arg) не относится к критической области, поскольку у каждого потока счетчик свой (массив count в функции main). Поэтому мы не включаем эту строку в блокируемую взаимным исключением область. Один из принципов хорошего стиля программирования заключается в минимизации объема кода, защищаемого взаимным исключением.
Потребитель проверяет содержимое массива
59-62 Потребитель проверяет правильность значений всех элементов массива и выводит сообщение в случае обнаружения ошибки. Как уже говорилось, эта функция запускается в единственном экземпляре и только после того, как все потоки-производители завершат свою работу, так что надобность в синхронизации отсутствует.
При запуске только что описанной программы с пятью процессами-производителями, которые должны вместе создать один миллион элементов данных, мы получим следующий результат:
solaris % prodcons2 1000000 5
count[0] = 167165
count[1] = 249891
count[2] = 194221
count[3] = 191815
count[4] = 196908
Как мы отмечали ранее, если убрать вызов set_concurrency, в системе Solaris 2.6 значение count[0] будет 1000000, а все остальные счетчики будут нулевыми.
Если убрать из этого примера блокировку с помощью взаимного исключения, он перестанет работать, как и предполагается. Потребитель обнаружит множество элементов buff[i], значения которых будут отличны от i. Также мы можем убедиться, что удаление блокировки ничего не изменит, если будет выполняться только один поток.
Данный текст является ознакомительным фрагментом.