Динамический пул потоков

We use cookies. Read the Privacy and Cookie Policy

Динамический пул потоков

Динамический пул потоков не является каким-то специфическим механизмом, продиктованным именно микроядерной архитектурой QNX. Это удачная искусственная конструкция, все определения которой размещены в файле <sys/dispatch.h>. Удивительно не то, что в составе API QNX имеется такой механизм, а то, что подобные инструменты отсутствуют в других ОС.

В предыдущих примерах кода мы неоднократно создавали наборы потоков для тех или иных целей, но всем им было присуще одно: общее количество потоков в них было фиксированным на момент создания. Это и были статические пулы потоков, разделяющих между собой работу приложения. Архитекторы QNX идут чуть дальше: они предоставляют инструментарий для создания пулов однотипных (с общей функцией потока) потоков, в которых конкретное число потоков может увеличиваться или уменьшаться синхронно с изменением нагрузки на приложение. Именно своим динамическим составом эта конструкция и отличается.

Динамический пул потоков нужен разработчикам QNX в первую очередь как инструмент построения многопоточных менеджеров ресурсов - основы построения сервисов ОС QNX. Но и помимо этой цели динамический пул потоков представляет собой мощнейшее средство для конструирования параллельных механизмов обработки.

Проиллюстрируем применение динамического пула потоков примером программного кода, который был нами описан в книге [4] в главе «Сервер TCP/IP... много серверов хороших и разных». По сути, это ретранслирующий TCP/IP-сервер, но сейчас это для нас неважно:

Сервер на базе динамического пула потоков

#include <pthread.h>

#include <sys/dispatch.h>

static int ls; // прослушивающий TCP-сокет

THREAD_POOL_PARAM_T* alloc(THREAD_POOL_HANDLE_T* h) {

 return (THREAD_POOL_PARAM_T*)h;

}

// функция блокирования пула потоков

THREAD_POOL_PARAM_T* block(THREAD_POOL_PARAM_T* p) {

 int rs = accept(ls, NULL, NULL);

 if (rs < 0) errx("accept error");

 return(THREAD_POOL_PARAM_T*)rs;

}

int handler(THREAD_POOL_PARAM_T* p) {

 retrans((int)p);

 close((int)p);

 delay(250);

 cout << pthread_self() << flush;

 return 0;

}

int main(int argc, char* argv[]) {

 // создать TCP-сокет на порт

 ls = getsocket(THREAD_POOL_PORT);

 // создание атрибутной записи пула потоков:

 thread_pool_attr_t attr;

 memset(&attr, 0, sizeof(thread_pool_attr_t));

 // заполнение блока атрибутов пула

 /* - mm число блокированных потоков в пуле */

 attr.lo_water = 3;

 /* - max число блокированных потоков в пуле */

 attr.hi_water = 7;

 /* - инкремент шага создания потоков */

 attr.increment = 2;

 attr.maximum = 9;

 /* - общий предел числа потоков в пуле */

 attr.handle = dispatch_create();

 attr.context_alloc = alloc;

 attr.block_func = block;

 attr.handler_func = handler;

 // фактическое создание пула потоков:

 void* tpp = thread_pool_create(&attr, POOL_FLAG_USE_SELF);

 if (tpp == NULL) errx("create pool");

 // начало функционирования пула потоков:

 thread_pool_start(tpp);

 // ... выполнение никогда не дойдет до этой точки!

 exit(EXIT_SUCCESS);

}

Примечание

В примере используются, но не определены две функции, которые не столь существенны для понимания примера сточки зрения функционирования пула:

• errx() — реакция на ошибку выполнения с выводом сообщения и последующим аварийным завершением;

• retrans() — прием сообщения с присоединенного TCP-сокета с последующей ретрансляцией полученного содержимого в него же.

Итак, первая особенность пула потоков в том, что мы построили многопоточный сервер, почти не прописывая собственного кода, — большую часть рутинной работы за нас сделала библиотека пула.

Приведем описание логики работы пула потоков и показанного примера на самом качественном, простейшем уровне:

