4.4.5. Обычные потоковые семафоры

We use cookies. Read the Privacy and Cookie Policy

4.4.5. Обычные потоковые семафоры

В предыдущем примере, в котором группа потоков обрабатывает задания из очереди, потоковая функция запрашивает задания до тех пор, пока очередь не опустеет, после чего поток завершается. Эта схема работает в том случае, когда все задания помещаются в очередь заранее или новые задания поступают по крайней мере так же часто, как их запрашивают потоки. Но если потоки начнут работать слишком быстро, очередь опустеет и потоки завершатся. Может оказаться, что задание поступило, а потоков, готовых его обработать, уже нет. Следовательно, необходим механизм, который позволит блокировать потоки в случае, когда очередь пуста, а новые задания еще не поступили.

Такой механизм называется семафором. Семафор — это счетчик, используемый для синхронизации потоков. Операционная система гарантирует, что проверка и модификация значения семафора могут быть выполнены безопасно и не приведут к возникновению гонки.

Счетчик семафора является неотрицательным целым числом. Семафор поддерживает две базовые операции.

? Операция ожидания (wait) уменьшает значение счетчика на единицу. Если счетчик уже равен нулю, операция блокируется до тех пор, пока значение счетчика не станет положительным (вследствие действий, выполняемых другими потоками). После снятия блокировки значение семафора уменьшается на единицу и операция завершается.

? Операция установки (post) увеличивает значение счетчика на единицу. Если до этого счетчик был равен нулю и существовали потоки, заблокированные в операции ожидания данного семафора, один из них разблокируется и завершает свою операцию (т.е. счетчик семафора снова становится равным нулю).

В Linux имеются две немного отличающиеся реализации семафоров. Та, которую мы опишем ниже, соответствует стандарту POSIX. Такие семафоры применяются для организации взаимодействия потоков. Другая реализация, служащая целям межпроцессного взаимодействия, рассмотрена в разделе 5.2, "Семафоры для процессов". При работе с семафорами необходимо включить в программу файл <semaphore.h>.

Семафор представляется переменной типа sem_t. Семафор следует предварительно инициализировать с помощью функции sem_init(), передав ей указатель на переменную семафора. Второй параметр этой функции должен быть равен нулю,[14] а третий — это начальное значение счетчика семафора.

Чтобы выполнить операцию ожидания семафора, необходимо вызвать функцию sem_wait(). Функция sem_post() устанавливает семафор. Есть также функция sem_trywait(), реализующая операцию неблокирующего ожидания. Она напоминает функцию pthread_mutex_trylock(): если операция ожидания приведет к блокированию потока из-за того, что счетчик семафора равен нулю, функция немедленно завершается, возвращая код ошибки EAGAIN.

В Linux имеется функция sem_getvalue(), позволяющая узнать текущее значение счетчика семафора. Это значение помещается в переменную типа int, на которую ссылается второй аргумент функции. Не пытайтесь на основании данного значения определять, стоит ли выполнять операцию ожидания или установки, так как это может привести к возникновению гонки: другой поток способен изменить счетчик семафора между вызовами функции sem_getvalue() и какой-нибудь другой функции работы с семафором. Доверяйте только атомарным функциям sem_wait() и sem_post().

Но вернемся к нашему примеру. Можно сделать так, чтобы с помощью семафора потоковая функция проверяла, сколько заданий находится в очереди. Измененная версия программы приведена в листинге 4.12.

Листинг 4.12. (job-queue3.c) Работа с очередью заданий с применением семафора

#include <malloc.h>

#include <pthread.h>

#include <semaphore.h>

struct job {

 /* Ссылка на следующий элемент связанного списка. */

 struct job* next;

 /* Другие поля, описывающие требуемую операцию... */

};

/* Список отложенных заданий. */

struct job* job_queue;

/* Исключающий семафор, защищающий очередь. */

pthread_mutex_t job_queue_mutex =

 PTHREAD_MUTEX_INITIALIZER;

/* Семафор, подсчитывающий число гаданий в очереди. */

sem_t job_queue_count;

/* Начальная инициализация очереди. */

void initialize_job_queue() {

 /* Вначале очередь пуста. */

 job_queue = NULL;

 /* Устанавливаем начальное значение счетчика семафора

    равным 0. */

 sem_init(&job_queue_count, 0, 0);

}

/* Обработка заданий до тех пор, пока очередь не опустеет. */

void* thread_function(void* arg) {

 while (1) {

  struct job* next_job;

  /* Дожидаемся готовности семафора. Если его значение больше

     нуля, значит, очередь не пуста; уменьшаем счетчик на 1.

     В противном случае операция блокируется до тех пор, пока

     в очереди не появится новое задание. */

  sem_wait(&job_queue_count);

  /* Захват исключающего семафора, защищающего очередь. */

  pthread_mutex_lock(&job_queue_mutex);

  /* Мы уже знаем, что очередь не пуста, поэтому без лишней

     проверки запрашиваем новое задание. */

  next_job = job_queue;

  /* Удаляем задание из списка. */

  job_queue = job_queue->next;

  /* освобождаем исключающий семафор, так как работа с

     очередью окончена. */

  pthread_mutex_unlock(&job_queue_mutex);

  /* Выполняем задание. */

  process_job(next_job);

  /* Очистка. */

  free(next_job);

 }

 return NULL;

}

/* Добавление нового задания в начало очереди. */

void enqueue_job(/* Передача необходимых данных... */) {

 struct job* new_job;

 /* Выделение памяти для нового объекта задания. */

 new_job = (struct job*)malloc(sizeof(struct job));

 /* Заполнение остальных полей структуры JOB... */

 /* Захватываем исключающий семафор, прежде чем обратиться

    к очереди. */

 pthread_mutex_lock(&job_queue_mutex);

 /* Помещаем новое задание в начало очереди. */

 new_job->next = job_queue;

 job_queue = new_job;

 /* Устанавливаем семафор, сообщая о том, что в очереди появилось

    новое задание. Если есть потоки, заблокированные в ожидании

    семафора, один из них будет разблокирован и

    обработает задание. */

 sem_post(&job_queue_count);

 /* Освобождаем исключающий семафор. */

 pthread_mutex_unlock(&job_queue_mutex);

}

Прежде чем извлекать задание из очереди, каждый поток дожидается семафора. Если счетчик семафора равен нулю, т.е. очередь пуста, поток блокируется до тех пор, пока в очереди не появится новое задание и счетчик не станет положительным.

Функция enqueue_job() добавляет новое задание в очередь. Подобно потоковой функции, она захватывает исключающий семафор, перед тем как обратиться к очереди. После добавления задания функция enqueue_job() устанавливает семафор, сообщая потокам о том, что задание доступно. В программе, показанной в листинге 4.12, потоки никогда не завершаются: если задания не поступают в течение длительного времени, все потоки переводятся в режим блокирования функцией sem_wait().