10.10. Несколько производителей, несколько потребителей

We use cookies. Read the Privacy and Cookie Policy

10.10. Несколько производителей, несколько потребителей

Следующее изменение, которое мы внесем в нашу пpoгрaммy, будет заключаться в добавлении возможности одновременной работы нескольких потребителей вместе с несколькими производителями. Есть ли смысл в наличии нескольких потребителей — зависит от приложения. Автор видел два примера, в которых использовался этот метод.

1. Пpoгрaммa преобразования IP-адресов в имена узлов. Каждый потребитель берет IP-адрес, вызывает gethostbyaddr (раздел 9.6 [24]), затем дописывает имя узла к файлу. Поскольку каждый вызов gethostbyaddr обрабатывается неопределенное время, порядок IP-адресов в буфере будет, скорее всего, отличаться от порядка имен узлов в файле, созданном потоками-потребителями. Преимущество этой схемы в параллельности выполнения вызовов gethostbyaddr (каждый из которых может работать несколько секунд) — по одному на каждый поток-потребитель.

ПРИМЕЧАНИЕ

Предполагается наличие версии gethostbyaddr, допускающей многократное вхождение, что не всегда верно. Если эта версия недоступна, можно хранить буфер в разделяемой памяти и использовать процессы вместо потоков. 

2. Программа, принимающая дейтаграммы UDP, обрабатывающая их и записывающая результат в базу данных. Каждая дeйтaгрaммa обрабатывается одним потоком-потребителем, которые выполняются параллельно для ускорения процесса. Хотя дейтаграммы записываются в базу данных в порядке, вообще говоря, отличном от порядка их приема, встроенная схема упорядочения записей в базе данных справляется с этой проблемой.

В листинге 10.15 приведены глобальные переменные программы.

Листинг 10.15. Глобальные переменные

//pxsem/prodcons4.с

1  #include "unpipc.h"

2  #define NBUFF 10

3  #define MAXNTHREADS 100

4  int nitems, nproducers, nconsumers; /* только для чтения */

5  struct { /* общие данные производителей и потребителей */

6   int buff[NBUFF];

7   int nput; /* номер объекта: 0, 1. 2, … */

8   int nputval; /* сохраняемое в buff[] значение */

9   int nget; /* номер объекта: 0, 1, 2, … */

10  int ngetval; /* получаемое из buff[] значение */

11  sem_t mutex, nempty, nstored; /* семафоры, а не указатели */

12 } shared;

13 void *produce(void *), *consume(void *);

Глобальные переменные и общая структура

4-12 Количество потоков-потребителей является глобальной переменной, устанавливаемой из командной строки. В структуру shared добавилось два новых поля: nget — номер следующего объекта, получаемого одним из потоков-потребителей, и ngetval — соответствующее значение.

Функция main, текст которой приведен в листинге 10.16, запускает несколько потоков-потребителей и потоков-производителей одновременно.

19-23 Новый аргумент командной строки указывает количество потоков-потребителей. Для хранения идентификаторов потоков-потребителей выделяется место под специальный массив (tid_consume), а для подсчета обработанных каждым потоком объектов выделяется массив conscount.

24-50 Создаются несколько потоков-производителей и потребителей, после чего основной поток ждет их завершения.

Листинг 10.16. Функция main для версии с несколькими производителями и потребителями

//pxsem/prodcons4.с

14 int

15 main(int argc, char **argv)

16 {

17  int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];

18  pthread_t tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];

19  if (argc != 4)

20   err_quit("usage: prodcons4 <#items> <#producers> <#consumers>");

21  nitems = atoi(argv[1]);

22  nproducers = min(atoi(argv[2]), MAXNTHREADS);

23  nconsumers = min(atoi(argv[3]), MAXNTHREADS);

24  /* инициализация трех семафоров */

25  Sem_init(&shared.mutex, 0, 1);

26  Sem_init(&shared.nempty, 0, NBUFF);

27  Sem_init(&shared.nstored, 0, 0);

28  /* создание производителей и потребителей */

29  Set_concurrency(nproducers + nconsumers);

30  for (i = 0; i < nproducers; i++) {

31   prodcount[i] = 0;

32   Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);

33  }

34  for (i = 0; i < nconsumers; i++) {

35   conscount[i] = 0;

36   Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);

37  }

38  /* ожидание завершения всех производителей и потребителей */

39  for (i = 0; i < nproducers: i++) {

40   Pthread_join(tid_produce[i], NULL);

41   printf("producer count[%d] = %d ", i, prodcount[i]);

42  }

43  for (i = 0; i < nconsumers; i++) {

44   Pthread_join(tid_consume[i], NULL);

45   printf("consumer count[%d] = %d ", i, conscount[i]);

46  }

47  Sem_destroy(&shared.mutex);

48  Sem_destroy(&shared.nempty);

49  Sem_destroy(&shared.nstored);

50  exit(0);

51 }

Функция produce содержит одну новую строку по сравнению с листингом 10.13. В части кода, относящейся к завершению потока-производителя, появляется строка, отмеченная знаком +:

 if (shared.nput >= nitems) {

+ Sem_post(&shared.nstored); /* даем возможность потребителям завершить работу */

  Sem_post(&shared.nempty);

  Sem_post(&shared.mutex);

  return(NULL); /* готово */

 }

Снова нам нужно быть аккуратными при обработке завершения процессов-производителей и потребителей. После обработки всех объектов в буфере все потребители блокируются в вызове

Sem_wait(&shared.nstored); /* Ожидание помещения объекта в буфер */

Производителям приходится увеличивать семафор nstored для разблокирования потрeбитeлeй, чтобы они узнали, что работа завершена. Функция consume приведена в листинге 10.17. 

Листинг 10.17. Функция, выполняемая всеми потоками-потребителями

//pxsem/prodcons4.c

72 void *

73 consume(void *arg)

74 {

75  int i;

76  for (;;) {

77   Sem_wait(&shared.nstored); /* ожидание помещения объекта в буфер */

78   Sem_wait(&shared.mutex);

79   if (shared.nget >= nitems) {

80    Sem_post(&shared.nstored);

81    Sem_post(&shared.mutex);

82    return(NULL); /* готово */

83   }

84   i = shared.nget % NBUFF;

85   if (shared.buff[i] != shared.ngetval)

86    printf("error: buff[%d] = %d ", i, shared.buff[i]);

87   shared.nget++;

88   shared.ngetval++;

89   Sem_post(&shared.mutex);

90   Sem_post(&shared.nempty); /* освобождается место для элемента */

91   *((int *) arg) += 1;

92  }

93 }

Завершение потоков-потребителей

79-83 Функция consume сравнивает nget и nitems, чтобы узнать, когда следует остановиться (аналогично функции produce). Обработав последний объект в буфере, потоки-потребители блокируются, ожидая изменения семафора nstored. Когда завершается очередной поток-потребитель, он увеличивает семафор nstored, давая возможность завершить работу другому потоку-потребителю.

Данный текст является ознакомительным фрагментом.