10.6. Задача производителей и потребителей

We use cookies. Read the Privacy and Cookie Policy

10.6. Задача производителей и потребителей

В разделе 7.3 мы описали суть задачи производителей и потребителей и привели несколько возможных ее решений, в которых несколько потоков-производителей заполняли массив, который обрабатывался одним потоком-потребителем.

1. В нашем первом варианте решения (раздел 7.2) потребитель запускался только после завершения работы производителей, поэтому мы могли решить проблему синхронизации, используя единственное взаимное исключение для синхронизации производителей.

2. В следующем варианте решения (раздел 7.5) потребитель запускался до завершения работы производителей, поэтому требовалось использование взаимного исключения (для синхронизации производителей) вместе с условной переменной и еще одним взаимным исключением (для синхронизации потребителя с производителями).

Расширим постановку задачи производителей и потребителей, используя общий буфер в качестве циклического: заполнив последнее поле, производитель (buff[NBUFF-1]) возвращается к его началу и заполняет первое поле (buff[0]), и потребитель действует таким же образом. Возникает еще одно требование к синхронизации: потребитель не должен опережать производителя. Мы все еще предполагаем, что производитель и потребитель представляют собой отдельные потоки одного процесса, но они также могут быть и просто отдельными процессами, если мы сможем создать для них общий буфер (например, используя разделяемую память, часть 4).

При использовании общего буфера в качестве циклического код должен удовлетворять трем требованиям:

1. Потребитель не должен пытаться извлечь объект из буфера, если буфер пуст.

2. Производитель не должен пытаться поместить объект в буфер, если последний полон.

3. Состояние буфера может описываться общими переменными (индексами, счетчиками, указателями связных списков и т.д.), поэтому все операции с буфером, совершаемые потребителями и производителями, должны быть защищены от потенциально возможной ситуации гонок.

Наше решение использует три семафора:

1. Бинарный семафор с именем mutex защищает критические области кода: помещение данных в буфер (для производителя) и изъятие данных из буфера (для потребителя). Бинарный семафор, используемый в качестве взаимного исключения, инициализируется единицей. (Конечно, мы могли бы воспользоваться и обычным взаимным исключением вместо двоичного семафора. См. упражнение 10.10.)

2. Семафор-счетчик с именем nempty подсчитывает количество свободных полей в буфере. Он инициализируется значением, равным объему буфера (NBUFF).

3. Семафор-счетчик с именем nstored подсчитывает количество заполненных полей в буфере. Он инициализируется нулем, поскольку изначально буфер пуст.

Рис. 10.7. Состояние буфера и двух семафоров-счетчиков после инициализации

На рис. 10.7 показано состояние буфера и двух семафоров-счетчиков после завершения инициализации. Неиспользуемые элементы массива выделены темным.

В нашем примере производитель помещает в буфер целые числа от 0 до NLOOP-1 (buff[0] = 0, buff[1] = 1), работая с ним как с циклическим. Потребитель считывает эти числа и проверяет их правильность, выводя сообщения об ошибках в стандартный поток вывода.

На рис. 10.8 изображено состояние буфера и семафоров-счетчиков после помещения в буфер трех элементов, но до изъятия их потребителем.

Рис. 10.8. Буфер и семафоры после помещения в буфер трех элементов

Предположим, что потребитель изъял один элемент из буфера. Новое состояние изображено на рис. 10.9.

Рис. 10.9. Буфер и семафоры после удаления первого элемента из буфера

В листинге 10.8 приведен текст функции main, которая создает три семафора, запускает два потока, ожидает их завершения и удаляет семафоры.

Листинг 10.8. Функция main для решения задачи производителей и потребителей с помощью семафоров

//pxsem/prodcons1.с

1  #include "unpipc.h"

2  #define NBUFF 10

3  #define SEM_MUTEX "mutex" /* аргументы px_ipc_name() */

4  #define SEM_NEMPTY "nempty"

5  #define SEM_NSTORED "nstored"

6  int nitems; /* read-only для производителя и потребителя */

7  struct { /* разделяемые производителем и потребителем данные */

8   int buff[NBUFF];

9   sem_t *mutex, *nempty, *nstored;

10 } shared;

11 void *produce(void *), *consume(void *);

12 int

13 main(int argc, char **argv)

