Модель с одним производителем и одним потребителем

Модель с одним производителем и одним потребителем

Вначале рассмотрим модель с одним производителем и одним потребителем. Затем мы ее расширим до модели с одним производителем и несколькими потребителями. Нам необходимо, чтобы сразу после генерирования производителем "достаточного" объема данных потребитель мог начинать использовать уже сгенерированные данные. Поэтому необходимо рассмотреть три ситуации: производитель и потребитель работают согласованно;

потребитель прекращает свою работу или блокируется, поскольку производитель не создал достаточный объем данных;

производитель блокируется, поскольку потребитель не успел выполнить считывание уже созданных данных.

В примере с копированием потока производитель будет прекращать работу, если ему удастся заполнить все буферы прежде, чем потребитель успеет считать и обработать первый буфер. Потребитель будет блокироваться, если ему удастся обработать все буферы прежде, чем производитель успеет заполнить еще один буфер.

Следовательно, разрабатываемый нами класс синхронизации должен содержать четыре метода: вызываемый производителем, чтобы начать генерирование данных;

вызываемый при наличии каких-либо данных, готовых для использования потребителем;

вызываемый потребителем, чтобы начать потребление данных;

и, наконец, вызываемый потребителем по завершении потребления им объема данных, достаточного для возобновления генерации данных производителем. Как и в случае потоков считывания-записи, оба метода запуска могут блокировать вызывающие их потоки.

Полный код интерфейса и реализации класса производителя-потребителя приведен в листинге 12.7. Как видите, реализация весьма проста.

Листинг 12.7. Класс синхронизации одного производителя и одного потребителя type

TtdProduceConsumeSync = class private

FHasData : THandle;

{семафор}

FNeedsData : THandle;

{семафор}

protected

public

constructor Create(aBufferCount : integer);

destructor Destroy; override;

procedure StartConsuming;

procedure StartProducing;

procedure StopConsuming;

procedure StopProducing;

end;

Первым делом, мы рассмотрим метод StartProducing (см. листинг 12.8), вызываемый производителем для запуска генерирования данных. Метод будет вызывать блокировку, если потребитель не успел использовать достаточно данных, чтобы производитель мог заменить их новыми. Метод достаточно прост: он просто ожидает передачи семафора "требуются данные". Как мы увидим, этот семафор будет передаваться потребителем.

Листинг 12.8. Метод StartProducing

procedure TtdProduceConsumeSync.StartProducing;

begin

{чтобы генерирование было начато, должен быть передан семафор "требуются данные"}

WaitForSingleObject(FNeedsData, INFINITE);

end;

Производитель будет вызывать второй метод, StopProducing (см. листинг 12.9), сообщающий потребителю о том, что он сгенерировал определенные (возможно все) данные, и что, следовательно, существуют данные, которые нужно использовать. Его реализация также проста: код просто передает семафор "имеются данные", ожидаемый потребителем.

Листинг 12.9. Метод StopProducing

procedure TtdProduceConsumeSync.StopProducing;

begin

{при генерировании каких-либо дополнительных данных потребителю нужно сообщить о необходимости их использования}

ReleaseSemaphore(FHasData, 1, nil);

end;

Третий метод, StartConsuming (листинг 12.10), вызывается потребителем перед тем, как он приступит к потреблению сгенерированных производителем данных. Метод будет вызывать блокировку на время ожидания семафора "имеются данные", который будет передаваться немедленно, если производитель уже сгенерировал какие-либо данные.

Листинг 12.10. Метод StartConcuming

procedure TtdProduceConsumeSync.StartConsuming;

begin

{чтобы можно было начать потребление данных, должен быть передан семафор "имеются данные"}

WaitForSingleObject(FHasData, INFINITE);

end;

Последний метод, StopConcuming (листинг 12.11), вызывается потребителем при считывании им достаточного объема (или всех) данных, чтобы производитель мог сгенерировать дополнительные данные. Очевидно, что этот метод всего лишь передает семафор "требуются данные", который будет предоставлять свободу действий производителю, если тот находится в состоянии ожидания.

Листинг 12.11. Метод StopConcuming

procedure TtdProduceConsumeSync.StopConsuming;

begin

{если какие-либо данные были использованы, нужно сигнализировать производителю о необходимости генерации дополнительных данных}

ReleaseSemaphore(FNeedsData, 1, nil);

end;

Полный исходный код класса TtdProduceConsumeSync можно найти на Web-сайте издательства, в разделе материалов. После выгрузки материалов отыщите среди них файл TDPCSync.pas.

