Сервер с периодическими импульсами

Сервер с периодическими импульсами

Первое, что следует рассмотреть, — это сервер, который желает получать периодические сообщения. Типовыми применениями такой схемы являются:

• поддерживаемые сервером тайм-ауты клиентских запросов;

• внутренние периодические события серверов.

Конечно, есть и другие, специализированные, применения для таких вещей — например, периодические подтверждения готовности узлов сети («я жив»), запросы на повторную передачу, и т.п.

Поддерживаемые сервером тайм-ауты

В таком сценарии сервер предоставляет клиенту некоторую услугу, и клиент способен задать тайм-аут. Это может использоваться в самых разнообразных приложениях. Например, вы можете сказать серверу «выдай мне данные за 15 секунд» или «дай мне знать, когда истекут 10 секунд», или «жди прихода данных, но в течение не более чем 2 секунд».

Все это — примеры поддерживаемых сервером тайм-аутов. Клиент посылает сообщение серверу и блокируется. Сервер принимает периодические сообщения от таймера (раз в секунду, реже или чаще) и подсчитывает, сколько этих сообщений он получил. Когда число сообщений о тайм-аутах превышает время ожидания, указанное клиентом, сервер отвечает клиенту с сообщением о тайм-ауте или, возможно, с данными, которые он успел накопить на данный момент — это зависит от того, как структурированы отношения клиента и сервера.

Ниже приведен полный пример сервера, который принимает одно из двух сообщений от клиентуры и сообщения о тайм-ауте в виде импульса. Первое клиентское сообщение говорит серверу: «Дай мне знать, есть ли для меня данные, но не блокируй меня более чем на 5 секунд». Второе клиентское сообщение говорит: «Вот, возьми данные». Сервер должен позволить нескольким клиентам блокироваться на себе в ожидании данных, и поэтому обязан сопоставить клиентам тайм-ауты. Тут-то и нужен импульс; он информирует сервер: «Истекла одна секунда».

Чтобы программа не выглядела излишне громоздкой, перед каждым из основных разделов я прерываю исходный текст небольшими пояснениями. Скачать эту программу вы можете на FTP-сайте компании PARSE (ftp://ftp.parseftp.parse.com/pub/book_v3.tar.gz), файл называется time1.с.

Декларации

В первом разделе программы определяются различные именованные константы и структуры данных. В нем также подключаются все необходимые заголовочные файлы. Оставим это без комментариев. :-)

/*

 * time1.c

 *

 * Пример сервера, получающего периодические сообщения

 * от таймера

 * и обычные сообщения от клиента.

 *

 * Иллюстрирует использование функций таймера с импульсами.

*/

#include <stdio.h>

#include <stdlib.h>

#include <time.h>

#include <signal.h>

#include <errno.h>

#include <unistd.h>

#include <sys/siginfo.h>

#include <sys/neutrino.h>

// Получаемые сообщения

// Сообщения

#define MT_WAIT_DATA 2 // Сообщение от клиента

#define MT_SEND_DATA 3 // Сообщение от клиента

// Импульсы

#define CODE_TIMER 1 // Импульс от таймера

// Отправляемые сообщения

#define MT_OK       0 // Сообщение клиенту

#define MT_TIMEDOUT 1 // Сообщение клиенту

// Структура сообщения

typedef struct {

 int messageType; // Содержит сообщение от клиента и

                  // клиенту

 int messageData; // Опциональные данные, зависят от

                  // сообщения

} ClientMessageT;

typedef union {

 ClientMessageT msg;  // Сообщение может быть

                      // либо обычным,

 struct _pulse pulse; // либо импульсом

} MessageT;

// Таблица клиентов

#define MAX_CLIENT 16 // Максимум клиентов

                      // одновременно

struct {

 int in_use;  // Элемент используется?

 int rcvid;   // Идентификатор

              // отправителя клиента

 int timeout; // Оставшийся клиенту

              //тайм-аут

} clients[MAX_CLIENT]; // Таблица клиентов

int chid; // Идентификатор канала

          // (глобальный)

int debug = 1; // Режим отладки, 1 ==

               // вкл, 0 == выкл

char *progname = "time1.c";

// Предопределенные прототипы

static void setupPulseAndTimer(void);

static void gotAPulse(void);

static void gotAMessage(int rcvid, ClientMessageT *msg);

main()

Следующий раздел кода является основным и отвечает за следующее:

• создание канала с помощью функции ChannelCreate();

