Объект очереди

Объект очереди

До сих пор мы связывали с каждым мьютексом только одно событие, но в общем случае могут существовать несколько предикатов переменных условий. Например, в случае очереди, действующей по принципу "первым пришел, первым ушел" (first in first out, FIFO), поток, который пытается удалить элемент из очереди, должен дождаться события, указывающего на то, что очередь не является пустой, а поток, помещающий элемент в очередь, должен дождаться события, указывающего на то, что очередь не является заполненной. Решение заключается в предоставлении двух событий — по одному для каждого условия.

В программе 10.3 представлены необходимые объявления объекта очереди и его функций. В объявлениях намеренно применяется стиль, отличающийся от того, который принят в Windows и который мы использовали до сих пор. Эта программа была получена преобразованием ее первоначального варианта, реализованного в UNIX на основе потоков Pthreads, чем и объясняется происхождение использованного нами стиля. Точно так же и вы можете наследовать тот или иной стиль или определить собственный, который соответствует вашему вкусу или принятым в вашей организации требованиям. В упражнении 10.7 вам предлагается преобразовать приведенный стиль к стилю Windows.

Программы 10.4 и 10.5 представляют функции очереди и программу, которая их использует.

Программа 10.3. SynchObj.h: часть 2 — объявления объекта очереди 

/* Объявления структуры обычной ограниченной синхронизированной очереди.*/

/* Очереди закольцованы и реализованы в виде массивов с индексацией */

/* последнего и первого сообщений. */

/* Кроме того, каждая очередь содержит защитный мьютекс и */

/* переменные условий "очередь не пуста" и "очередь не заполнена". */

/* Наконец, имеется указатель массива сообщений произвольного типа. */

typedef struct queue_tag { /* Универсальная очередь. */

 HANDLE q_guard; /* Защита блока сообщения. */

 HANDLE q_ne; /* Очередь не пуста. Вручную сбрасываемое событие. (Автоматически сбрасываемое событие для "сигнальной модели".) */

 HANDLE q_nf; /* Очередь не заполнена. Вручную сбрасываемое событие. (Автоматически сбрасываемое событие для "сигнальной модели".) */

 volatile DWORD q_size; /* Максимальный размер очереди. */

 volatile DWORD q_first; /* Индекс первого сообщения. */

 volatile DWORD q_last; /* Индекс последнего сообщения. */

 volatile DWORD q_destroyed; /* Получатель сообщений очереди завершил выполнение. */

 PVOID msg_array; /* Массив q_size сообщений. */

} queue_t;

/* Функции управления очередью. */

DWORD q_initialize(queue_t *, DWORD, DWORD);

DWORD q_destroy(queue_t *);

DWORD q_destroyed(queue_t *);

DWORD q_empty(queue_t *);

DWORD q_full(queue_t *);

DWORD q_get(queue_t *, PVOID, DWORD, DWORD);

DWORD q_put(queue_t *, PVOID, DWORD, DWORD);

DWORD q_remove(queue_t *, PVOID, DWORD);

DWORD q_insert(queue_t *, PVOID, DWORD); 

В программе 10.4 представлены такие функции, как q_initialize и q_get, прототипы которых описаны в конце программы 10.3. Обратите внимание, что функции q_get и q_put обеспечивают синхронизацию доступа, а функции q_remove и q_insert, которые вызываются первыми двумя функциями, сами по себе не являются синхронизированными и могут быть использованы в однонитевых программах. В первых двух функциях предусмотрена возможность использования конечных интервалов ожидания, что требует незначительного расширения модели переменных условий.

q_empty и q_full — две другие важные функции, которые используются для реализации предикатов переменных условий.

Данная реализация использует функцию PulseEvent и вручную сбрасываемые события (широковещательная модель), так что все события уведомляются о том, что очередь не пуста или не заполнена.

Замечательной особенностью этой реализации является симметрия функций q_get и q_put. Обратите внимание хотя бы на то, как в этих функциях используются предикаты пустой и заполненной очередей или события. Подобная простота не только восхитительна сама по себе, но и имеет благоприятные практические последствия, облегчающие написание, понимание и сопровождение программы, и все это было достигнуто за счет использования модели переменных условий. 