Обратите внимание, что при использовании объекта семафора Windows неявно предполагается, что данные могут храниться только в 127 или меньшем количестве буферов, поскольку каждый раз, когда производитель сообщает, что потребитель может использовать какие-либо дополнительные данные, значение семафора "имеются данные" увеличивается на единицу (а его максимальное значение ограничено величиной, равной 127). Аналогичные соображения справедливы и по отношению к семафору "требуются данные". Однако в целом, это не столь уж большое ограничение. Во множестве сценариев с применением производителя-потребителя для передачи данных используется всего один буфер, а подпрограмма копирования потока, которую мы будем рассматривать, использует очередь буферов, содержащую 20 элементов.

Очередь буферов, используемая в рассматриваемом примере копирования потока, реализована в виде циклической очереди. Очередь создается с заранее выделенными всеми ее буферами. Код реализации этого класса приведен в листинге 12.12.

Обратите внимание, что мы не будем использовать диспетчер кучи во время процесса копирования потока, поскольку критический раздел защищает диспетчер кучи в многопоточной подпрограмме. Если начать вызывать подпрограммы распределения и освобождения памяти из потоков, они слишком легко смогут блокировать одна другую и, возможно, препятствовать достижению основной цели применения класса синхронизации производителя-потребителя.

Производитель будет заполнять буфер в начале очереди, а затем перемещать указатель начала очереди. С другой стороны, потребитель будет считывать, данные из буфера в конце очереди, а затем перемещать конец очереди. Процессы заполнения и считывания могут происходить одновременно, поскольку они используют различные буферы.

Листинг 12.12. Класс TQueuedBuffers, предназначенный для выполнения копирования потока

type

PBuffer= ^TBuffer;

TBuffer = packed record

bCount : longint;

bBlock : array [0..pred(BufferSize)] of byte;

end;

PBufferArray = ^TBufferArray;

TBufferArray = array [0..1023] of PBuffer;

type

TQueuedBuffers = class private

FBufCount : integer;

FBuffers : PBufferArray;

FHead : integer;

FTail : integer;

protected

function qbGetHead : PBuffer;

function qbGetTail : PBuffer;

public

constructor Create(aBufferCount : integer);

destructor Destroy; override;

procedure AdvanceHead;

procedure AdvanceTail;

property Head : PBuffer read qbGetHead;

property Tail : PBuffer read qbGetTail;

end;

constructor TQueuedBuffer s.Create(aBufferCount : integer);

var

i : integer;

begin

inherited Create;

{распределить буферы}

FBuffers := AllocMem(aBufferCount * sizeof(pointer));

for i := 0 to pred(aBufferCount) do

GetMem(FBuffers^[i], sizeof(TBuffer));

FBufCount := aBufferCount;

end;

destructor TQueuedBuffers.Destroy;

var

i : integer;

begin

{освободить буферы}

if (FBuffers <> nil) then begin

for i := 0 to pred( FBuf Count) do

if (FBuffers^[i] <> nil) then

FreeMem(FBuffers^[i], sizeof(TBuffer));

FreeMem(FBuffers, FBufCount * sizeof(pointer));

end;

inherited Destroy;

end;

procedure TQueuedBuffers.AdvanceHead;

begin

inc(FHead);

if (FHead = FBufCount) then

FHead := 0;

end;

procedure TQueuedBuffers.AdvanceTail;

begin

inc(FTail);

if (FTail = FBuf Count) then

FTail := 0;

end;

function TQueuedBuffers.qbGetHead : PBuffer;

begin

Result := FBuffers^[FHead];

end;

function TQueuedBuffers.qbGetTail : PBuffer;

begin

Result := FBuffers^[FTail];

end;

Менее очевидно то, что указатели начала и конца очереди не должны быть защищены от изменений критическими разделами или какими-то аналогичными элементами. На первый взгляд это кажется противоречащим здравому смыслу и всем правилам совместного использования данных в различных потоках. Однако поток потребителя никогда не будет обращаться к указателю конца очереди. О наличии данных, которые нужно считать из указателя начала очереди, ему будет сообщать поток производителя (в этот момент времени указатели начала и конца очереди будут различными). Аналогично, поток производителя никогда не будет обращаться к указателю начала очереди, поскольку о наличии места для добавления данных в конце очереди ему будет сообщать поток потребителя.

Коды реализации классов производителя и потребителя приведены в листинге 12.13. Эти классы являются производными от класса TThread. Код реализации каждого из перекрытых методов Execute не отличается от ранее описанного. Поток производителя входит в цикл. На каждом шаге цикла он вызывает метод StartProducer объекта синхронизации, а затем считывает блок данных из исходного потока в буфер в конце очереди. После этого он смещает указатель конца очереди. И, в заключение, он вызывает метод StopProducing и повторяет цикл с начала. Выполнение цикла прекращается, как только поток производителя устанавливает буфер в состояние, соответствующее отсутствию в нем каких-либо данных (потребитель воспринимает это состояние в качестве признака "конец потока").

