Функция mq_send
Функция mq_send
В листинге 5.25 приведен текст первой половины нашей функции mqsend.
Инициализация
14-29 Мы получаем указатели на используемые структуры и блокируем взаимное исключение для данной очереди. Проверяем, не превышает ли размер сообщения максимально допустимый для данной очереди.
Проверка очереди на пустоту и отправка уведомления
30-38 Если мы помещаем первое сообщение в пустую очередь, нужно проверить, не зарегистрирован ли какой-нибудь процесс на уведомление об этом событии и нет ли потоков, заблокированных в вызове mq_receive. Для проверки второго условия мы воспользуемся сохраняемым функцией mq_receive счетчиком mqh_nwait, содержащим количество потоков, заблокированных в вызове mq_receive. Если этот счетчик имеет ненулевое значение, мы не отправляем уведомление зарегистрированному процессу. Для отправки сигнала SIGEV_SIGNAL используется функция sigqueue. Затем процесс снимается с регистрации.
ПРИМЕЧАНИЕ
Вызов sigqueue для отправки сигнала приводит к передаче сигнала SI_QUEUE обработчику сигнала в структуре типа siginfo_t (раздел 5.7), что неправильно. Отправка правильного значения si_code (а именно SI_MESGQ) из пользовательского процесса осуществляется в зависимости от реализации. На с. 433 стандарта IEEE 1996 [8] отмечается, что для отправки этого сигнала из пользовательской библиотеки необходимо воспользоваться скрытым интерфейсом механизма отправки сигналов.
Проверка заполненности очереди
39-48 Если очередь переполнена и установлен флаг O_NONBLOCK, мы возвращаем ошибку с кодом EAGAIN. В противном случае мы ожидаем сигнала по условной переменной mqh_wait, который, как мы увидим, отправляется функцией mq_receive при считывании сообщения из переполненной очереди.
ПРИМЕЧАНИЕ
Наша реализация упрощает ситуацию с возвращением ошибки EINTR при прерывании вызова mq_send сигналом, перехватываемым вызвавшим процессом. Проблема в том, что функция pthread_cond_wait не возвращает ошибки при возврате из обработчика сигнала: она может вернуть либо 0 (что рассматривается как ложное пробуждение), либо вообще не завершить работу. Все эти проблемы можно обойти, но это непросто.
В листинге 5.26 приведена вторая половина функции mq_send. К моменту ее выполнения мы уже знаем о наличии в очереди свободного места для нашего сообщения.
Листинг 5.25. Функция mq_send: первая половина
//my_pxmsg_mmap/mq_send.с
1 #include "unpipc.h"
2 #include "mqueue.h"
3 int
4 mymq_send(mymqd_t mqd, const char *ptr, size_t len, unsigned int prio)
5 {
6 int n;
7 long index, freeindex;
8 int8_t *mptr;
9 struct sigevent *sigev;
10 struct mymq_hdr *mqhdr;
11 struct mymq_attr *attr;
12 struct mymsg_hdr *msghdr, *nmsghdr, *pmsghdr;
13 struct mymq_info *mqinfo;
14 mqinfo = mqd;
15 if (mqinfo->mqi_magic != MQI_MAGIC) {
16 errno = EBADF;
17 return(-1);
18 }
19 mqhdr = mqinfo->mqi_hdr; /* указатель типа struct */
20 mptr = (int8_t *) mqhdr; /* указатель на байт */
21 attr = &mqhdr->mqh_attr;
22 if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
23 errno = n;
24 return(-1);
25 }
26 if (len > attr->mq_msgsize) {
27 errno = EMSGSIZE;
28 goto err;
29 }
30 if (attr->mq_curmsgs == 0) {
31 if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0) {
32 sigev = &mqhdr->mqh_event;
33 if (sigev->sigev_notify == SIGEV_SIGNAL) {
34 sigqueue(mqhdr->mqh_pid, sigev->sigev_signo,
35 sigev->sigev_value);
36 }
37 mqhdr->mqh_pid = 0; /* снятие с регистрации */
38 }
39 } else if (attr->mq_curmsgs >= attr->mq_maxmsg) {
40 /* 4queue is full */
41 if (mqinfo->mqi_flags & O_NONBLOCK) {
32 errno = EAGAIN;
43 goto err;
44 }
45 /* ожидание освобождения места в очереди */
46 while (attr->mq_curmsgs >= attr->mq_maxmsg)
47 pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);
48 }
Листинг 5.25. Функция mq_send: вторая половина
//my_pxmsg_mmap/mq_send.с
49 /* nmsghdr будет указывать на новое сообщение*/
50 if ((freeindex = mqhdr->mqh_free) == 0)
51 err_dump("mymq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
52 nmsghdr = (struct mymsg_hdr *) &mptr[freeindex];
53 nmsghdr->msg_prio = prio;
54 nmsghdr->msg_len = len;
55 memcpy(nmsghdr + 1, ptr, len); /* копирование сообщения в очередь */
56 mqhdr->mqh_free = nmsghdr->msg_next; /* новое начало списка пустых сообщений */
57 /* поиск места в списке для нового сообщения */
58 index = mqhdr->mqh_head;
59 pmsghdr = (struct mymsg_hdr *) &(mqhdr->mqh_head);
60 while (index != 0) {
61 msghdr = (struct mymsg_hdr *) &mptr[index];
62 if (prio > msghdr->msg_prio) {
63 nmsghdr->msg_next = index;
64 pmsghdr->msg_next = freeindex;
65 break;
66 }
67 index = msghdr->msg_next;
68 pmsghdr = msghdr;
69 }
70 if (index == 0) {
71 /* очередь была пуста или новое письмо добавлено к концу списка */
72 pmsghdr->msg_next = freeindex;
73 nmsghdr->msg_next = 0;
74 }
75 /* запускаем любой из процессов, заблокированных в mq_receive */
76 if (attr->mq_curmsgs == 0)
77 pthread_cond_signal(&mqhdr->mqh_wait);
78 attr->mq_curmsgs++;
79 pthread_mutex_unlock(&mqhdr->mqh_lock);
80 return(0);
81 err:
82 pthread_mutex_unlock(&mqhdr->mqh lock);
83 return(-1);
84 }
Получение индекса свободного блока
50-52 Поскольку количество свободных сообщений при создании очереди равно mq_maxmsg, ситуация, в которой mq_curmsgs будет меньше mq_maxmsg для пустого списка свободных сообщений, возникнуть не может.
Копирование сообщения
53-56 Указатель nmsghdr хранит адрес области памяти, в которую помещается сообщение. Приоритет и длина сообщения сохраняются в структуре msg_hdr, а затем в память копируется содержимое сообщения, переданного вызвавшим процессом.
Помещение нового сообщения в соответствующее место связного списка
57-74 Порядок сообщений в нашем списке зависит от их приоритета: они расположены в порядке его убывания. При добавлении нового сообщения мы проверяем, существуют ли сообщения с тем же приоритетом; в этом случае сообщение добавляется после последнего из них. Используя такой метод упорядочения, мы гарантируем, что mq_receive всегда будет возвращать старейшее сообщение с наивысшим приоритетом. По мере продвижения по списку мы сохраняем в pmsghdr адрес предыдущего сообщения, поскольку именно это сообщение будет хранить индекс нового сообщения в поле msg_next.
ПРИМЕЧАНИЕ
Наша схема может оказаться медленной в случае наличия в очереди большого количества сообщений, поскольку каждый раз при добавлении нового придется просматривать их значительную часть. Можно хранить отдельно индексы последних сообщений со всеми имеющимися значениями приоритета.
Пробуждение любого процесса, заблокированного в вызове mq_receive
75-77 Если очередь была пуста в момент помещения в нее нового сообщения, мы вызываем pthread_cond_signal, чтобы разблокировать любой из процессов, ожидающих сообщения.
78 Увеличиваем на единицу количество сообщений в очереди mq_curmsgs.
Данный текст является ознакомительным фрагментом.