Функция mq_open
Функция mq_open
В листинге 5.17 приведен текст первой части функции mq_open, создающей новую очередь сообщений или открывающей существующую.
Листинг 5.17. Функция mq_open: первая часть
//my_pxmsg._mmap/mq_open. с
1 #include "unpipc.h"
2 #include "mqueue.h"
3 #include <stdarg.h>
4 #define MAX_TRIES 10
5 struct mymq_attr defattr =
6 { 0, 128, 1024, 0 };
7 mymqd_t
8 mymq_open(const char *pathname, int oflag, …)
9 {
10 int i, fd, nonblock, created, save_errno;
11 long msgsize, filesize, index;
12 va_list ap;
13 mode_t mode;
14 int8_t *mptr;
15 struct stat statbuff;
16 struct mymq_hdr *mqhdr;
17 struct mymsg_hdr *msghdr;
18 struct mymq_attr *attr;
19 struct mymq_info *mqinfo;
20 pthread_mutexattr_t mattr;
21 pthread_condattr_t cattr;
22 created = 0;
23 nonblock = oflag & O_NONBLOCK;
24 oflag &= ~O_NONBLOCK;
25 mptr = (int8_t *) MAP_FAILED;
26 mqinfo = NULL;
27 again:
28 if (oflag & O_CREAT) {
29 va_start(ap, oflag); /* ар инициализируется последним аргументом */
30 mode = va_arg(ap, va_mode_t) & ~S_IXUSR;
31 attr = va_arg(ap, struct mymq_attr *);
32 va_end(ap);
33 /* открытие с установкой бита user-execute */
34 fd = open (pathname, oflag | O_EXCL | O_RDWR, mode | S_IXUSR);
35 if (fd < 0) {
36 if (errno == EEXIST && (oflag & O_EXCL) == 0)
37 goto exists; /* уже существует, OK */
38 else
39 return((mymqd_t) –1);
40 }
41 created = 1;
42 /* при создании файла он инициализируется */
43 if (attr == NULL)
44 attr = &defattr;
45 else {
46 if (attr->mq_maxmsg <– 0 || attr->mq_msgsize <= 0) {
47 errno = EINVAL;
48 goto err;
49 }
50 }
Обработка списка аргументов переменного размера
29-32 Функция может быть вызвана либо с двумя, либо с четырьмя аргументами в зависимости от того, указан ли флаг O_CREAT. Если флаг указан, третий аргумент имеет тип mode_t, а это простой системный тип, являющийся одним из целых типов. При этом мы столкнемся с проблемой в BSD/OS, где этот тип данных определен как unsigned short (16 бит). Поскольку целое в этой реализации занимает 32 бита, компилятор С увеличивает аргумент этого типа с 16 до 32 бит, потому что все короткие целые в списке аргументов увеличиваются до обычных целых. Но если мы укажем mode_t при вызове va_arg, он пропустит 16 бит аргумента в стеке, если этот аргумент был увеличен до 32 бит. Следовательно, мы должны определить свой собственный тип данных, va_mode_t, который будет целым в BSD/OS и типом mode_t в других системах. Эту проблему с переносимостью решают приведенные ниже строки нашего заголовка unpipc.h (листинг В.1):
#ifdef __bsdi__
#define va_mode_t int
#else
#define va_mode_t mode_t
#endif
30 Мы сбрасываем бит user-execute (S_IXUSR) в переменной mode по причинам, которые будут вскоре раскрыты.
Создание новой очереди сообщений
33-34 Создается обычный файл с именем, указанным при вызове функции, и устанавливается бит user-execute.
Обработка потенциальной ситуации гонок
35-40 Если бы при указании флага O_CREAT мы просто открыли файл, отобразили его содержимое в память и проинициализировали отображенный файл (как будет описано ниже), у нас возникла бы ситуация гонок. Очередь сообщений инициализируется mq_open только в том случае, если вызывающий процесс указывает флаг O_CREAT и очередь сообщений еще не существует. Это означает, что нам нужно каким-то образом определять, существует она или нет. Для этого при открытии файла для последующего отображения в память мы всегда указываем флаг O_EXCL. Возвращение ошибки EEXIST функцией open является ошибкой для mq_open только в том случае, если при вызове был указан флаг O_EXCL. В противном случае при возвращении функцией open ошибки EEXIST мы делаем вывод, что файл уже существует, и переходим к листингу 5.19, как если бы флаг O_CREAT вовсе не был указан.
Ситуация гонок может возникнуть потому, что использование отображаемого в память файла для реализации очереди сообщений требует двух шагов при инициализации очереди: сначала файл должен быть создан функцией open, а затем его содержимое должно быть проинициализировано. Проблема возникает, если два потока (одного или различных процессов) вызывают mq_open приблизительно одновременно. Один из потоков может создать файл, после чего управление будет передано системой второму потоку, прежде чем первый завершит инициализацию файла. Второй поток обнаружит, что файл уже существует (вызвав open с флагом O_EXCL), и приступит к использованию очереди сообщений.
Мы используем бит user-execute для указания того, был ли проинициализирован файл с очередью сообщений. Этот бит устанавливается только тем потоком, который создает этот файл (флаг O_EXCL позволяет определить этот поток); этот поток инициализирует файл с очередью сообщений, а затем сбрасывает бит user-execute.
Аналогичная ситуация может возникнуть в листингах 10.28 и 10.37.
Проверка атрибутов
42-50 Если при вызове в качестве последнего аргумента передан нулевой указатель, очередь сообщений инициализируется со значениями атрибутов по умолчанию: 128 сообщений в очереди и 1024 байта на сообщение. Если атрибуты указаны явно, мы проверяем, что mq_maxmsg и mq_msgsize имеют положительные значения.
Вторая часть функции mq_open приведена в листинге 5.18. Она завершает инициализацию новой очереди сообщений.
Листинг 5.18. Вторая часть функции mq_open: инициализация новой очереди
//my_pxmsg_mmap/mq_open.с
51 /* вычисление и установка размера файла */
52 msgsize = MSGSIZE(attr->mq_msgsize);
53 filesize = sizeof(struct mymq_hdr) + (attr->mq_maxmsg *
54 (sizeof(struct mymsg_hdr) + msgsize));
55 if (lseek(fd, filesize – 1, SEEK_SET) == –1)
56 goto err;
57 if (write(fd, "", 1) == –1)
58 goto err;
59 /* отображение файла в память */
60 mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE,
61 MAP_SHARED, fd, 0);
62 if (mptr == MAP_FAILED)
63 goto err;
64 /* выделение структуры mymq_info{} для очереди */
65 if ((mqinfo = mallос (sizeof (struct mymq_info))) == NULL)
66 goto err;
67 mqinfo->mqi_hdr = mqhdr = (struct mymq_hdr *) mptr;
68 mqinfo->mqi_magic = MQI_MAGIC;
69 mqinfo->mqi_flags = nonblock;
70 /* инициализация заголовка в начале файла */
71 /* создание списка пустых сообщений */
72 mqhdr->mqh_attr.mq_flags = 0;
73 mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
74 mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
75 mqhdr->mqh_attr.mq_curmsgs = 0;
76 mqhdr->mqh_nwait = 0;
77 mqhdr->mqh_pid = 0;
78 mqhdr->mqh_head = 0;
79 index = sizeof(struct mymq_hdr);
80 mqhdr->mqh_free = index;
81 for (i = 0; i < attr->mq_maxmsg – 1; i++) {
82 msghdr = (struct mymsg_hdr *) &mptr[index];
83 index += sizeof(struct mymsg_hdr) + msgsize;
84 msghdr->msg_next = index;
85 }
86 msghdr = (struct mymsg_hdr *) &mptr[index];
87 msghdr->msg_next = 0; /* конец списка пустых сообщений */
88 /* инициализация взаимного исключения и условной переменной */
89 if ((i = pthread_mutexattr_init(&mattr)) != 0)
90 goto pthreaderr;
91 pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
92 i = pthread_mutex_init(&mqhdr->mqh_lock, &mattr);
93 pthread_mutexattr_destroy(&mattr); /* обязательно нужно удалить */
94 if (i != 0)
95 goto pthreaderr:
96 if ((i = pthread_condattr_init(&cattr)) != 0)
97 goto pthreaderr;
98 pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
99 i = pthread_cond_init(&mqhdr->mqh_wait, &cattr);
100 pthread_condattr_destroy(&cattr); /* обязательно нужно удалить */
101 if (i != 0)
102 goto pthreaderr;
103 /* инициализация завершена, снимаем бит user-execute */
104 if (fchmod(fd, mode) == –1)
105 goto err;
106 close(fd);
107 return((mymqd_t) mqinfo);
108 }
Установка размера файла
51-58 Вычисляется размер сообщения, который затем округляется до кратного размеру длинного целого. Также в файле отводится место для структуры mq_hdr в начале файла и msghdr в начале каждого сообщения (рис. 5.2). Размер вновь созданного файла устанавливается функцией lseek и записью одного байта со значением 0. Проще было бы вызвать ftruncate (раздел 13.3), но у нас нет гарантий, что это сработало бы для увеличения размера файла.
Отображение файла в память
59-63 Файл отображается в память функцией mmap.
Выделение памяти под структуру mq_info
64-66 При каждом вызове mq_open создается отдельный экземпляр mq_infо. Эта структура после создания инициализируется.
Инициализация структуры mq_hdr
67-87 Инициализируется структура mq_hdr. Заголовок связного списка сообщений (mqh_head) инициализируется нулем, а все сообщения в очереди добавляются к списку свободных (mqh_frее).
Инициализация взаимного исключения и условной переменной
88-102 Поскольку очереди сообщений Posix могут использоваться совместно произвольным количеством процессов, которые знают имя очереди и имеют соответствующие разрешения, нам нужно инициализировать взаимное исключение и условную переменную с атрибутом PTHREAD_PROCESS_SHARED. Для этого мы сначала инициализируем атрибуты вызовом pthread_mutexattr_init, а затем устанавливаем значение атрибута совместного использования процессами, вызвав pthread_mutexattr_setpshared. После этого взаимное исключение инициализируется вызовом pthread_mutex_init. Аналогичные действия выполняются для условной переменной. Мы должны аккуратно удалить взаимное исключение и условную переменную даже при возникновении ошибки, поскольку вызовы pthread_ mutexattr_init и pthread_condattr_init выделяют под них память (упражнение 7.3).
Сброс бита user-execute
103-107 После инициализации очереди сообщений мы сбрасываем бит user-execute. Это говорит другим процессам о том, что очередь была проинициализирована. Мы также закрываем файл вызовом close, поскольку он был успешно отображен в память и держать его открытым больше нет необходимости.
В листинге 5.19 приведен конец функции mq_open, в котором осуществляется открытие существующей очереди сообщений.
Листинг 5.19. Третья часть функции mq_open: открытие существующей очереди сообщений
//my_pxmsg_mmap/mq_open.с
109 exists:
110 /* открытие файла и отображение его в память */
111 if ((fd = open(pathname, O_RDWR)) < 0) {
112 if (errno == ENOENT && (oflag & O_CREAT))
113 goto again;
114 goto err;
115 }
116 /* проверяем, что инициализация завершена */
117 for (i = 0; i < MAX TRIES; i++) {
118 if (stat(pathname, &statbuff) == –1) {
119 if (errno == ENOENT && (oflag & O_CREAT)) {
120 close(fd);
121 goto again;
122 }
123 goto err;
124 }
125 if ((statbuff.st_mode & S_IXUSR) == 0)
126 break;
127 sleep(1);
128 }
129 if (i == MAX_TRIES) {
130 errno = ETIMEDOUT;
131 goto err;
132 }
133 filesize = statbuff.st_size;
134 mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
135 if (mptr == MAP_FAILED)
136 goto err;
137 close(fd);
138 /* выделяем одну mymq_info{} для каждого вызова open */
139 if ((mqinfo = malloc(sizeof(struct mymq_info))) == NULL)
140 goto err;
141 rnqinfo->mqi_hdr = (struct mymq_hdr *) mptr;
142 mqinfo->mqi_magic = MQI_MAGIC;
143 mqinfo->mqi_flags = nonblock;
144 return((mymqd_t) mqinfo);
145 pthreaderr:
146 errno = i;
147 err:
148 /* не даем следующим вызовам изменить errno */
149 save_errno = errno;
150 if (created)
151 unlink(pathname);
152 if (mptr != MAP_FAILED)
153 munmap(mptr, filesize);
154 if (mqinfo != NULL)
155 free(mqinfo);
156 close(fd);
157 errno = save_errno;
158 return((mymqd_t) –1);
159 }
Открытие существующей очереди сообщений
109-115 Здесь мы завершаем работу, если флаг O_CREAT не был указан или если он был указан, но очередь уже существовала. В любом случае, мы открываем существующую очередь сообщений. Для этого мы открываем для чтения и записи файл, в котором она содержится, функцией open и отображаем его содержимое в адресное пpocтрaнcтвo процесса (mmap).
ПРИМЕЧАНИЕ
Наша реализация сильно упрощена в том, что касается режима открытия файла. Даже если вызвавший процесс указывает флаг O_RDONLY, мы должны дать возможность доступа для чтения и записи при открытии файла командой open и при отображении его в память командой mmap, поскольку невозможно считать сообщение из очереди, не изменив содержимое файла. Аналогично невозможно записать сообщение в очередь, не имея доступа на чтение. Обойти эту проблему можно, сохранив режим открытия (O_RDONLY, O_WRONLY, O_RDWR) в структуре mq_info и проверяя этот режим в каждой из функций. Например, mq_receive должна возвращать ошибку, если в mq_info хранится значение O_WRONLY.
Проверка готовности очереди
116-132 Нам необходимо дождаться, когда очередь будет проинициализирована (в случае, если несколько потоков сделают попытку открыть ее приблизительно одновременно). Для этого мы вызываем stat и проверяем разрешения доступа к файлу (поле st_mode структуры stat). Если бит user-execute сброшен, очередь уже проинициализирована.
Этот участок кода обрабатывает другую возможную ситуацию гонок. Предположим, что два потока разных процессов попытаются открыть очередь приблизительно одновременно. Первый поток создает файл и блокируется при вызове lseek в листинге 5.18. Второй поток обнаруживает, что файл уже существует, и переходит к метке exists, где он вновь открывает файл функцией open и при этом блокируется. Затем продолжается выполнение первого потока, но его вызов mmap в листинге 5.18 не срабатывает (возможно, он превысил лимит использования памяти), поэтому он переходит на метку err и удаляет созданный файл вызовом unlink. Продолжается выполнение второго потока, но если бы мы вызывали fstat вместо stat, он бы вышел по тайм-ауту в цикле for, ожидая инициализации файла. Вместо этого мы вызываем stat, которая возвращает ошибку, если файл не существует, и, если флаг O_CREAT был указан при вызове mq_open, мы переходим на метку again (листинг 5.17) для повторного создания файла. Эта ситуация гонок заставляет нас также проверять, не возвращается ли при вызове open ошибка ENOENT.
Отображение файла в память; создание и инициализация структуры mq_info
133-144 Файл отображается в память, после чего его дескриптор может быть закрыт. Затем мы выделяем место под структуру mq_infо и инициализируем ее. Возвращаемое значение представляет собой указатель на эту структуру.
Обработка ошибок
145-148 При возникновении ошибок происходит переход к метке err, а переменной errno присваивается значение, которое должно быть возвращено функцией mq_open. Мы аккуратно вызываем функции для очистки памяти от выделенных объектов, чтобы переменная errno не изменила свое значение в случае возникновения ошибки в этих функциях.
Данный текст является ознакомительным фрагментом.