В свою очередь, цикл потока потребителя выполняется следующим образом. Вначале поток вызывает метод StartConsuming объекта синхронизации. Возврат из этого метода свидетельствует об отсутствии данных для считывания в объекте поставленных в очередь буферов. Поток считывает данные из буфера, определяемого указателем начала очереди, и записывает их в поток назначения. Затем он смещает указатель начала очереди. Сразу после считывания всех данных из заполненного буфера он вызывает метод StopConsuming объекта синхронизации и повторяет цикл сначала. Работа потребителя останавливается при получении им пустого буфера.

Листинг 12.13. Классы производителя и потребителя

type

TProducer = class (TThread) private

FBuffers : TQueuedBuffers;

FStream : TStream;

FSyncObj : TtdProduceConsumeSync;

protected

procedure Execute; override;

public

constructor Create(aStream : TStream;

aSyncObj : TtdProduceConsumeSync;

aBuffers : TQueuedBuffers);

end;

constructor TProducer.Create(aStream : TStream;

aSyncObj : TtdProduceConsumeSync;

aBuffers : TQueuedBuffers);

begin

inherited Create (true);

FStream := aStream;

FSyncObj :=,aSyncObj;

FBuffers aBuffers;

end;

procedure TProducer.Execute;

var

Tail : PBuffer;

begin

{выполнять до момента опустошения потока...}

repeat

{сигнализировать о готовности к началу генерирования данных}

FSyncObj.StartProducing;

{считать блок из потока в конечный буфер}

Tail FBuffers.Tail;

Tail^.bCount := FStream.Read(Tail^.bBlock, BufferSize);

{переместить указатель конца очереди}

FBuffers.AdvanceTail;

{поскольку выполняется запись нового буфера, необходимо сигнализировать о созданных данных}

FSyncObj.StopProducing;

until (Tail^.bCount ? 0);

end;

type

TConsumer = class(TThread) private

FBuffers : TQueuedBuffers;

FStream : TStream;

FSyncObj : TtdProduceConsumeSync;

protected

procedure Execute; override;

public

constructor Create(aStream : TStream;

aSyncObj : TtdProduceConsumeSync;

aBuffers : TQueuedBuffers);

end;

constructor TConsumer.Create(aStream : TStream;

aSyncObj : TtdProduceConsumeSync;

aBuffers : TQueuedBuffers);

begin

inherited Create (true);

FStream := aStream;

FSyncObj := aSyncObj;

FBuffers := aBuffers;

end;

procedure TConsumer.Execute;

var

Head : PBuffer;

begin

{сигнализировать о готовности к началу потребления данных}

FSyncObj.StartConsuming;

{извлечь начальный буфер}

Head := FBuffers.Head;

{до тех пор, пока начальный буфер не опустошен...}

while (Head^.bCount <> 0) do

begin

{выполнить запись блока из начального буфера в поток}

FStream.Write(Head^.bBlock, Head^.bCount);

{переместить указатель начала очереди}

FBuffers.AdvanceHead;

{поскольку было выполнено считывание и обработка буфера, необходимо сообщить о том, что данные были использованы}

FSyncObj.StopConsuming;

{сигнализировать о готовности снова приступить к потреблению данных}

FSyncObj.StartConsuming;

{извлечь начальный буфер}

Head := FBuffers.Head;

end;

end;

И, наконец, мы можем рассмотреть подпрограмму копирования потока, приведенную в листинге 12.14. Она принимает два параметра: входной поток и выходной поток. Подпрограмма создает специальный объект типа TQueuedBuffers. Этот объект содержит все ресурсы и методы, необходимые для реализации организованного в виде очереди набора буферов. Он создает также экземпляр класса TtdProducerConsumerSync, который будет действовать в качестве объекта синхронизации, обеспечивающего согласованную работу производителя и потребителя.

Листинг 12.14. Многопоточное копирование

procedure ThreadedCopyStream(aSrcStream, aDestStream : TStream);

var

SyncObj : TtdProduceConsumeSync;

Buffers : TQueuedBuffers;

Producer : TProducer;

Consumer : TConsumer;

WaitArray : array [ 0..1] of THandle;

begin

SyncObj := nil;

Buffers := nil;

Producer :=nil;

Consumer :=nil;

try

{создать объект синхронизации, объект организованных в виде очереди буферов (с 20 буферами) и два потока}

SyncObj := TtdProduceConsumeSync.Create(20);

Buffers := TQueuedBuffers.Create(20);

Producer := TProducer.Create(aSrcStream, SyncObj, Buffers);

Consumer := TConsumer.Create(aDestStream, SyncObj, Buffers);

{сохранить дескрипторы потоков, что обеспечивает возможность ожидания их передачи}

WaitArray[0] := Producer.Handle;

WaitArray[1] := Consumer.Handle;

{запустить потоки}

Consumer.Resume;

Producer.Resume;

{ожидать окончания потоков}

WaitForMultipleObjects(2, @WaitArray, true, INFINITE);