• вызов подпрограммы setupPulseAndTimer() (для настройки периодического таймера, срабатывающего раз в секунду и использующего импульс в качестве способа доставки события;

• и, наконец, бесконечный цикл ожидания импульсов и сообщений и их обработки.

Обратите внимание на проверку значения, возвращаемого MsgReceive() — нуль указывает, что был принят импульс (здесь мы не делаем никакой дополнительной проверки, наш ли это импульс), ненулевое значение говорит о том, что было принято сообщение.

Обработка импульсов и сообщений выполняется функциями gotAPulse() и gotAMessage().

int main void) // Игнорировать аргументы

               // командной строки

{

 int rcvid; // PID отправителя

 MessageT msg; // Само сообщение

 if ((chid = ChannelCreate(0)) == -1) {

  fprintf(stderr, "%s: не удалось создать канал! ",

   progname);

  perror(NULL);

  exit(EXIT_FAILURE);

 }

 // Настроить импульс и таймер

 setupPulseAndTimer();

 // Прием сообщений

 for(;;) {

  rcvid = MsgReceive(chid, &msg, sizeof(msg), NULL));

  // Определить, от кого сообщение

  if (rcvid == 0) {

   // Здесь неплохо бы еще проверить поле «code»...

   gotAPulse();

  } else {

   gotAMessage(rcvid, &msg.msg);

  }

 }

 // Сюда мы никогда не доберемся

 return (EXIT_SUCCESS);

}

setupPulseAndTimer()

В функции setupPulseAndTimer() вы видите код, в котором определяется тип таймера и схема уведомления. Когда мы рассуждали о таймерных функциях выше, я говорил, что таймер может выдать сигнал или импульс, либо создать поток. Решение об этом принимается именно здесь, в функции setupPulseAndTimer(). Обратите внимание, что здесь мы использовали макроопределение SIGEV_PULSE_INIT(). Используя это макроопределение, мы реально присвоили элементу sigev_notify значение SIGEV_PULSE. (Если бы мы использовали одно из макроопределений семейства SIGEV_SIGNAL*_INIT(), мы получили бы уведомление при помощи соответствующего сигнала). Отметьте, что при настройке импульса мы с помощью вызова ConnectAttach() устанавливаем соединение с самим собой и даем ему уникальный код (здесь — константа CODE_TIMER; мы ее определили сами)

Последний параметр в инициализации структуры события — это приоритет импульса; здесь мы выбрали SIGEV_PULSE_PRIO_INHERIT (константа, равная -1). Это предписывает ядру не изменять приоритет принимающего импульс потока.

В конце описания функции мы вызываем timer_create() для создания таймера в ядре, после чего настраиваем его на срабатывание через одну секунду (поле it_value) и на периодическую перезагрузку односекундными интервалами (поле it_interval). Отметим, что таймер включается только по вызову timer_settime(), а не при его создании.

Схема уведомления по типу SIGEV_PULSE — расширение, свойственное только QNX/Neutrino. Концепция импульсов в POSIX отсутствует.

/*

 * setupPulseAndTimer

 *

 * Эта подпрограмма отвечает за настройку импульса, чтобы

 * тот отправлял сообщение с кодом MT_TIMER.

 * Затем устанавливается

 * периодический таймер с периодом в одну секунду.

*/

void setupPulseAndTimer(void) {

 timer_t timerid; // Идентификатор таймера

 struct sigevent event; // Генерируемое событие

 struct itimerspec timer; // Структура данных

                          // таймера

 int coid; // Будем соединяться с

           // собой

 // Создать канал к себе

 coid = ConnectAttach(0, 0, chid, 0, 0);

 if (coid == -1) {

  fprintf(stderr, "%s: ошибка ConnectAttach! ", progname);

  perror(NULL);

  exit(EXIT_FAILURE);

 }

 // Установить, какое событие мы хотим сгенерировать

 // - импульс

 SIGEV_PULSE_INIT(&event, coid, SIGEV_PULSE_PRIO_INHERIT,

  CODE_TIMER, 0);

 // Создать таймер и привязать к событию

 if (timer_create(CLOCK_REALTIME, &event, &timerid) ==

  -1) {

  fprintf(stderr,

   "%s: не удалось создать таймер, errno %d ",

   progname, errno);

  perror(NULL);

  exit(EXIT_FAILURE);

 }

 // Настроить таймер (задержка 1 с, перезагрузка через

 // 1 с) ...

 timer.it_value.tv_sec = 1;

 timer.it_value.tv_nsec = 0;

 timer.it_interval.tv_sec = 1;

 timer.it_interval.tv_nsec = 0;

 // ...и запустить его!

 timer_settime(timerid, 0, &timer, NULL);

}