14 {

15  pthread_t tid_produce, tid_consume;

16  if (argc != 2)

17   err_quit("usage: prodcons1 <#items>");

18  nitems = atoi(argv[1]);

19  /* создание трех семафоров */

20  shared.mutex = Sem_open(Px_ipc_name(SEM_MUTEX), O_CREAT | O_EXCL,

21   FILE_MODE, 1);

22  shared.nempty = Sem_open(Px_ipc_name(SEM_NEMPTY), 0_CREAT | O_EXCL,

23   FILE_MODE, NBUFF);

24  shared.nstored = Sem_open(Px_ipc_name(SEM_NSTORED), O_CREAT | O_EXCL,

25   FILE_MODE, 0);

26  /* создание одного потока-производителя и одного потока-потребителя */

27  Set_concurrency(2);

28  Pthread_create(&tid_produce, NULL, produce, NULL);

29  Pthread_create(&tid_consume, NULL, consume, NULL);

30  /* ожидание завершения работы потоков */

31  Pthread_join(tid_produce, NULL);

32  Pthread_join(tid_consume, NULL);

33  /* удаление семафоров */

34  Sem_unlink(Px_ipc_name(SEM_MUTEX));

35  Sem_unlink(Px_ipc_name(SEM_NEMPTY));

36  Sem_unlink(Px_ipc_name(SEM_NSTORED));

37  exit(0);

38 }

Глобальные переменные

6-10 Потоки совместно используют буфер, содержащий NBUFF элементов, и три указателя на семафоры. Как говорилось в главе 7, мы объединяем эти данные в структуру, чтобы подчеркнуть, что семафоры используются для синхронизации доступа к буферу.

Создание семафоров

19-25 Мы создаем три семафора, передавая их имена функции px_ipc_name. Флаг O_EXCL мы указываем, для того чтобы гарантировать инициализацию каждого семафора правильным значением. Если после преждевременно завершенного предыдущего запуска программы остались неудаленные семафоры, мы обработаем эту ситуацию, вызвав перед их созданием sem_unlink и игнорируя ошибки. Мы могли бы проверять возвращение ошибки EEXIST при вызове sem_open с флагом O_EXCL, а затем вызывать sem_unlink и еще раз sem_open, но это усложнило бы программу. Если нам нужно проверить, что запущен только один экземпляр программы (что следует сделать перед созданием семафоров), можно обратиться к разделу 9.7, где описаны методы решения этой задачи.

Создание двух потоков

26-29 Создаются два потока, один из которых является производителем, а другой — потребителем. При запуске никакие аргументы им не передаются.

30-36 Главный поток ждет завершения работы производителя и потребителя, а затем удаляет три семафора.

ПРИМЕЧАНИЕ

Мы могли бы вызвать для каждого семафора sem_close, но это делается автоматически при завершении процесса. А вот удалить имя семафора из файловой системы необходимо явно.

В листинге 10.9 приведен текст функций produce и consume.

Листинг 10.9. Функции produce и consume

//pxsem/prodcons1.c

39 void *

40 produce(void *arg)

41 {

42  int i;

43  for (i = 0; i < nitems; i++) {

44   Sem_wait(shared.nempty); /* ожидаем освобождения поля */

45   Sem_wait(shared.mutex);

46   shared.buff[i % NBUFF] = i; /* помещаем i в циклический буфер */

47   Sem_post(shared.mutex);

48   Sem_post(shared.nstored); /* сохраняем еще 1 элемент */

49  }

50  return(NULL);

51 }

52 void *

53 consume(void *arg)

54 {

55  int i;

56  for (i = 0; i < nitems; i++) {

57   Sem_wait(shared.nstored); /* ожидаем появления объекта в буфере */

58   Sem_wait(shared.mutex);

59   if (shared.buff[i % NBUFF] != i)

60    printf("buff[%d] = %d ", i, shared.buff[i % NBUFF]);

61   Sem_post(shared.mutex);

62   Sem_post(shared.nempty); /* еще одно пустое поле */

63  }

64  return(NULL);

65 }

Производитель ожидает освобождения места в буфере

44 Производитель вызывает sem_wait для семафора nempty, ожидая появления свободного места. В первый раз при выполнении этой команды значение семафора nempty уменьшится с NBUFF до NBUFF-1.

Производитель помещает элемент в буфер

45-48 Перед помещением нового элемента в буфер производитель должен установить блокировку на семафор mutex. В нашем примере, где производитель просто сохраняет значение в элементе массива с индексом i % NBUFF, для описания состояния буфера не используется никаких разделяемых переменных (то есть мы не используем связный список, который нужно было бы обновлять каждый раз при помещении элемента в буфер). Следовательно, установка и снятие семафора mutex не являются обязательными. Тем не менее мы иллюстрируем эту технику, потому что обычно ее применение является необходимым в задачах такого рода (обновление буфера, разделяемого несколькими потоками).

После помещения элемента в буфер блокировка с семафора mutex снимается (его значение увеличивается с 0 до 1) и увеличивается значение семафора nstored. Первый раз при выполнении этой команды значение nstored изменится с начального значения 0 до 1.

Потребитель ожидает изменения семафора nstored

57-62 Если значение семафора nstored больше 0, в буфере имеются объекты для обработки. Потребитель изымает один элемент из буфера и проверяет правильность его значения, защищая буфер в момент доступа к нему с помощью семафора mutex. Затем потребитель увеличивает значение семафора nempty, указывая производителю на наличие свободных полей.

Данный текст является ознакомительным фрагментом.