Наконец, те, кто программирует на C++, легко сообразят, что приведенный код может быть использован для создания класса синхронизированной очереди; именно это вам и предлагается сделать в упражнении 10.8.

Программа 10.4. QueueObj.с: функции управления очередью 

/* Глава 10. QueueObj.c. */

/* Функции очереди */

#include "EvryThng.h"

#include "SynchObj.h"

/* Функции управления конечной ограниченной очередью. */

DWORD q_get(queue_t *q, PVOID msg, DWORD msize, DWORD MaxWait) {

 if (q_destroyed (q)) return 1;

 WaitForSingleObject(q->q_guard, INFINITE);

 while (q_empty(q)) {

  SignalObjectAndWait(q->q_guard, q->q_ne, INFINITE, FALSE);

  WaitForSingleObject(q->q_guard, INFINITE);

 }

 /* Удалить сообщение из очереди. */

 q_remove(q, msg, msize);

 /* Сигнализировать о том, что очередь не заполнена, поскольку мы удалили сообщение. */

 PulseEvent(q->q_nf);

 ReleaseMutex(q->q_guard);

 return 0;

}

DWORD q_put(queue_t *q, PVOID msg, DWORD msize, DWORD MaxWait) {

 if (q_destroyed(q)) return 1;

 WaitForSingleObject(q->q_guard, INFINITE);

 while(q_full(q)) {

  SignalObjectAndWait(q->q_guard, q->q_nf, INFINITE, FALSE);

  WaitForSingleObject(q->q_guard, INFINITE);

 }

 /* Поместить сообщение в очередь. */

 q_insert(q, msg, msize);

 /* Сигнализировать о том, что очередь не пуста; мы вставили сообщение.*/

 PulseEvent (q->q_ne);

 /* Широковещательная модель CV. */

 ReleaseMutex(q->q_guard);

 return 0;

}

DWORD q_initialize(queue_t *q, DWORD msize, DWORD nmsgs) {

 /* Инициализация очереди, включая ее мьютекс и события. */

 /* Выделить память для всех сообщений. */

 q->q_first = q->q_last = 0;

 q->q_size = nmsgs;

 q->q_destroyed = 0;

 q->q_guard = CreateMutex(NULL, FALSE, NULL); 

 q->q_ne = CreateEvent(NULL, TRUE, FALSE, NULL);

 q->q_nf = CreateEvent(NULL, TRUE, FALSE, NULL);

 if ((q->msg_array = calloc(nmsgs, msize)) == NULL) return 1;

 return 0; /* Ошибки отсутствуют. */

}

DWORD q_destroy(queue_t *q) {

 if (q_destroyed(q)) return 1;

 /* Освободить все ресурсы, созданные вызовом q_initialize. */

 WaitForSingleObject(q->q_guard, INFINITE);

 q->q_destroyed = 1;

 free(q->msg_array);

 CloseHandle(q->q_ne);

 CloseHandle(q->q_nf);

 ReleaseMutex(q->q_guard);

 CloseHandle(q->q_guard);

 return 0;

}

DWORD q_destroyed(queue_t *q) {

 return (q->q_destroyed);

}

DWORD q_empty(queue_t *q) {

 return (q->q_first == q->q_last);

}

DWORD q_full(queue_t *q) {

 return ((q->q_last – q->q_first) == 1 || (q->q_first == q->q_size-l && q->q_last == 0));

}

DWORD q_remove(queue_t *q, PVOID msg, DWORD msize) {

 char *pm;

 pm = (char *)q->msg_array;

 /* Удалить наиболее давнее ("первое") сообщение. */

 memcpy(msg, pm + (q->q_first * msize), msize);

 q->q_first = ((q->q_first + 1) % q->q_size);

 return 0; /* Ошибки отсутствуют. */

}

DWORD q_insert(queue_t *q, PVOID msg, DWORD msize) {

 char *pm;

 pm = (char *)q->msg_array;

 /* Добавить новое ("последнее") сообщение. */

 if (q_full(q)) return 1; /* Ошибка – очередь заполнена. */

 memcpy(pm + (q->q_last * msize), msg, msize);

 q->q_last = ((q->q_last + 1) % q->q_size);

 return 0;