gotAPulse()

В функции gotAPulse() вы можете видеть, как мы реализовали способность сервера обеспечивать тайм-ауты для клиентов. Мы последовательно просматриваем список клиентуры и, поскольку мы знаем, что импульс выдается один раз в секунду, просто уменьшаем число секунд, которое остается клиенту до тайм-аута. Если эта величина достигает нулевого значения, мы отвечаем этому клиенту сообщением «Извините, тайм-аут» (тип сообщения MT_TIMEDOUT). Обратите внимание, что мы подготавливаем это сообщение заранее (вне цикла for), а затем посылаем его по мере необходимости. Этот прием — по существу вопрос стиля: если вы предполагаете отвечать часто, возможно, имело бы смысл выполнить настройку однажды и загодя. Если же множество ответов не ожидается, то имело бы больший смысл делать настройки по мере необходимости.

Если значение оставшегося времени еще не достигло нуля, мы не делаем ничего — клиент по-прежнему заблокирован в ожидании сообщения.

/*

 * gotAPulse

 *

 * Эта подпрограмма отвечает за обработку тайм-аутов.

 * Она проверяет список клиентов на предмет тайм-аута

 * и отвечает соответствующим сообщением тем клиентам,

 * у которых тайм-аут произошел.

*/

void gotAPulse(void) {

 ClientMessageT msg;

 int i;

 if (debug) {

  time_t now;

  time(&now);

  printf("Получен импульс, время %s", ctime(&now));

 }

 // Подготовить ответное сообщение

 msg.messageType = MT_TIMEDOUT;

 // Просмотреть список клиентов

 for (i = 0; i < MAX_CLIENT; i++) {

  // Элемент используется?

  if (clients[i].in_use) {

   // Тайм-аут?

   if (--clients[i].timeout == 0) {

    // Ответить

    MsgReply(clients[i].rcvid, EOK, &msg, sizeof(msg));

    // Освободить элемент

    clients[i].in_use = 0;

   }

  }

 }

}

gotAMessage()

В функции gotAMessage() вы видите другую половину заданной функциональности, где мы добавляем клиента в список клиентуры, ожидающей данные (если получено сообщение типа MT_WAIT_DATA), или сопоставляем клиента с сообщением, которое было только что получено (если это сообщение типа MT_SEND_DATA). Заметьте, что для простоты мы здесь не реализуем очередь клиентов, находящихся в ожидании передачи данных, получатель для которых еще не доступен — это вопрос управления очередями, оставьте его для себя в качестве упражнения.

/*

 * gotAMessage

 *

 * Эта подпрограмма вызывается при каждом приеме

 * сообщения. Проверяем тип

 * сообщения (либо «жду данных», либо «вот данные»),

 * и действуем

 * соответственно. Для простоты предположим, что данные

 * никогда не ждут.

 * Более подробно об этом см. в тексте.

*/

void gotAMessage(int rcvid, ClientMessageT *msg) {

 int i;

 // Определить тип сообщения

 switch (msg->messageType) {

 // Клиент хочет ждать данных

 case MT_WAIT_DATA:

  // Посмотрим, есть ли пустое место в таблице клиентов

  for (i = 0; i < MAX_CLIENT; i++) {

   if (!clients[i].in_use) {

    // Нашли место - пометить как занятое,

    // сохранить rcvid

    // и установить тайм-аут

    clients[i].in_use = 1;

    clients[i].rcvid = rcvid;

    clients[i].timeout = 5;

    return;

   }

  }

  fprintf(stderr,

   "Таблица переполнена, сообщение от rcvid %d"

   " игнорировано, клиент заблокирован ", rcvid);

  break;

  // Клиент с данными

 case MT_SEND_DATA:

  // Посмотрим, есть ли другой клиент, которому можно ответить

  // данными от этого клиента

  for (i = 0; i < MAX CLIENT; i++) {

   if (clients[i].in_use) {

    // Нашли - использовать полученное сообщение

    // в качестве ответного

    msg->messageType = MT_OK;

    // Ответить ОБОИМ КЛИЕНТАМ!

    MsgReply(clients[i].rcvid, EOK, msg, sizeof(*msg));

    MsgReply(rcvid, EOK, msg, sizeof(*msg));

    clients[i].in_use = 0;

    return;

   }

  }

  fprintf(stderr,

   "Таблица пуста, сообщение от rcvid %d игнорировано,"

   " клиент заблокирован ", rcvid);

  break;

 }

}

