7.3. Схема производитель-потребитель

7.3. Схема производитель-потребитель

Одна из классических задач на синхронизацию называется задачей производителя и потребителя. Она также известна как задача ограниченного буфера. Один или несколько производителей (потоков или процессов) создают данные, которые обрабатываются одним или несколькими потребителями. Эти данные передаются между производителями и потребителями с помощью одной из форм IPC.

С этой задачей мы регулярно сталкиваемся при использовании каналов Unix. Команда интерпретатора, использующая канал

grep pattern chapters.* | wc -l

является примером такой задачи. Программа grep выступает как производитель (единственный), a wc — как потребитель (тоже единственный). Канал используется как форма IPC. Требуемая синхронизация между производителем и потребителем обеспечивается ядром, обрабатывающим команды write производителя и read покупателя. Если производитель опережает потребителя (канал переполняется), ядро приостанавливает производителя при вызове write, пока в канале не появится место. Если потребитель опережает производителя (канал опустошается), ядро приостанавливает потребителя при вызове read, пока в канале не появятся данные.

Такой тип синхронизации называется неявным; производитель и потребитель не знают о том, что синхронизация вообще осуществляется. Если бы мы использовали очередь сообщений Posix или System V в качестве средства IPC между производителем и потребителем, ядро снова взяло бы на себя обеспечение синхронизации.

При использовании разделяемой памяти как средства IPC производителя и потребителя, однако, требуется использование какого-либо вида явной синхронизации. Мы продемонстрируем это на использовании взаимного исключения. Схема рассматриваемого примера изображена на рис. 7.1.

В одном процессе у нас имеется несколько потоков-производителей и один поток-потребитель. Целочисленный массив buff содержит производимые и потребляемые данные (данные совместного пользования). Для простоты производители просто устанавливают значение buff[0] в 0, buff [1] в 1 и т.д. Потребитель перебирает элементы массива, проверяя правильность записей.

В этом первом примере мы концентрируем внимание на синхронизации между отдельными потоками-производителями. Поток-потребитель не будет запущен, пока все производители не завершат свою работу. В листинге 7.1 приведена функция main нашего примера.

Рис. 7.1. Производители и потребитель

Листинг 7.1.[1] Функция main

//mutex/prodcons2.с

1  #include "unpipc.h"

2  #define MAXNITEMS 1000000

3  #define MAXNTHREADS 100

4  int nitems; /* только для чтения потребителем и производителем */

5  struct {

6   pthread_mutex_t mutex;

7   int buff[MAXNITEMS];

8   int nput;

9   int nval;

10 } shared = {

11  PTHREAD_MUTEX_INITIALIZER

12 };

13 void *produce(void *), *consume(void *);

14 int

15 main(int argc, char **argv)

16 {

17  int i, nthreads, count[MAXNTHREADS];

18  pthread_t tid_produce[MAXNTHREADS], tid_consume;

19  if (argc != 3)

20   err_quit("usage: prodcons2 <#items> <#threads>");

21  nitems = min(atoi(argv[1]), MAXNITEMS);

22  nthreads = min(atoi(argv[2]), MAXNTHREADS);

23  Set_concurrency(nthreads);

24  /* запуск всех потоков-производителей */

25  for (i = 0; i < nthreads; i++) {

26   count[i] = 0;

27   Pthread_create(&tid_produce[i], NULL, produce, &count[i]);

28  }

29  /* ожидание завершения всех производителей */

30  for (i = 0; i < nthreads; i++) {

31   Pthread_join(tid_produce[i], NULL);

32   printf("count[%d] = %d ", i, count[i]);

33  }

34  /* запуск и ожидание завершения потока-потребителя */

35  Pthread_create(&tid_consume, NULL, consume, NULL);

36  Pthread_join(tid_consume, NULL);

37  exit(0);

38 }

Совместное использование глобальных переменных потоками

4-12 Эти переменные совместно используются потоками. Мы объединяем их в структуру с именем shared вместе с взаимным исключением, чтобы подчеркнуть, что доступ к ним можно получить только вместе с ним. Переменная nput хранит индекс следующего элемента массива buff, подлежащего обработке, a nval содержит следующее значение, которое должно быть в него помещено (0, 1, 2 и т.д.). Мы выделяем память под эту структуру и инициализируем взаимное исключение, используемое для синхронизации потоков-производителей.

ПРИМЕЧАНИЕ

Мы всегда будем стараться размещать совместно используемые данные вместе со средствами синхронизации, к ним относящимися (взаимными исключениями, условными переменными, семафорами), в одной структуре, как мы сделали в этом примере. Это хороший стиль программирования. Однако во многих случаях совместно используемые данные являются динамическими, представляя собой, например, связный список. Мы, наверное, сможем поместить в структуру первый элемент списка вместе со средствами синхронизации (как в структуре mq_hdr в листинге 5.16), но оставшаяся часть списка в структуру не попадет. Следовательно, это решение не всегда является идеальным.

