Динамический пул потоков
Динамический пул потоков
Динамический пул потоков не является каким-то специфическим механизмом, продиктованным именно микроядерной архитектурой QNX. Это удачная искусственная конструкция, все определения которой размещены в файле <sys/dispatch.h>. Удивительно не то, что в составе API QNX имеется такой механизм, а то, что подобные инструменты отсутствуют в других ОС.
В предыдущих примерах кода мы неоднократно создавали наборы потоков для тех или иных целей, но всем им было присуще одно: общее количество потоков в них было фиксированным на момент создания. Это и были статические пулы потоков, разделяющих между собой работу приложения. Архитекторы QNX идут чуть дальше: они предоставляют инструментарий для создания пулов однотипных (с общей функцией потока) потоков, в которых конкретное число потоков может увеличиваться или уменьшаться синхронно с изменением нагрузки на приложение. Именно своим динамическим составом эта конструкция и отличается.
Динамический пул потоков нужен разработчикам QNX в первую очередь как инструмент построения многопоточных менеджеров ресурсов - основы построения сервисов ОС QNX. Но и помимо этой цели динамический пул потоков представляет собой мощнейшее средство для конструирования параллельных механизмов обработки.
Проиллюстрируем применение динамического пула потоков примером программного кода, который был нами описан в книге [4] в главе «Сервер TCP/IP... много серверов хороших и разных». По сути, это ретранслирующий TCP/IP-сервер, но сейчас это для нас неважно:
Сервер на базе динамического пула потоков
#include <pthread.h>
#include <sys/dispatch.h>
static int ls; // прослушивающий TCP-сокет
THREAD_POOL_PARAM_T* alloc(THREAD_POOL_HANDLE_T* h) {
return (THREAD_POOL_PARAM_T*)h;
}
// функция блокирования пула потоков
THREAD_POOL_PARAM_T* block(THREAD_POOL_PARAM_T* p) {
int rs = accept(ls, NULL, NULL);
if (rs < 0) errx("accept error");
return(THREAD_POOL_PARAM_T*)rs;
}
int handler(THREAD_POOL_PARAM_T* p) {
retrans((int)p);
close((int)p);
delay(250);
cout << pthread_self() << flush;
return 0;
}
int main(int argc, char* argv[]) {
// создать TCP-сокет на порт
ls = getsocket(THREAD_POOL_PORT);
// создание атрибутной записи пула потоков:
thread_pool_attr_t attr;
memset(&attr, 0, sizeof(thread_pool_attr_t));
// заполнение блока атрибутов пула
/* - mm число блокированных потоков в пуле */
attr.lo_water = 3;
/* - max число блокированных потоков в пуле */
attr.hi_water = 7;
/* - инкремент шага создания потоков */
attr.increment = 2;
attr.maximum = 9;
/* - общий предел числа потоков в пуле */
attr.handle = dispatch_create();
attr.context_alloc = alloc;
attr.block_func = block;
attr.handler_func = handler;
// фактическое создание пула потоков:
void* tpp = thread_pool_create(&attr, POOL_FLAG_USE_SELF);
if (tpp == NULL) errx("create pool");
// начало функционирования пула потоков:
thread_pool_start(tpp);
// ... выполнение никогда не дойдет до этой точки!
exit(EXIT_SUCCESS);
}
Примечание
В примере используются, но не определены две функции, которые не столь существенны для понимания примера сточки зрения функционирования пула:
• errx() — реакция на ошибку выполнения с выводом сообщения и последующим аварийным завершением;
• retrans() — прием сообщения с присоединенного TCP-сокета с последующей ретрансляцией полученного содержимого в него же.
Итак, первая особенность пула потоков в том, что мы построили многопоточный сервер, почти не прописывая собственного кода, — большую часть рутинной работы за нас сделала библиотека пула.
Приведем описание логики работы пула потоков и показанного примера на самом качественном, простейшем уровне:
• Первоначально (при запуске пула потоков в работу вызовом thread_pool_start()) создается attr.lo_water потоков («нижняя ватерлиния» числа блокированных потоков).
• При создании любого потока (как в процессе начального, так и в процессе последующего создания) вызывается функция attr.соntext_alloc() (в контексте созданного потока).
• По завершении функция вызывает блокирующую функцию потока attr.block_func(), на которой созданный поток ожидает события активизации (в показанном примере событие активизации — это установление соединения новым клиентом по возврату из accept()).
• Блокирующая функция после наступления события активизации переведет поток в состояние READY и вызовет в контексте этого потока функцию обработчика attr.handler_func().
• Если после предыдущего шага число оставшихся заблокированных потоков станет ниже attr.lo_water, механизм пула создаст дополнительно attr.increment потоков и «доведет» их до блокирующей функции.
• Активизированный поток производит всю обработку, предписанную функцией потока, и после выполнения потоковой функции будет опять переведен в блокированное состояние в функции блокирования…
• …но перед переводом потока вновь в блокированное состояние проверяется, не будет ли при этом превышено число блокированных потоков attr.hi_water («верхняя ватерлиния»), и если это имеет место, то поток вместо перевода в блокированное состояние самоуничтожается.
• Все проверки числа потоков производятся для того, чтобы общее число потоков пула (т. e. число активизированных потоков вместе с блокированными) не превышало общее ограничение attr.maximum.
Разобрав общую логику функционирования пула потоков, можно теперь детальнее рассмотреть отдельные шаги всего процесса:
1. Прежде чем создавать пул потоков, мы должны создать атрибутную запись, определяющую все поведение пула. Атрибутная запись описана так (<sys/dispatch.h>):
typedef struct _thread_pool_attr {
THREAD_POOL_HANDLE_T* handle;
THREAD_POOL_PARAM_T*
(*block_func)(THREAD_POOL_PARAM_T* ctp);
void (*unblock_func)(THREAD_POOL_PARAM_T* ctp);
int (*handler_func)(THREAD_POOL_PARAM_T* ctp);
THREAD_POOL_PARAM_T*
(*context_alloc)(THREAD_POOL_HANDLE_T* handle);
void (*context_free)(THREAD_POOL_PARAM_T* ctp);
pthread_attr_t* attr;
unsigned short lo_water;
unsigned short increment;
unsigned short hi_water;
unsigned short maximum;
unsigned reserved[8];
} thread_pool_attr_t;
Дескриптор создаваемого пула потоков handle, посредством которого мы будем ссылаться на пул, является просто синонимом типа dispatch_t:
#ifndef THREAD_POOL_HANDLE_T
#define THREAD_POOL_HANDLE_T dispatch_t
#endif
Атрибуты потоков, которые будут работать в составе пула, определяются полем attr типа pthread_attr_t (эту структуру мы детально рассматривали ранее при обсуждении создания единичных потоков).
Численные параметры пула определяют:
lo_water — «нижняя ватерлиния», минимальное число потоков пула, находящихся в блокированном состоянии (в ожидании активизации). Если в результате некоторого события один из ожидающих потоков переходит в состояние активной обработки и число оставшихся блокированных потоков становится меньше lo_water, создается дополнительно increment потоков, которые переводятся в блокированное состояние.
hi_water — максимальное число потоков, которые допустимо иметь в блокированном состоянии. Если после завершения обработки некоторым потоком число заблокированных потоков становится больше hi_water, то этот поток уничтожается.
maximum — общая верхняя граница числа потоков пула (активизированных и заблокированных). Даже если число заблокированных потоков (в пике активности) станет ниже lo_water, но общее число потоков уже достигнет maximum, то новые потоки для пула создаваться не будут.
Функциональные параметры пула определяют:
context_alloc() и context_free() — функции создания и уничтожения контекста потока, которые вызываются при создании и уничтожении каждого потока пула. Функция создания контекста потока ответственна за индивидуальные настройки создаваемого потока. Она возвращает «указатель на контекст» типа THREAD_POOL_PARAM_T. Однако системе такой тип неизвестен:
#ifndef THREAD_POOL_PARAM_T
#define THREAD_POOL_PARAM_T void
#endif
В качестве контекста может использоваться любой пользовательский тип, и он будет передаваться последовательно в качестве параметра (ctp) во все последующие функции обслуживания потока.
block_func() — функция блокирования, которая вызывается в потоке сразу же после context_alloc() или после очередного этапа выполнения потоком функции обработчика handler_func(). Функция блокирования получает и возвращает далее обработчику (возможно, после модификации) структуру контекста (в приведенном выше примере контекстом является int — значение присоединенного TCP-сокета).
handler_func() — это, собственно, и есть аналог потоковой функции, в которой выполняется вся полезная работа потока. Функция вызывается библиотекой после выхода потока из блокирующей функции block_func(), при этом функция-обработчик handler_func() получит параметр контекста, возвращенный block_func().
Примечание
В текущей реализации handler_func() должна возвращать 0; все другие значения зарезервированы для дальнейших расширений. Аналогично определенная в атрибутной записи функция unblock_func() зарезервирована для дальнейших расширений, и вместо ее адреса следует устанавливать NULL.
2. После создания атрибутной записи пула, определяющей всю функциональность его дальнейшего поведения, можно приступать к непосредственному созданию пула потоков:
thread_pool_t* thread_pool_create(
thread_pool_attr_t* attr, unsigned flags);
где attr — подробно рассмотренная (и созданная) ранее атрибутная запись пула;
flags — флаг, определяющий поведение вызывающего потока после последующего вызова thread_pool_start(). В документации описано два возможных значения флага:
• POOL_FLAG_EXIT_SELF — после старта пула поток, вызвавший thread_pool_start() (часто это главный поток приложения), завершается;
• POOL_FLAG_USE_SELF — после старта пула поток, вызвавший thread_pool_start(), включается в пул в качестве одного из его потоков.
И в том и в другом случае в типовом фрагменте (как и в показанном выше примере):
thread_pool_start(tpp);
exit(EXIT_SUCCESS);
управление никогда не дойдет до выполнения exit(). Но существует еще третье допустимое значение, прямо не указанное в документации, но мельком упоминаемое в других местах документации:
• 0 — после старта пула поток, вызвавший thread_pool_start(), продолжает свое естественное выполнение.
Например, некоторый фрагмент кода мог бы выглядеть так:
thread_pool_attr_t att; // ...
thread_pool_t *tpp = thread_pool_create(&attr, 0);
thread_pool_start(tpp);
while (true) {
// выполнять некоторую отличную от пула работу
}
exit(EXIT_SUCCESS);
Как уже понятно из описаний, thread_pool_create() возвращает указатель на управляющую структуру пула потоков, которая позже будет передана thread_pool_start(). Если создание пула завершилось неудачей, то результатом выполнения будет NULL, а в errno будет установлен код ошибки (документацией предусмотрен только один код ошибки: ENOMEM — недостаточно памяти для размещения структур данных).
Примечание
Управляющая структура пула потоков описана так:
typedef struct _thread_pool thread_pool_t;
struct _thread_pool {
thread_pool_attr_t pool_attr;
unsigned created;
unsigned waiting;
unsigned flags;
unsigned reserved[3];
};
3. Последний шаг в процедуре запуска пула потоков:
int thread_pool_start(void* pool);
где pool — это указатель, возвращаемый thread_pool_create().[40]
При успешном завершении (которого почти никогда не происходит, за исключением значения флага 0; об этом см. выше) функция возвращает EOK, в противном случае (что происходит гораздо чаще) — значение -1.
4. Другие, относящиеся к библиотеке динамического пула потоков функции, которые целесообразно посмотреть в документации QNX (но которые в силу различных обстоятельств используются гораздо реже):
int thread_pool_destroy(thread_pool_t* pool);
int thread_pool_control(thread_pool_t* pool, thread_pool_attr_t* attr,
_Uint16t lower, _Uint16t upper, unsigned flags);
int thread_pool_limits(thread_pool_t* pool,
int lowater, int hiwater, int maximum, int increment, unsigned flags);
Более 800 000 книг и аудиокниг! 📚
Получи 2 месяца Литрес Подписки в подарок и наслаждайся неограниченным чтением
ПОЛУЧИТЬ ПОДАРОКДанный текст является ознакомительным фрагментом.
Читайте также
Глава 10 Введение в динамический HTML
Глава 10 Введение в динамический HTML 10.1. Браузер и HTML-документ10.2. Родительские и дочерние объекты10.3. Объекты браузера10.4. Объектная модель документа (DOM)Язык HTML позволяет создавать только статические веб-страницы, не обеспечивающие интерактивное взаимодействие с
13.3. Динамический перевод сообщений программ
13.3. Динамический перевод сообщений программ Только что освещенные интерфейсы стандартной библиотеки С решают простые части проблемы локализации. Для денежных, числовых значений, значений времени и даты, также, как для проблем сортировки строк, применяется управление
Стеки потоков и допустимые количества потоков
Стеки потоков и допустимые количества потоков Следует сделать еще два предостережения. Во-первых, подумайте о размере стека, который по умолчанию составляет 1 Мбайт. В большинстве случаев этого будет вполне достаточно, но если существуют какие-либо сомнения на сей счет,
Динамический вызов в сравнении со статическим
Динамический вызов в сравнении со статическим До сих пор говорилось о том, что СОМ основан на клиентских программах, имеющих на этапе разработки предварительную информацию об определении интерфейса. Это достигается либо через заголовочные файлы C++ (для клиентов C++), либо
Динамический ввод координат
Динамический ввод координат С помощью функции динамического ввода значения координат можно вводить не в командной строке, а в поле всплывающей подсказки, которая отображается рядом с курсором и динамически обновляется по мере перемещения курсора. Функция
Динамический блок
Динамический блок Для обеспечения регулировки состояния блока по месту его расположения создаются динамические блоки. Они определяются путем указания настраиваемых свойств. Динамический блок должен содержать хотя бы один параметр и одну связанную с ним операцию.
Динамический ввод данных
Динамический ввод данных В ранних версиях программы (до AutoCAD 2006) все запросы и приглашения для ввода данных можно было увидеть только в командной строке. По этой причине при работе с программой приходилось постоянно переводить взгляд с графической области на командную
Динамический ввод координат
Динамический ввод координат С помощью функции динамического ввода значения координат можно вводить не в командной строке, а в поле всплывающей подсказки, которая отображается рядом с курсором и динамически обновляется по мере перемещения курсора. Функция
Динамический блок
Динамический блок Для обеспечения регулировки состояния блока по месту его расположения создаются динамические блоки. Они определяются путем указания настраиваемых свойств. Динамический блок должен содержать хотя бы один параметр и одну связанную с ним операцию.
Динамический в сравнении со статическим SQL
Динамический в сравнении со статическим SQL Операторы SQL, включенные в код и обработанные препроцессором, иногда называются статическим SQL. В отличие от них операторы, которые генерируются клиентским приложением и передаются для выполнения на сервер во время работы,
Динамический ввод координат
Динамический ввод координат С помощью функции динамического ввода значения координат можно вводить не в командной строке, а в поле всплывающей подсказки, которая отображается рядом с курсором и динамически обновляется по мере перемещения курсора. Функция
Динамический блок
Динамический блок Для обеспечения регулировки состояния блока по месту его расположения создаются динамические блоки. Они определяются путем указания настраиваемых свойств. Динамический блок должен содержать хотя бы один параметр и одну связанную с ним операцию.
Динамический ввод данных
Динамический ввод данных В более ранних версиях программы (до AutoCAD 2006) все запросы и приглашения для ввода данных можно было увидеть только в командной строке. По этой причине при работе с программой приходилось постоянно переводить взгляд с графической области на
Динамический диапазон матрицы
Динамический диапазон матрицы Динамический диапазон светочувствительной матрицы – это ее способность воспринимать градации каждого из цветов. Говоря проще, динамический диапазон определяет, сколько ступеней разности контраста может увидеть и зафиксировать матрица.
Статический тип, динамический тип
Статический тип, динамический тип Название последнего свойства предполагает различение "статического типа" и "динамического типа". Тип, который используется при объявлении некоторого элемента, является статическим типом соответствующей ссылки. Если во время выполнения