10.10. Несколько производителей, несколько потребителей
10.10. Несколько производителей, несколько потребителей
Следующее изменение, которое мы внесем в нашу пpoгрaммy, будет заключаться в добавлении возможности одновременной работы нескольких потребителей вместе с несколькими производителями. Есть ли смысл в наличии нескольких потребителей — зависит от приложения. Автор видел два примера, в которых использовался этот метод.
1. Пpoгрaммa преобразования IP-адресов в имена узлов. Каждый потребитель берет IP-адрес, вызывает gethostbyaddr (раздел 9.6 [24]), затем дописывает имя узла к файлу. Поскольку каждый вызов gethostbyaddr обрабатывается неопределенное время, порядок IP-адресов в буфере будет, скорее всего, отличаться от порядка имен узлов в файле, созданном потоками-потребителями. Преимущество этой схемы в параллельности выполнения вызовов gethostbyaddr (каждый из которых может работать несколько секунд) — по одному на каждый поток-потребитель.
ПРИМЕЧАНИЕ
Предполагается наличие версии gethostbyaddr, допускающей многократное вхождение, что не всегда верно. Если эта версия недоступна, можно хранить буфер в разделяемой памяти и использовать процессы вместо потоков.
2. Программа, принимающая дейтаграммы UDP, обрабатывающая их и записывающая результат в базу данных. Каждая дeйтaгрaммa обрабатывается одним потоком-потребителем, которые выполняются параллельно для ускорения процесса. Хотя дейтаграммы записываются в базу данных в порядке, вообще говоря, отличном от порядка их приема, встроенная схема упорядочения записей в базе данных справляется с этой проблемой.
В листинге 10.15 приведены глобальные переменные программы.
Листинг 10.15. Глобальные переменные
//pxsem/prodcons4.с
1 #include "unpipc.h"
2 #define NBUFF 10
3 #define MAXNTHREADS 100
4 int nitems, nproducers, nconsumers; /* только для чтения */
5 struct { /* общие данные производителей и потребителей */
6 int buff[NBUFF];
7 int nput; /* номер объекта: 0, 1. 2, … */
8 int nputval; /* сохраняемое в buff[] значение */
9 int nget; /* номер объекта: 0, 1, 2, … */
10 int ngetval; /* получаемое из buff[] значение */
11 sem_t mutex, nempty, nstored; /* семафоры, а не указатели */
12 } shared;
13 void *produce(void *), *consume(void *);
Глобальные переменные и общая структура
4-12 Количество потоков-потребителей является глобальной переменной, устанавливаемой из командной строки. В структуру shared добавилось два новых поля: nget — номер следующего объекта, получаемого одним из потоков-потребителей, и ngetval — соответствующее значение.
Функция main, текст которой приведен в листинге 10.16, запускает несколько потоков-потребителей и потоков-производителей одновременно.
19-23 Новый аргумент командной строки указывает количество потоков-потребителей. Для хранения идентификаторов потоков-потребителей выделяется место под специальный массив (tid_consume), а для подсчета обработанных каждым потоком объектов выделяется массив conscount.
24-50 Создаются несколько потоков-производителей и потребителей, после чего основной поток ждет их завершения.
Листинг 10.16. Функция main для версии с несколькими производителями и потребителями
//pxsem/prodcons4.с
14 int
15 main(int argc, char **argv)
16 {
17 int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];
18 pthread_t tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];
19 if (argc != 4)
20 err_quit("usage: prodcons4 <#items> <#producers> <#consumers>");
21 nitems = atoi(argv[1]);
22 nproducers = min(atoi(argv[2]), MAXNTHREADS);
23 nconsumers = min(atoi(argv[3]), MAXNTHREADS);
24 /* инициализация трех семафоров */
25 Sem_init(&shared.mutex, 0, 1);
26 Sem_init(&shared.nempty, 0, NBUFF);
27 Sem_init(&shared.nstored, 0, 0);
28 /* создание производителей и потребителей */
29 Set_concurrency(nproducers + nconsumers);
30 for (i = 0; i < nproducers; i++) {
31 prodcount[i] = 0;
32 Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);
33 }
34 for (i = 0; i < nconsumers; i++) {
35 conscount[i] = 0;
36 Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);
37 }
38 /* ожидание завершения всех производителей и потребителей */
39 for (i = 0; i < nproducers: i++) {
40 Pthread_join(tid_produce[i], NULL);
41 printf("producer count[%d] = %d ", i, prodcount[i]);
42 }
43 for (i = 0; i < nconsumers; i++) {
44 Pthread_join(tid_consume[i], NULL);
45 printf("consumer count[%d] = %d ", i, conscount[i]);
46 }
47 Sem_destroy(&shared.mutex);
48 Sem_destroy(&shared.nempty);
49 Sem_destroy(&shared.nstored);
50 exit(0);
51 }
Функция produce содержит одну новую строку по сравнению с листингом 10.13. В части кода, относящейся к завершению потока-производителя, появляется строка, отмеченная знаком +:
if (shared.nput >= nitems) {
+ Sem_post(&shared.nstored); /* даем возможность потребителям завершить работу */
Sem_post(&shared.nempty);
Sem_post(&shared.mutex);
return(NULL); /* готово */
}
Снова нам нужно быть аккуратными при обработке завершения процессов-производителей и потребителей. После обработки всех объектов в буфере все потребители блокируются в вызове
Sem_wait(&shared.nstored); /* Ожидание помещения объекта в буфер */
Производителям приходится увеличивать семафор nstored для разблокирования потрeбитeлeй, чтобы они узнали, что работа завершена. Функция consume приведена в листинге 10.17.
Листинг 10.17. Функция, выполняемая всеми потоками-потребителями
//pxsem/prodcons4.c
72 void *
73 consume(void *arg)
74 {
75 int i;
76 for (;;) {
77 Sem_wait(&shared.nstored); /* ожидание помещения объекта в буфер */
78 Sem_wait(&shared.mutex);
79 if (shared.nget >= nitems) {
80 Sem_post(&shared.nstored);
81 Sem_post(&shared.mutex);
82 return(NULL); /* готово */
83 }
84 i = shared.nget % NBUFF;
85 if (shared.buff[i] != shared.ngetval)
86 printf("error: buff[%d] = %d ", i, shared.buff[i]);
87 shared.nget++;
88 shared.ngetval++;
89 Sem_post(&shared.mutex);
90 Sem_post(&shared.nempty); /* освобождается место для элемента */
91 *((int *) arg) += 1;
92 }
93 }
Завершение потоков-потребителей
79-83 Функция consume сравнивает nget и nitems, чтобы узнать, когда следует остановиться (аналогично функции produce). Обработав последний объект в буфере, потоки-потребители блокируются, ожидая изменения семафора nstored. Когда завершается очередной поток-потребитель, он увеличивает семафор nstored, давая возможность завершить работу другому потоку-потребителю.
Более 800 000 книг и аудиокниг! 📚
Получи 2 месяца Литрес Подписки в подарок и наслаждайся неограниченным чтением
ПОЛУЧИТЬ ПОДАРОКДанный текст является ознакомительным фрагментом.
Читайте также
Несколько примеров
Несколько примеров Рассмотрим теперь несколько примеров применения каждого метода.Режим с управлением по запросу (send-driven) — модель «клиент/сервер»Файловая система, последовательные порты, консоли и звуковые платы — все это примеры применения модели «клиент/сервер».
4.3.5. Несколько замечаний
4.3.5. Несколько замечаний Для полного понимания процесса создания учетных записей нам нужно познакомиться еще с файлом /etc/login.defs. В нем хранятся настройки, которые будут использоваться при добавлении пользователей. Содержимое файла можно увидеть в листинге 4.1.Листинг 4.1.
Еще несколько слов о языке
Еще несколько слов о языке Перед тем как закончить наш разговор о коммуникации, неплохо было бы затронуть тему грамматических ошибок. Проблемы с правописанием встречаются в Сети довольно часто и вызывают у посетителей не самые лучшие
Несколько полезных замечаний
Несколько полезных замечаний При создании входного файла для схемы, которую вы хотите исследовать, всегда начинайте с полного эскиза схемы. Разметьте узлы, используя для этого маркировку, отличающуюся по цвету от остального текста, например красные или голубые чернила.
10.6. Задача производителей и потребителей
10.6. Задача производителей и потребителей В разделе 7.3 мы описали суть задачи производителей и потребителей и привели несколько возможных ее решений, в которых несколько потоков-производителей заполняли массив, который обрабатывался одним потоком-потребителем.1. В нашем
10.9. Несколько производителей, один потребитель
10.9. Несколько производителей, один потребитель Решение в разделе 10.6 относится к классической задаче с одним производителем и одним потребителем. Новая, интересная модификация программы позволит нескольким производителям работать с одним потребителем. Начнем с решения
10.11. Несколько буферов
10.11. Несколько буферов Во многих программах, обрабатывающих какие-либо данные, можно встретить цикл видаwhile ((n = read(fdin, buff, BUFFSIZE)) > 0) { /* обработка данных */ write(fdout, buff, n);}Например, программы, обрабатывающие текстовые файлы, считывают строку из входного файла, выполняют с ней
Multiple (Несколько)
Multiple (Несколько) Программа AutoCAD выполняет полное сканирование экрана каждый раз, когда происходит выделение объекта. Режим Multiple (Несколько) позволяет выделить несколько объектов без задержки, и при нажатии клавиши Enter все точки будут выбраны за одно сканирование
3.14.9. Еще несколько образцов
3.14.9. Еще несколько образцов Завершим наш список несколькими выражениями из категории «разное». Как обычно, почти все эти задачи можно решить несколькими способами.Пусть нужно распознать двузначный почтовый код американского штата. Проще всего, конечно, взять выражение
Несколько слов о IpcChannel
Несколько слов о IpcChannel Перед тем как перейти к обсуждению файлов конфигурации удаленного взаимодействия, напомним, что .NET 2.0 предлагает тип IpcChannel, обеспечивающий самый быстрый из возможных способов взаимодействия приложений на одной машине. Задачей данной главы
Алгоритм производителей-потребителей
Алгоритм производителей-потребителей Еще один многопоточный алгоритм, тесно связанный с проблемой потоков считывания и записи - алгоритм, решающий проблему производителей и потребителей.Этот раздел адресован только тем программистам, которые работают в среде
Как объединить несколько книг
Как объединить несколько книг 1. Откройте два окна Fiction Book Designer (скажем, окно 1 и окно 2).2. Загрузите книгу в окно 1, выберите весь ее текст (Ctrl+A) и скопируйте его в клипборд (Ctrl+Ins или Ctrl+C).3. В окне 2 кликните на то место, куда Вы хотите поместить текст книги и нажмите Shift+Ins или Ctrl+V.4.
19.1. Несколько слов о GRUB2
19.1. Несколько слов о GRUB2 Загрузчик GRUB (GRand Unified Bootloader) считается более гибким и современным, чем LILO (Linux Loader). Благодаря иной схеме загрузки операционных систем GRUB понимает больше файловых систем, нежели LILO, а именно: FAT/FAT32, ext2, ext3, ReiserFS, XFS, BSDFS и др.Но время не стоит на месте. В
Несколько выходящих документов
Несколько выходящих документов Как известно, преобразование в XSLT 1.0 имеет один основной входящий документ (плюс документы, доступные при помощи функции document) и ровно один выходящий документ. То есть, для того, чтобы сгенерировать на основе одного входящего документа
Подход третий: Несколько product owner’ов - несколько backlog’ов
Подход третий: Несколько product owner’ов - несколько backlog’ов Похоже на второй вариант, по отдельному product backlog на команду, только ещё и с отдельным product owner’ом на каждую команду. Мы не пробовали так делать, и, скорее всего, пробовать не будем.Если два product backlog’а касаются одного и
§ 1.3 Несколько слов о XML
§ 1.3 Несколько слов о XML Расширяемый язык разметки — eXtensible Markup Language, был создан для хранения структурированных данных в текстовом формате. Теоретически файлы XML должны легко читаться, как программным обеспечением, так и человеком.С использованием технологии XML можно