• Первоначально (при запуске пула потоков в работу вызовом thread_pool_start()) создается attr.lo_water потоков («нижняя ватерлиния» числа блокированных потоков).

• При создании любого потока (как в процессе начального, так и в процессе последующего создания) вызывается функция attr.соntext_alloc() (в контексте созданного потока).

• По завершении функция вызывает блокирующую функцию потока attr.block_func(), на которой созданный поток ожидает события активизации (в показанном примере событие активизации — это установление соединения новым клиентом по возврату из accept()).

• Блокирующая функция после наступления события активизации переведет поток в состояние READY и вызовет в контексте этого потока функцию обработчика attr.handler_func().

• Если после предыдущего шага число оставшихся заблокированных потоков станет ниже attr.lo_water, механизм пула создаст дополнительно attr.increment потоков и «доведет» их до блокирующей функции.

• Активизированный поток производит всю обработку, предписанную функцией потока, и после выполнения потоковой функции будет опять переведен в блокированное состояние в функции блокирования…

• …но перед переводом потока вновь в блокированное состояние проверяется, не будет ли при этом превышено число блокированных потоков attr.hi_water («верхняя ватерлиния»), и если это имеет место, то поток вместо перевода в блокированное состояние самоуничтожается.

• Все проверки числа потоков производятся для того, чтобы общее число потоков пула (т. e. число активизированных потоков вместе с блокированными) не превышало общее ограничение attr.maximum.

Разобрав общую логику функционирования пула потоков, можно теперь детальнее рассмотреть отдельные шаги всего процесса:

1. Прежде чем создавать пул потоков, мы должны создать атрибутную запись, определяющую все поведение пула. Атрибутная запись описана так (<sys/dispatch.h>):

typedef struct _thread_pool_attr {

 THREAD_POOL_HANDLE_T* handle;

 THREAD_POOL_PARAM_T*

 (*block_func)(THREAD_POOL_PARAM_T* ctp);

 void (*unblock_func)(THREAD_POOL_PARAM_T* ctp);

 int (*handler_func)(THREAD_POOL_PARAM_T* ctp);

 THREAD_POOL_PARAM_T*

  (*context_alloc)(THREAD_POOL_HANDLE_T* handle);

 void (*context_free)(THREAD_POOL_PARAM_T* ctp);

 pthread_attr_t* attr;

 unsigned short lo_water;

 unsigned short increment;

 unsigned short hi_water;

 unsigned short maximum;

 unsigned reserved[8];

} thread_pool_attr_t;

Дескриптор создаваемого пула потоков handle, посредством которого мы будем ссылаться на пул, является просто синонимом типа dispatch_t:

#ifndef THREAD_POOL_HANDLE_T

 #define THREAD_POOL_HANDLE_T dispatch_t

#endif

Атрибуты потоков, которые будут работать в составе пула, определяются полем attr типа pthread_attr_t (эту структуру мы детально рассматривали ранее при обсуждении создания единичных потоков).

Численные параметры пула определяют:

lo_water — «нижняя ватерлиния», минимальное число потоков пула, находящихся в блокированном состоянии (в ожидании активизации). Если в результате некоторого события один из ожидающих потоков переходит в состояние активной обработки и число оставшихся блокированных потоков становится меньше lo_water, создается дополнительно increment потоков, которые переводятся в блокированное состояние.

hi_water — максимальное число потоков, которые допустимо иметь в блокированном состоянии. Если после завершения обработки некоторым потоком число заблокированных потоков становится больше hi_water, то этот поток уничтожается.

maximum — общая верхняя граница числа потоков пула (активизированных и заблокированных). Даже если число заблокированных потоков (в пике активности) станет ниже lo_water, но общее число потоков уже достигнет maximum, то новые потоки для пула создаваться не будут.

Функциональные параметры пула определяют:

context_alloc() и context_free() — функции создания и уничтожения контекста потока, которые вызываются при создании и уничтожении каждого потока пула. Функция создания контекста потока ответственна за индивидуальные настройки создаваемого потока. Она возвращает «указатель на контекст» типа THREAD_POOL_PARAM_T. Однако системе такой тип неизвестен:

#ifndef THREAD_POOL_PARAM_T

 #define THREAD_POOL_PARAM_T void