Аргументы командной строки

19-22 Первый аргумент командной строки указывает количество элементов, которые будут произведены производителями, а второй — количество запускаемых потоков-производителей.

Установка уровня параллельности

23 Функция set_concurrency (наша собственная) указывает подсистеме потоков количество одновременно выполняемых потоков. В Solaris 2.6 она просто вызывает thr_setconcurrency, причем ее запуск необходим, если мы хотим, чтобы у нескольких процессов-производителей была возможность начать выполняться. Если мы не сделаем этого вызова в системе Solaris, будет запущен только первый поток. В Digital Unix 4.0B наша функция set_concurrency не делает ничего, поскольку в этой системе по умолчанию все потоки процесса имеют равные права на вычислительные ресурсы.

ПРИМЕЧАНИЕ

Unix 98 требует наличия функции pthread_setconcurrency, выполняющей это же действие. Эта функция требуется для тех реализаций, которые мультиплексируют пользовательские потоки (создаваемые функцией pthread_create) на небольшое множество выполняемых потоков ядра. Такие реализации часто называются «многие-к-немногим» (many-to-few), «двухуровневые» (two-level) или «М-на-N» (M-to-N). В разделе 5.6 книги [3] отношения между пользовательскими потоками и потоками ядра рассматриваются более подробно.

Создание процессов-производителей

24-28 Создаются потоки-производители, каждый из которых вызывает функцию produce. Идентификаторы потоков хранятся в массиве tid_produce. Аргументом каждого потока-производителя является указатель на элемент массива count. Счетчики инициализируются значением 0, и каждый поток увеличивает значение своего счетчика на 1 при помещении очередного элемента в буфер. Содержимое массива счетчиков затем выводится на экран, так что мы можем узнать, сколько элементов было помещено в буфер каждым из потоков.

Ожидание завершения работы производителей, запуск потребителя

29-36 Мы ожидаем завершения работы всех потоков-производителей, выводя содержимое счетчика для каждого потока, а затем запускаем единственный процесс-потребитель. Таким образом (на данный момент) мы исключаем необходимость синхронизации между потребителем и производителями. Мы ждем завершения работы потребителя, а затем завершаем работу процесса. В листинге 7.2 приведен текст функций produce и consume.

Листинг 7.2. Функции produce и consume

//mutex/prodcons2.с

39 void *

40 produce(void *arg)

41 {

42  for (;;) {

43   Pthread_mutex_lock(&shared.mutex);

44   if (shared.nput >= nitems) {

45    Pthread_mutex_unlock(&shared.mutex);

46    return(NULL); /* массив полный, готово */

47   }

48   shared.buff[shared.nput] = shared.nval;

49   shared.nput++;

50   shared.nval++;

51   Pthread_mutex_unlock(&shared.mutex);

52   *((int *) arg) += 1;

53  }

54 }

55 void *

56 consume(void *arg)

57 {

58  int i;

59  for (i = 0; i < nitems; i++) {

60   if (shared.buff[i] != i)

61    printf("buff[%d] = %d ", i, shared.buff[i]);

62  }

63  return(NULL);

64 }

Формирование данных

42-53 Критическая область кода производителя состоит из проверки на достижение конца массива (завершение работы)

if (shared.nput >= nitems)

и трех строк, помещающих очередное значение в массив:

shared.buff[shared.nput] = shared.nval;

shared.nput++;

shared.nval++;

Мы защищаем эту область с помощью взаимного исключения, не забыв разблокировать его после завершения работы. Обратите внимание, что увеличение элемента count (через указатель arg) не относится к критической области, поскольку у каждого потока счетчик свой (массив count в функции main). Поэтому мы не включаем эту строку в блокируемую взаимным исключением область. Один из принципов хорошего стиля программирования заключается в минимизации объема кода, защищаемого взаимным исключением.

Потребитель проверяет содержимое массива

59-62 Потребитель проверяет правильность значений всех элементов массива и выводит сообщение в случае обнаружения ошибки. Как уже говорилось, эта функция запускается в единственном экземпляре и только после того, как все потоки-производители завершат свою работу, так что надобность в синхронизации отсутствует.

При запуске только что описанной программы с пятью процессами-производителями, которые должны вместе создать один миллион элементов данных, мы получим следующий результат:

solaris % prodcons2 1000000 5

count[0] = 167165

count[1] = 249891

count[2] = 194221

count[3] = 191815

count[4] = 196908

Как мы отмечали ранее, если убрать вызов set_concurrency, в системе Solaris 2.6 значение count[0] будет 1000000, а все остальные счетчики будут нулевыми.

Если убрать из этого примера блокировку с помощью взаимного исключения, он перестанет работать, как и предполагается. Потребитель обнаружит множество элементов buff[i], значения которых будут отличны от i. Также мы можем убедиться, что удаление блокировки ничего не изменит, если будет выполняться только один поток.

Данный текст является ознакомительным фрагментом.