finally

Producer.Free;

Consumer.Free;

Buffers.Free;

SyncObj.Free;

end;

end;

Затем подпрограмма копирования создает два потока, между которыми будет выполняться копирование, и возобновляет их выполнение (потоки создаются в приостановленном состоянии). Далее подпрограмма дожидается завершения обоих потоков и выполняет очистку. Полный код подпрограммы можно найти в файлах TstCopy.dpr и TstCopyu.pas на web-сайте издательства, в разделе материалов.

Поделитесь на страничке

Следующая глава >

Похожие главы из других книг:

1.7. Модель OSI

Из книги автора

1.7. Модель OSI Распространенным способом описания уровней сети является предложенная Международной организацией по стандартизации (International Standards Organization, ISO) модель взаимодействия открытых систем (open systems interconnection, OSI). Эта семиуровневая модель показана на рис. 1.5, где она


Модель ISO/OSI

Из книги автора

Модель ISO/OSI Пожалуй, ключевым понятием в стандартизации сетей и всего, что к ним относится, является модель взаимодействия открытых систем (Open System Interconnection, OSI), разработанная «Международной организацией по стандартизации» (International Standards Organization, ISO). На практике применяется


8.16.1 Модель EGP

Из книги автора

8.16.1 Модель EGP Маршрутизатор EGP конфигурируется с адресом IP для одного или нескольких внешних соседних маршрутизаторов. Обычно внешние соседи соединены с общей сетью с множественным доступом или объединены одной линией "точка-точка".EGP позволяет маршрутизатору


14.3 Модель FTP

Из книги автора

14.3 Модель FTP Как видно из приведенного выше диалога, пользователь взаимодействует с локальным клиентом FTP (точнее, с соответствующим процессом). Программное обеспечение локального клиента управляет преобразованием данных для удаленного сервера FTP через управляющее


15.2 Модель RPC

Из книги автора

15.2 Модель RPC Приложение клиент/сервер для архитектуры ONC функционирует поверх RPC. Работа RPC моделируется обычными вызовами подпрограмм. Например, в языке программирования С вызов обычной подпрограммы в общем случае имеет форму:код_возврата = имя_процедуры


17.4 Модель новостей

Из книги автора

17.4 Модель новостей Клиентский процесс новостей взаимодействует с сервером сетевых новостей по протоколу пересылки сетевых новостей (Network News Transfer Protocol — NNTP). Клиентский процесс может размещаться в агенте новостей конечного пользователя или на сервере новостей того же


Как можно поговорить только с одним абонентом

Из книги автора

Как можно поговорить только с одним абонентом Суть проблемы заключается в следующем: в канале имеется несколько человек, и все они могут слышать друг друга, однако нам хочется поговорить только с одним из них. Какие здесь есть варианты?? Можно, конечно, послать ему


Вызов редактора Visual Basic одним щелчком

Из книги автора

Вызов редактора Visual Basic одним щелчком Если в вашем приложении есть кнопка в панели инструментов для вызова редактора Visual Basic. используйте се. В VBА-приложениях из пакета Microsoft Office (Word. Excel и PowerPoint) эта кнопка помещена в панель инструментов Visual Basic (рис. 3.2). Наборы кнопок в этой


11.6.6. Модель редактора ed

Из книги автора

11.6.6. Модель редактора ed Для всех предыдущих моделей характерна весьма низкая интерактивность. В них используется только управляющая информация, переданная во время запуска и обособленная от данных. Однако многим программам после запуска требуется управление с помощью


Модель

Из книги автора

Модель В любом интерьере наибольшее количество объектов — это модели. Модель может иметь произвольную форму: от примитивных сферы или куба до реалистичных форм человеческой фигуры. Модель призвана передавать формы конкретных объектов. Например, создавая интерьер, мы


Microsoft не удалось стать производителем телефонов Михаил Карпов

Из книги автора

Microsoft не удалось стать производителем телефонов Михаил Карпов Опубликовано 01 июля 2010 года Первая попытка стать производителем мобильных телефонов закончилась для Microsoft плачевно и очень быстро. Телефоны Microsoft Kin продавались в США с середины мая;


Очки Google: остановите киборгов или станьте одним из них Андрей Васильков

Из книги автора

Очки Google: остановите киборгов или станьте одним из них Андрей Васильков Опубликовано 20 марта 2013Очки дополненной реальности Google Glass вызывали опасения у многих ещё до того, как стали популярными. Страх возникает из-за мысли о том, что вскоре люди окажутся перед


Web-модель

Из книги автора

Web-модель Web-модель получила свое название, поскольку базируется на популярных браузерах Netscape Navigator и Microsoft Internet Explorer, используемых как средства навигации во Всемирной Паутине - World Wide Web. Эта модель предусматривает встраивание в готовый браузер набора открытых ключей