Пример: объект порогового барьера

Пример: объект порогового барьера

Предположим, вам необходимо, чтобы рабочие потоки оставались в состоянии ожидания и не выполнялись до тех пор, пока количество таких потоков не станет достаточным для образования рабочей группы, способной выполнить нужную работу. Как только количество потоков достигает порогового значения, все ожидающие рабочие потоки начинают выполняться, а появляющиеся впоследствии дополнительные рабочие потоки будут выполняться без ожидания. Эту задачу можно решить путем создания сложного объекта порогового барьера (threshold barrier compound object).

В программах 10.1 и 10.2 представлена реализация трех функций, поддерживающих сложный объект барьера. Две из этих функций, CreateThresholdBarrier и CloseThresholdBarrier, управляют переменными THB_HANDLE, аналогичными дескрипторам, которые на протяжении всего времени применялись нами вместе с объектами ядра. Пороговое количество потоков является параметром функции CreateThresholdBarrier. 

Программа 10.1 представляет соответствующую часть заголовочного файла, SynchObj.h, тогда как программа 10.2 — реализацию трех упомянутых функций. Обратите внимание, что объект барьера содержит мьютекс, событие, счетчик и пороговое значение. Предикат переменной условия документирован в заголовочном файле, а именно, событие должно устанавливаться только тогда, когда значение счетчика достигает или становится больше порогового значения.

Программа 10.1. SynchObj.h: часть 1 — объявления объекта порогового барьера 

/* Глава 10. Сложные объекты синхронизации. */

#define CV_TIMEOUT 50 /* Настраиваемый параметр для модели CV. */

/* ОБЪЕКТ ПОРОГОВОГО БАРЬЕРА — ОПРЕДЕЛЕНИЕ ТИПА И ПРОТОТИПЫ ФУНКЦИЙ. */

typedef struct THRESHOLD_BARRIER_TAG { /* Пороговый барьер. */

 HANDLE b_guard; /* Мьютекс для объекта. */

 HANDLE b_broadcast; /* Вручную сбрасываемое событие: b_count >= b_threshold.*/

 volatile DWORD b_destroyed; /* Установить после закрытия. */

 volatile DWORD b_count; /* Количество потоков до достижения барьера. */

 volatile DWORD b_threshold; /* Пороговый барьер. */

} THRESHOLD_BARRIER, *THB_HANDLE;

/* Коды ошибок. */

#define SYNCH_OBJ_NOMEM 1 /* Невозможно выделить ресурсы. */

#define SYNCH_OBJ_BUSY 2 /* Объект используется и не может быть закрыт. */

#define SYNCH_OBJ_INVALID 3 /* Объект более не является действительным. */

DWORD CreateThresholdBarrier(THB_HANDLE *, DWORD /* Порог. */);

DWORD WaitThresholdBarrier(THB_HANDLE);

DWORD CloseThresholdBarrier(THB_HANDLE); 

Рассмотрим теперь предложенную в программе 10.2 реализацию трех функций. На Web-сайте книги находится тестовая программа testTHB. Обратите внимание на уже знакомый вам цикл проверки переменной условия в функции ожидания WaitThresholdBarrier. Кроме того, эта функция не только ожидает наступления события, но и переводит объект события в сигнальное состояние с помощью функции PulseEvent. Предыдущее обсуждение модели "производитель/потребитель" предполагало использование отдельных функций потоков.

Наконец, в данном случае предикат переменной условия обладает последействием. Как только условие выполнилось, оно будет выполняться и в дальнейшем, что исключает возможность перевода объекта события в сигнальное состояние более одного раза.

Программа 10.2. ThbObject.с: реализация объекта порогового барьера 

/* Глава 10. Программа 10.2. */

/* Библиотека сложных объектов синхронизации на основе порогового барьера.*/

#include "EvryThng.h"

#include "synchobj.h" 

/**********************************/

/*   ОБЪЕКТЫ ПОРОГОВОГО БАРЬЕРА   */

/**********************************/

DWORD CreateThresholdBarrier(THB_HANDLE *pthb, DWORD b_value) {

 THB_HANDLE hthb;

 /* Инициализация объекта барьера. Вариант программы с полной проверкой ошибок находится на Web-сайте. */

 hthb = malloc(sizeof(THRESHOLD_BARRIER));

 hthb->b_guard = CreateMutex(NULL, FALSE, NULL);

 hthb->b_broadcast = CreateEvent(NULL, FALSE /* Автоматически сбрасываемое событие. */, FALSE, NULL);

 hthb->b_threshold = b_value;

 hthb->b_count = 0;

 hthb->b_destroyed = 0;

 *pthb = hthb;

 return 0;

}

DWORD WaitThresholdBarrier(THB_HANDLE thb) {

 /* Ожидать, пока заданное количество потоков не достигнет порога, а затем установить событие. */

 if (thb->b_destroyed == 1) return SYNCH_OBJ_INVALID;

 WaitForSingleObject(thb->b_guard, INFINITE);

 thb->b_count++; /* Появился новый поток. */

 while (thb->b_count < thb->b_threshold) {

  SignalObjectAndWait(thb->b_guard, thb->b_broadcast, INFINITE, FALSE);

  WaitForSingleObject(thb->b_guard, INFINITE);

 }

 PulseEvent(thb->b_broadcast) ;

 /* Широковещательная модель CV, освобождение всех ожидающих потоков. */

 ReleaseMutex(thb->b_guard);

 return 0;

}

DWORD CloseThresholdBarrier(THB_HANDLE thb) {

 /* Уничтожить мьютекс и событие объекта барьера. */

 /* Убедиться в отсутствии потоков, ожидающих объект. */

 if (thb->b_destroyed == 1) return SYNCH_OBJ_INVALID;

 WaitForSingleObject(thb->b_guard, INFINITE);

 if (thb->b_count < thb->b_threshold) {

  ReleaseMutex(thb->b_guard);

  return SYNCH_OBJ_BUSY;

 }

 ReleaseMutex(thb->b_guard);

 CloseHandle(thb->b_guard);

 CloseHandle(thb->b_broadcast);

 free(thb);

 return 0;

}