10.10. Несколько производителей, несколько потребителей
10.10. Несколько производителей, несколько потребителей
Следующее изменение, которое мы внесем в нашу пpoгрaммy, будет заключаться в добавлении возможности одновременной работы нескольких потребителей вместе с несколькими производителями. Есть ли смысл в наличии нескольких потребителей — зависит от приложения. Автор видел два примера, в которых использовался этот метод.
1. Пpoгрaммa преобразования IP-адресов в имена узлов. Каждый потребитель берет IP-адрес, вызывает gethostbyaddr (раздел 9.6 [24]), затем дописывает имя узла к файлу. Поскольку каждый вызов gethostbyaddr обрабатывается неопределенное время, порядок IP-адресов в буфере будет, скорее всего, отличаться от порядка имен узлов в файле, созданном потоками-потребителями. Преимущество этой схемы в параллельности выполнения вызовов gethostbyaddr (каждый из которых может работать несколько секунд) — по одному на каждый поток-потребитель.
ПРИМЕЧАНИЕ
Предполагается наличие версии gethostbyaddr, допускающей многократное вхождение, что не всегда верно. Если эта версия недоступна, можно хранить буфер в разделяемой памяти и использовать процессы вместо потоков.
2. Программа, принимающая дейтаграммы UDP, обрабатывающая их и записывающая результат в базу данных. Каждая дeйтaгрaммa обрабатывается одним потоком-потребителем, которые выполняются параллельно для ускорения процесса. Хотя дейтаграммы записываются в базу данных в порядке, вообще говоря, отличном от порядка их приема, встроенная схема упорядочения записей в базе данных справляется с этой проблемой.
В листинге 10.15 приведены глобальные переменные программы.
Листинг 10.15. Глобальные переменные
//pxsem/prodcons4.с
1 #include "unpipc.h"
2 #define NBUFF 10
3 #define MAXNTHREADS 100
4 int nitems, nproducers, nconsumers; /* только для чтения */
5 struct { /* общие данные производителей и потребителей */
6 int buff[NBUFF];
7 int nput; /* номер объекта: 0, 1. 2, … */
8 int nputval; /* сохраняемое в buff[] значение */
9 int nget; /* номер объекта: 0, 1, 2, … */
10 int ngetval; /* получаемое из buff[] значение */
11 sem_t mutex, nempty, nstored; /* семафоры, а не указатели */
12 } shared;
13 void *produce(void *), *consume(void *);
Глобальные переменные и общая структура
4-12 Количество потоков-потребителей является глобальной переменной, устанавливаемой из командной строки. В структуру shared добавилось два новых поля: nget — номер следующего объекта, получаемого одним из потоков-потребителей, и ngetval — соответствующее значение.
Функция main, текст которой приведен в листинге 10.16, запускает несколько потоков-потребителей и потоков-производителей одновременно.
19-23 Новый аргумент командной строки указывает количество потоков-потребителей. Для хранения идентификаторов потоков-потребителей выделяется место под специальный массив (tid_consume), а для подсчета обработанных каждым потоком объектов выделяется массив conscount.
24-50 Создаются несколько потоков-производителей и потребителей, после чего основной поток ждет их завершения.
Листинг 10.16. Функция main для версии с несколькими производителями и потребителями
//pxsem/prodcons4.с
14 int
15 main(int argc, char **argv)
16 {
17 int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];
18 pthread_t tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];
19 if (argc != 4)
20 err_quit("usage: prodcons4 <#items> <#producers> <#consumers>");
21 nitems = atoi(argv[1]);
22 nproducers = min(atoi(argv[2]), MAXNTHREADS);
23 nconsumers = min(atoi(argv[3]), MAXNTHREADS);
24 /* инициализация трех семафоров */
25 Sem_init(&shared.mutex, 0, 1);
26 Sem_init(&shared.nempty, 0, NBUFF);
27 Sem_init(&shared.nstored, 0, 0);
28 /* создание производителей и потребителей */
29 Set_concurrency(nproducers + nconsumers);
30 for (i = 0; i < nproducers; i++) {
31 prodcount[i] = 0;
32 Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);
33 }
34 for (i = 0; i < nconsumers; i++) {
35 conscount[i] = 0;
36 Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);
37 }
38 /* ожидание завершения всех производителей и потребителей */
39 for (i = 0; i < nproducers: i++) {
40 Pthread_join(tid_produce[i], NULL);
41 printf("producer count[%d] = %d ", i, prodcount[i]);
42 }
43 for (i = 0; i < nconsumers; i++) {
44 Pthread_join(tid_consume[i], NULL);
45 printf("consumer count[%d] = %d ", i, conscount[i]);
46 }
47 Sem_destroy(&shared.mutex);
48 Sem_destroy(&shared.nempty);
49 Sem_destroy(&shared.nstored);
50 exit(0);
51 }
Функция produce содержит одну новую строку по сравнению с листингом 10.13. В части кода, относящейся к завершению потока-производителя, появляется строка, отмеченная знаком +:
if (shared.nput >= nitems) {
+ Sem_post(&shared.nstored); /* даем возможность потребителям завершить работу */
Sem_post(&shared.nempty);
Sem_post(&shared.mutex);
return(NULL); /* готово */
}
Снова нам нужно быть аккуратными при обработке завершения процессов-производителей и потребителей. После обработки всех объектов в буфере все потребители блокируются в вызове
Sem_wait(&shared.nstored); /* Ожидание помещения объекта в буфер */
Производителям приходится увеличивать семафор nstored для разблокирования потрeбитeлeй, чтобы они узнали, что работа завершена. Функция consume приведена в листинге 10.17.
Листинг 10.17. Функция, выполняемая всеми потоками-потребителями
//pxsem/prodcons4.c
72 void *
73 consume(void *arg)
74 {
75 int i;
76 for (;;) {
77 Sem_wait(&shared.nstored); /* ожидание помещения объекта в буфер */
78 Sem_wait(&shared.mutex);
79 if (shared.nget >= nitems) {
80 Sem_post(&shared.nstored);
81 Sem_post(&shared.mutex);
82 return(NULL); /* готово */
83 }
84 i = shared.nget % NBUFF;
85 if (shared.buff[i] != shared.ngetval)
86 printf("error: buff[%d] = %d ", i, shared.buff[i]);
87 shared.nget++;
88 shared.ngetval++;
89 Sem_post(&shared.mutex);
90 Sem_post(&shared.nempty); /* освобождается место для элемента */
91 *((int *) arg) += 1;
92 }
93 }
Завершение потоков-потребителей
79-83 Функция consume сравнивает nget и nitems, чтобы узнать, когда следует остановиться (аналогично функции produce). Обработав последний объект в буфере, потоки-потребители блокируются, ожидая изменения семафора nstored. Когда завершается очередной поток-потребитель, он увеличивает семафор nstored, давая возможность завершить работу другому потоку-потребителю.
Данный текст является ознакомительным фрагментом.