#endif

В качестве контекста может использоваться любой пользовательский тип, и он будет передаваться последовательно в качестве параметра (ctp) во все последующие функции обслуживания потока.

block_func() — функция блокирования, которая вызывается в потоке сразу же после context_alloc() или после очередного этапа выполнения потоком функции обработчика handler_func(). Функция блокирования получает и возвращает далее обработчику (возможно, после модификации) структуру контекста (в приведенном выше примере контекстом является int — значение присоединенного TCP-сокета).

handler_func() — это, собственно, и есть аналог потоковой функции, в которой выполняется вся полезная работа потока. Функция вызывается библиотекой после выхода потока из блокирующей функции block_func(), при этом функция-обработчик handler_func() получит параметр контекста, возвращенный block_func().

Примечание

В текущей реализации handler_func() должна возвращать 0; все другие значения зарезервированы для дальнейших расширений. Аналогично определенная в атрибутной записи функция unblock_func() зарезервирована для дальнейших расширений, и вместо ее адреса следует устанавливать NULL.

2. После создания атрибутной записи пула, определяющей всю функциональность его дальнейшего поведения, можно приступать к непосредственному созданию пула потоков:

thread_pool_t* thread_pool_create(

 thread_pool_attr_t* attr, unsigned flags);

где attr — подробно рассмотренная (и созданная) ранее атрибутная запись пула;

flags — флаг, определяющий поведение вызывающего потока после последующего вызова thread_pool_start(). В документации описано два возможных значения флага:

 • POOL_FLAG_EXIT_SELF — после старта пула поток, вызвавший thread_pool_start() (часто это главный поток приложения), завершается;

 • POOL_FLAG_USE_SELF — после старта пула поток, вызвавший thread_pool_start(), включается в пул в качестве одного из его потоков.

И в том и в другом случае в типовом фрагменте (как и в показанном выше примере):

thread_pool_start(tpp);

exit(EXIT_SUCCESS);

управление никогда не дойдет до выполнения exit(). Но существует еще третье допустимое значение, прямо не указанное в документации, но мельком упоминаемое в других местах документации:

 • 0 — после старта пула поток, вызвавший thread_pool_start(), продолжает свое естественное выполнение.

Например, некоторый фрагмент кода мог бы выглядеть так:

thread_pool_attr_t att; // ...

thread_pool_t *tpp = thread_pool_create(&attr, 0);

thread_pool_start(tpp);

while (true) {

 // выполнять некоторую отличную от пула работу

}

exit(EXIT_SUCCESS);

Как уже понятно из описаний, thread_pool_create() возвращает указатель на управляющую структуру пула потоков, которая позже будет передана thread_pool_start(). Если создание пула завершилось неудачей, то результатом выполнения будет NULL, а в errno будет установлен код ошибки (документацией предусмотрен только один код ошибки: ENOMEM — недостаточно памяти для размещения структур данных).

Примечание

Управляющая структура пула потоков описана так:

typedef struct _thread_pool thread_pool_t;

struct _thread_pool {

 thread_pool_attr_t pool_attr;

 unsigned created;

 unsigned waiting;

 unsigned flags;

 unsigned reserved[3];

};

3. Последний шаг в процедуре запуска пула потоков:

int thread_pool_start(void* pool);

где pool — это указатель, возвращаемый thread_pool_create().[40]

При успешном завершении (которого почти никогда не происходит, за исключением значения флага 0; об этом см. выше) функция возвращает EOK, в противном случае (что происходит гораздо чаще) — значение -1.

4. Другие, относящиеся к библиотеке динамического пула потоков функции, которые целесообразно посмотреть в документации QNX (но которые в силу различных обстоятельств используются гораздо реже):

int thread_pool_destroy(thread_pool_t* pool);

int thread_pool_control(thread_pool_t* pool, thread_pool_attr_t* attr,

 _Uint16t lower, _Uint16t upper, unsigned flags);

int thread_pool_limits(thread_pool_t* pool,

 int lowater, int hiwater, int maximum, int increment, unsigned flags);

Данный текст является ознакомительным фрагментом.