Примечания

Несколько общих замечаний по тексту программы:

• Если сообщение с данными прибывает, когда либо никто не ждет, либо список ожидающих клиентов переполнен, в стандартный поток ошибок выводится сообщение, но клиенту при этом мы не отвечаем ничего. Это означает, что ряд клиентов может оказаться в REPLY-блокированном состоянии навсегда — идентификаторы отправителей мы потеряли, а значит, и ответ им дать не можем.

Это сделано намеренно. Вы можете изменить это, добавив соответственно сообщения MT_NO_WAITERS и MT_NO_SPACE, которыми можно было бы отвечать всякий раз при обнаружении ошибок данного типа.

• Когда клиент-обработчик ждет, а клиент-поставщик пересылает ему данные, мы отвечаем обоим клиентам. Это критично, поскольку мы должны разблокировать обоих клиентов.

• Мы повторно использовали буфер клиента-поставщика для обоих ответов. Этот прием программирования — опять же, вопрос стиля: в большом приложении у вас, вероятно, было бы много типов возвращаемых значений, и вы могли бы и не захотеть повторно использовать одни и те же буферы.

• В приведенном примере используется «щербатый» массив фиксированной длины с флагом «элемент задействован» (clients[i].in_use). Поскольку моей целью здесь является отнюдь не демонстрация хитростей программирования односвязных списков, я использовал простейший для понимания вариант. В конечном же программном продукте, разумеется, имело бы смысл использовать динамический список.

• Когда функция MsgReceive() получает импульс, наше решение относительно того, действительно ли это «наш» импульс, фактически является весьма слабо аргументированным — мы просто предполагаем (согласно комментариям), что все входящие импульсы имеют тип CODE_TIMER. Опять же, в конечном продукте следовало бы проверять значение кода импульса и сообщать о наличии каких-либо аномалий.

Отметим, что в приведенном примере демонстрируется только один способ реализации тайм-аутов клиентуры. Позже, в этой же главе (в разделе «Тайм-ауты ядра») мы поговорим о тайм-аутах ядра. Это еще один способ делать почти то же самое, только управление на этот раз осуществляется клиентом, а не таймером.

Внутренние периодические события серверов

Здесь мы имеем несколько другое применение для периодических сообщений о тайм-аутах, когда эти сообщения предназначены сервером исключительно для внутреннего использования и не имеют никакого отношения к клиенту вообще.

Например, некоторые аппаратные средства могут требовать, чтобы сервер опрашивал их периодически — например, такое может быть в случае сетевого соединения: сервер должен периодически проверять, является ли данное подключение доступным, и это не зависит от команд клиентуры.

Другой вариант — если, например, в аппаратных средствах предусмотрен таймер «выключения по неактивности». Например, если длительное пребывание какого-то аппаратного модуля во включенном состоянии может приводить к неоправданным затратам электроэнергии, то если его никто не использует в течение, скажем, 10 секунд, его можно было бы выключить (или переключить в режим низкого энергопотребления — прим. ред.). Опять же, к клиенту это не имеет никакого отношения (за исключением того, что запрос от клиента отменит режим ожидания) — это просто функция, которую сервер должен уметь предоставлять «своим» аппаратным средствам.

Код в этом случае сильно бы напоминал приведенный выше пример, за исключением того, что вместо списка ожидающих клиентов у вас была бы только одна переменная тайм-аута. С каждым событием от таймера ее значение уменьшалось бы, но пока оно больше нуля, ничего бы не происходило. Когда оно стало бы равным нулю, это вызывало бы отключение аппаратных средств (или какое-либо другое соответствующее действие).

Единственный трюк здесь заключается в том, что всякий раз, когда поступает сообщение от клиента, использующего данные аппаратные средства, вы должны восстановить первоначальное значение этой переменной, поскольку обращение к ресурсу должно сбрасывать «обратный отсчет». И наоборот, аппаратным средствам может потребоваться определенный промежуток времени «на разогрев» после включения. В этом случае после выключения аппаратных средств вам придется при поступлении запроса от клиента организовать еще один таймер, чтобы «придержать» запрос до того момента, пока аппаратные средства не станут готовы.