Multithread - Delphi - Потоки

Главная   1 глава   2 глава   3 глава   4 глава   5 глава   6 глава   7 глава   8 глава   9 глава  

Глава 9. Семафоры. Управление потоками данных. Взаимосвязь источник-приемник.

Содержание:


* Семафоры.


* Счетчик больше единицы? "Не вполне критические" секции.


* Новое применение семафоров: управление потоками данных.


* Ограниченный буфер.


* Реализация ограниченного буфера в Delphi.


* Создание: Корректная инициализация счетчиков семафоров.


* Работа: правильные времена ожидания.


* Разрушение: Очистка.


* Разрушение: тонкости остаются.


* Доступ к дескрипторам синхронизации должен быть синхронизован!


* Управление дескрипторами в Win32.


* Решение.


* Использование ограниченного буфера: пример.


* В завершение...

Семафоры.

Семафор представляет собой другой тип примитива синхронизации, с несколько более широкими возможностями по сравнению с мьютексом. В наиболее простых случаях его можно использовать точно так же, как и мьютекс. В общем же семафоры позволяют реализовать в программе более продвинутые механизмы синхронизации.

Сначала давайте вспомним, как работают мьютексы. Мьютекс может быть или занят или свободен (signalled). Если он свободен, то действие ожидания мьютекса не блокируется. Если он занят, операция ожидания этого мьютекса блокирована. Если мьютекс занят, то он принадлежит конкретному потоку, и, следовательно, только один поток может обладать мьютексом в каждый момент времени.

Семафоры можно заставить действовать точно так же. Вместо понятия владения, захвата мьютекса, у семафора имеется счетчик. Когда этот счетчик больше нуля, семафор свободен, и операции ожидания для него не блокируются. Когда счетчик равен 0, то семафор занят, и операции ожидания заблокированы. Мьютекс по существу является разновидностью семафора, счетчик которого может быть только 0 или 1. Аналогично, семафоры можно рассматривать как воображаемые мьютексы, которые могут одновременно иметь более одного владельца. Функции Win32 API, работающие с семафорами, очень похожи на функции для работы с мьютексами.


* CreateSemaphore. Эта функция подобна CreateMutex. Вместо флага, указывающего, что поток, создающий мьютекс, сразу им будет владеть, эта функция принимает аргумент, задающий начальный счетчик. Создание занятого мьютекса подобно созданию семафора со счетчиком 0: в обоих случаях любой другой поток, ожидающий освобождения объекта, будет заблокирован. Аналогично, создание свободного мьютекса подобно созданию семафора со счетчиком 1: в обоих случаях единственный поток не будет заблокирован в ожидании объекта синхронизации.


* Функции ожидания (Wait). Они для обоих случаев идентичны. Для мьютексов успешный результат ожидания приводит к тому, что поток захватывает мьютекс, а для семафоровуспешный результат ожидания уменьшает счетчик семафора, а если счеичик обнуляется, то вызывающий поток блокируется.


* ReleaseSemaphore. Это подобно ReleaseMutex, но вместо освобождения мьютекса потоком, ReleaseSemaphore принимает дополнительный целочисленный аргумент, определяющий, на какую величину увеличится счетчик. ReleaseSemaphore либо увеличивает счетчик семафора, либо активирует соответствующее число потоков, которые были блокированы эти семафором.

Следующая таблица показывает, как превратить код, использующий мьютексы, в код с применением семафоров, и эквивалентные операции.

Выделить всёБез подсветки

Мьютексы. Семафоры.

MyMutex := CreateMutex(nil,FALSE,); MySemaphore := CreateSemaphore(nil,1,1,);

MyMutex := CreateMutex(nil,TRUE,); MySemaphore := CreateSemaphore(nil,0,1,);

WaitForSingleObject(MyMutex,INFINITE); WaitForSingleObject(MySemaphore,INFINITE);

ReleaseMutex(MyMutex); ReleaseSemaphore(MySemaphore,1);

CloseHandle(MyMutex); CloseHandle(MySemaphore);

Вот простой пример: изменения, требующиеся для кода, представленного в 6 Главе, чтобы программа использовала семафоры вместо критических секций.

Выделить всёРазвернуть кодСвернуть кодкод Pascal/Delphi

type

TPrimeFrm = class(TForm)

{ No change here until public declarations }

public

{ Public declarations }

StringSemaphore: THandle; { Now a semaphore instead of a critical section }

property StringBuf: TStringList read FStringBuf write FStringBuf;

end;

procedure TPrimeFrm.StartBtnClick(Sender: TObject);

begin

if not FStringSectInit then

begin

StringSemaphore := CreateSemaphore(nil, 1, 1, SemName); { Now creating a semaphore instead of a critical section }

FStringBuf := TStringList.Create;

FStringSectInit := true;

FPrimeThread := TPrimeThrd2.Create(true);

SetThreadPriority(FPrimeThread.Handle, THREAD_PRIORITY_BELOW_NORMAL);

try

FPrimeThread.StartNum := StrToInt(StartNumEdit.Text);

except

on EConvertError do FPrimeThread.StartNum := 2;

end;

FPrimeThread.Resume;

end;

UpdateButtons;

end;

procedure TPrimeFrm.StopBtnClick(Sender: TObject);

begin

if FStringSectInit then

begin

with FPrimeThread do

begin

Terminate;

WaitFor;

Free;

end;

FPrimeThread := nil;

FStringBuf.Free;

FStringBuf := nil;

CloseHandle(StringSemaphore); { Deleting semaphore }

FStringSectInit := false;

end;

UpdateButtons;

end;

procedure TPrimeFrm.HandleNewData(var Message: TMessage);

begin

if FStringSectInit then {Not necessarily the case!}

begin

WaitForSingleObject(StringSemaphore, INFINITE); { New wait call }

ResultMemo.Lines.Add(FStringBuf.Strings[0]);

FStringBuf.Delete(0);

ReleaseSemaphore(StringSemaphore, 1, nil); { New release call }

{Now trim the Result Memo.}

if ResultMemo.Lines.Count > MaxMemoLines then

ResultMemo.Lines.Delete(0);

end;

end;

procedure TPrimeThrd2.Execute;

var

CurrentNum: integer;

begin

CurrentNum := FStartNum;

while not Terminated do

begin

if IsPrime(CurrentNum) then

begin

WaitForSingleObject(PrimeFrm.StringSemaphore, INFINITE); { New wait call }

PrimeFrm.StringBuf.Add(IntToStr(CurrentNum) + ' is prime.');

ReleaseSemaphore(PrimeFrm.StringSemaphore, 1, nil); { New release call }

PostMessage(PrimeFrm.Handle, WM_DATA_IN_BUF, 0, 0);

end;

Inc(CurrentNum);

end;

end;

Счетчик больше единицы? "Не вполне критические" секции.

Возможность семафоров иметь счетчик более единицы отчасти аналогична разрешению мьютексу иметь более одного владельца. Таким образом семафоры позволяют создавать критические секции, который разрешают определенному количеству потоков иметь доступ к одному конкретному участку кода или к конкретному объекту. Это по большей части полезно в ситуациях, когда коллективный ресурс состоит из множества буферов или множества потоков, которые можно использовать и другими потоками системы. Давайте рассмотрим конкретный пример и предположим, что до трех потоков могут работать с определенным участком кода. Семафор создается с начальным и максимальным счетчиком 3, и предположим, что ни один поток не работает с критическим участком. Выполнение пяти потоков, пытающихся получить доступ к коллективному ресурсу, может выглядеть приблизительно так:

--Resize_Images_Alt_Text--

Это конкретное применение семафоров, вероятно, не особенно полезно для программистов на Delphi, главным образом потому, что есть несколько подобных статических структур для уровня приложения. Тем не менее, оно оказывается значительно более важным для ОС, где дескрипторы или ресурсы, такие как системные буферы, вероятно будут статически распределены во время загрузки.

Новое применение семафоров: управление потоками данных.

В Главе 6 было указано на потребность в управлении потоками данных при их прохождении между программными потоками . Кроме того, в Главе 8 эта тема была затронута при обсуждении мониторов. В данной главе рассматривается ситуация для примера, где часто требуется управление потоками данных: ограниченный буфер с единственным потоком-поставщиком данных, выводящий некоторые элементы в буфер, и единственный поток-потребитель, забирающий их оттуда.

Ограниченный буфер.

Ограниченный буфер представляет собой простую разделяемую структуру данных, которая обеспечивает и управление потоками данных, и общий доступ к данным. Буфер, рассмотренный здесь, будет простой очередью: первым вошел - первым вышел (FIFO). . Это будет реализовано в виде циклического буфера, то есть содержать фиксированное количество элементов и иметь два указателя "get" и "put", показывающие, в каком именно месте буфера данные будут вставлены и удалены. Обычно разрешается четыре операции с буфером:


* Create Buffer. Создаются и инициализируются буфер и связанные с ним механизмы синхронизации.


* Put Item. Попытка вставить элемент в буфер с учетом потокобезопасности. Если это невозможно из-за заполнения буфера, то поток, пытающийся вставить элемент, блокируется (приостанавливается), пока буфер не перейдет в состояние, в котором в него разрешено добавлять данные.


* Get Item. Попытка забрать элемент из буфера с учетом потокобезопасности. Если это невозможно из-за того, что буфер пуст, то поток, пытающийся получить элемент, блокируется, пока буфер не перейдет в состояние, в котором из него разрешено удалять данные..


* Destroy Buffer. Разблокирует все потоки, ожидающие буфера, разрушает буфер.

Очевидно, для обращения с коллективными данными потребуются мьютексы. Тем не менее, мы можем использовать семафоры для выполнения необходимых операции блокировки, когда буфер полон или пуст, устраняя потребность в контроле выхода за границы, и даже подсчете того, сколько элементов находится в буфере. Для того, чтобы это сделать, потребуется некоторое изменение концепций. Вместо ожидания семафора и затем освобождения его при выполнении операций, имеющих отношение к буферу, мы используем счетчик двух семафоров, чтобы следить за тем, сколько входов в буфере пусты или заполнены. Давайте назовем эти семафоры "EntriesFree" и "EntriesUsed".

Обычно с буфером взаимодействуют два потока. Поток-производитель (или писатель) пытается вставить данные в буфер, а поток-потребитель (читатель) пытается их извлечь, как показано на следующей диаграмме. Третий поток (возможно, поток VCL), может вмешиваться для того, чтобы создавать и уничтожать буфер.

--Resize_Images_Alt_Text--

Как можно видеть, потоки - читатель и писатель - выполняются в цикле. Поток-писатель создает элемент и пытается поместить его в буфер. Сначала поток ожидает семафора EntriesFree. Если счетчик EntriesFree нулевой, то поток будет заблокирован, так как буфер полный, и больше данных добавить нельзя. Как только это возможное ожидание закончится, поток добавляет элемент в буфер, а затем увеличивает счетчик EntriesUsed, и, если необходимо, активирует поток-потребитель. Соответственно поток-потребитель заблокируется, если счетчик EntriesUsed нулевой, а когда он удаляет элемент, то увеличивает счетчик EntriesFree, разрешая потоку-производителю добавлять новый элемент.

Блокировка нужного потока всякий раз, когда буфер становится полным или пустым, оставляет один или другой поток "вне игры". При данном размере буфера N, поток-производитель может только быть на N элементов впереди потока-потребителя до своей остановки, и аналогично, поток-потребитель не может отставать более, чем на N элементов. Это дает несколько преимуществ:


* Один поток не может выдать лишние данные, таким образом мы избавляемся от проблем, указанных в Главе 6, где у нас один поток создавал очередь все увеличивающегося размера.


* Буфер имеет конечный размер, в противоположность списку, рассмотренному ранее, так что мы можем предусмотреть наихудший вариант использования памяти.


* Здесь не будет "ожидания при занятости". Когда потоку нечего делать, он приостанавливается. При этом не возникает ситуации, когда программист пишет маленькие циклы, которые ничего не делают, кроме ожидания новых данных при снятии блокировки. Этого следует избегать, так как впустую тратится процессорное время.

Для полной ясности я дам пример последовательности событий. У нас есть буфер, который имеет 4 возможных входа, и инициализируется он так, что все элементы свободны. Возможно много путей выполнения в зависимости от работы планировщика, но я проиллюстрирую тот, где каждый поток выполняется столько, сколько возможно, прежде чем он будет приостановлен.

Выделить всёБез подсветки

Действия потока-читателя Действия потока-писателя Число Число

свободных занятых

элементов элементов

Thread starts Thread inactive (not scheduled) 4 0

Wait(EntriesUsed) blocks. Suspended. 4 0

Wait(EntriesFree) flows through 3 0

Item Added. Signal(EntriesUsed) 3 1

Wait(EntriesFree) flows through 2 1

Item Added. Signal(EntriesUsed) 2 2

Wait(EntriesFree) flows through 1 2

Item Added. Signal(EntriesUsed) 1 3

Wait(EntriesFree) flows through 0 3

Item Added. Signal(EntriesUsed) 0 4

Wait(EntriesFree) blocks. Suspended 0 4

Wait(EntriesUsed) completes 0 3

Item Removed. Signal(EntriesFree) 1 3

Wait(EntriesUsed) flows through 1 2

Item Removed. Signal(EntriesFree) 2 2

Wait(EntriesUsed) flows through 2 1

Item Removed. Signal(EntriesFree) 3 1

Wait(EntriesUsed) flows through 3 0

Item Removed. Signal(EntriesFree) 4 0

Wait(EntriesUsed) blocks. Suspended 4 0

Реализация ограниченного буфера в Delphi.

Вот первая реализация ограниченного буфера на Delphi.

Выделить всёРазвернуть кодСвернуть кодкод Pascal/Delphi

unit BoundedBuf;

{Martin Harvey 24/4/2000}

interface

uses Windows, SysUtils;

const

DefaultWaitTime = 5000; { Five second wait on mutexes }

type

{ I don't particularly like dynamic arrays, so I'm going to do things

the "C" way here, explicitly allocating memory

Think of TBufferEntries as ^(array of pointer) }

TBufferEntries = ^Pointer;

TBoundedBuffer = class

private

FBufInit: boolean;

FBufSize: integer;

FBuf: TBufferEntries;

FReadPtr, { ReadPtr points to next used entry in buffer}

FWritePtr: integer; { WritePtr points to next free entry in buffer}

FEntriesFree, FEntriesUsed: THandle; { Flow control semaphores }

FCriticalMutex: THandle; { Critical section mutex }

protected

procedure SetSize(NewSize: integer);

public

procedure ResetState;

destructor Destroy; override;

function PutItem(NewItem: Pointer): boolean;

function GetItem: Pointer;

published

property Size: integer read FBufSize write SetSize;

end;

{ No constructor required because default values of 0, false etc acceptable }

implementation

const

FailMsg1 = 'Flow control failed, or buffer not initialised';

FailMsg2 = 'Critical section failed, or buffer not initialised';

procedure TBoundedBuffer.SetSize(NewSize: integer);

{ Initialises handles and allocates memory.

If the buffer size has previously been set, then this may invoke a buffer

reset }

begin

if FBufInit then ResetState;

if NewSize < 2 then NewSize := 2;

FBufSize := NewSize;

GetMem(FBuf, Sizeof(Pointer)
* FBufSize);

FillMemory(FBuf, Sizeof(Pointer)
* FBufSize, 0);

FBufInit := true;

FCriticalMutex := CreateMutex(nil, false, nil); { note lack of name }

{ The initial count on the semaphores requires some thought,

The maximum count requires more thought.

Again, all synchronisation objects are anonymous }

FEntriesFree := CreateSemaphore(nil, FBufSize - 1, FBufSize, nil);

FEntriesUsed := CreateSemaphore(nil, 0, FBufSize, nil);

if (FCriticalMutex = 0)

or (FEntriesFree = 0)

or (FEntriesUsed = 0) then ResetState

end;

procedure TBoundedBuffer.ResetState;

{ Closes handles and deallocates memory.

Note that this must unblock threads in such a manner that they quit cleanly }

begin

if FBufInit then

begin

WaitForSingleObject(FCriticalMutex, DefaultWaitTime);

FBufInit := false;

FBufSize := 0;

FreeMem(FBuf);

{ Now wake up all threads currently waiting.

Currently assumes only 1 producer and 1 consumer.

Plenty of ordering subtleties and pitfalls to be discussed here }

ReleaseSemaphore(FEntriesFree, 1, nil);

ReleaseSemaphore(FEntriesUsed, 1, nil);

CloseHandle(FEntriesFree);

CloseHandle(FEntriesUsed);

{ If reader or writer threads are waiting,

then they will be waiting on the mutex.

We will close the handle and let them time out }

CloseHandle(FCriticalMutex);

end;

end;

function TBoundedBuffer.PutItem(NewItem: Pointer): boolean;

{ Called by producer thread }

var

NthItem: TBufferEntries;

begin

result := false;

{ WAIT(EntriesFree) }

if WaitForSingleObject(FEntriesFree, INFINITE) <> WAIT_OBJECT_0 then

exit;

if (WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0)

or not FBufInit then

exit;

NthItem := FBuf;

Inc(NthItem, FWritePtr);

NthItem^ := NewItem;

FWritePtr := (FWritePtr + 1) mod FBufSize;

ReleaseMutex(FCriticalMutex);

{ SIGNAL(EntriesUsed) }

ReleaseSemaphore(FEntriesUsed, 1, nil);

result := true;

end;

function TBoundedBuffer.GetItem: Pointer;

{ Called by consumer thread }

var

NthItem: TBufferEntries;

begin

result := nil;

{ WAIT(EntriesUsed) }

if WaitForSingleObject(FEntriesUsed, INFINITE) <> WAIT_OBJECT_0 then

exit;

if (WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0)

or not FBufInit then

exit;

NthItem := FBuf;

Inc(NthItem, FReadPtr);

Result := NthItem^;

FReadPtr := (FReadPtr + 1) mod FBufSize;

ReleaseMutex(FCriticalMutex);

{ SIGNAL(EntriesFree) }

ReleaseSemaphore(FEntriesFree, 1, nil);

end;

destructor TBoundedBuffer.Destroy;

begin

ResetState;

inherited Destroy;

end;

end.

Как обычно, появляется несколько вопросов, на которые следует обратить внимание, и несколько проблем, которые будут решены позже.


* Какие значения следует передавать при вызовах создания семафоров?


* Сколько нужно ожидать освобождения мьютексов и критических секций?


* Сколько следует ожидать освобождения семафоров?


* Какой наилучший метод разрушения буфера?

Создание: Корректная инициализация счетчиков семафоров.

При такой реализации ограниченного буфера данные хранятся как массив указателей с индексами чтения и записи в этот массив. В целях отладки я сделал так, что если буфер содержит N элементов, то он будет объявлен полным, когда заполнено N-1элементов. Такая задача чаще всего решается с помощью циклического буфера, где индексы чтения и записи сравнивают для определения, полон буфер или нет. Если буфер пуст, индексы чтения и записи одинаковы. К несчастью, то же самое будет и для случая, если буфер совершенно заполнен, так что часто в коде циклического буфера делают один всегда пустой вход, что позволяет различить эти два условия. В нашем случае, поскольку мы используем семафоры, это не обязательно. Тем не менее, я решил соблюсти это соглашение для облегчения отладки.

Учитывая это, мы можем инициализировать семафор EntriesUsed нулем. Поскольку заполненных элементов нет, мы хотим, чтобы потоки-читатели сразу же были блокированы. По условию, мы хотим, чтобы потоки-писатели добавили в буферN-1 элементов, и поэтому инициализируем EntriesFree значением N-1.

Нам также нужно учитывать максимальный счетчик, разрешенный для семафоров. Процедура, которая уничтожает буфер, всегда выполняет действие SIGNAL (свободны) для обоих семафоров. Поэтому, когда буфер разрушается, в нем может находиться любое количество элементов, т.е. он может быть и полностью заполнен и совершенно пуст, и мы установили максимальный счетчик в N, допуская таким образом одно действие освобождения семафоров для всех возможных состояний буфера.

Работа: правильные времена ожидания.

Я использовал мьютексы вместо критических секций в этой части программы, поскольку они позволяют разработчику более точно контролировать ошибочные ситуации. Кроме того, они также допускают выход по таймауту. Время ожидания для wait-функций семафоров в самом деле должно быть бесконечным; возможно, что буфер остается полным или пустым в течение длинных периодов времени, и нам нужно заблокировать поток на столько, сколько буфер будет пуст. Параноидально настроенные или пишущие небезопасный код программисты могут использовать задержки на нескольких секунд для этих примитивов, чтобы учесть непредвиденные ошибочные ситуации, когда поток блокируется навсегда. Я достаточно уверен в своем коде, чтобы считать это необязательным, по крайней мере в данном случае...

Задержка для мьютекса - совсем другое дело. Операции в критической секции быстрые; до N записей в память, и, если обеспечить сравнительное небольшое N (то есть меньше миллиона), то эти действия не должны занять более 5 секунд. В качестве бесплатного приложения часть кода очистки захватывает этот мьютекс, а вместо освобождения его - закрывает дескриптор. При установке таймаута гарантируется, что потоки, ожидающие мьютекс, будут разблокированы, и вернут код ошибки.

Разрушение: Очистка.

К настоящему моменту большинство читателей уже понимают, что операции очистки часто являются наиболее трудными в многопоточном программировании. Ограниченный буфер не является исключением. Процедура ResetState выполняет очистку. Первое, что она делает - проверяет FBufInit. Я предположил, что при этом не требуется синхронизировать доступ, поскольку поток, который создает буфер, должен также и уничтожить его, а раз все делается одним потоком, и все операции записи происходят в критической секции (по крайней мере после создания), то никаких конфликтов не произойдет. Процедуре очистки теперь нужно проверить, что все правильно уничтожено, и что все потоки, находящиеся в ожидании, в процессе чтения или записи, завершаются корректно, сообщая в противном случае о неудаче.

Процедура очистки сначала захватывает мьютекс для разделяемых данных в буфере, затем разблокирует потоки чтения и записи, освобождая оба семафора. Операции производятся в этом порядке, поскольку, когда оба семафора свободны, состояние буфера больше не отражает истинного положения дел: счетчики семафоров не согласованы с содержимым буфера. Захватывая сначала мьютекс, мы можем уничтожить буфер раньше того, как разблокированные потоки приступят к его чтению. Уничтожая буфер, и устанавливая FBufInit в False, мы гарантируем, что разблокированные потоки вернут код ошибки, а не будут обращаться к неправильным данным.

Затем мы разблокируем оба потока, освободив оба семафора, а потом закрываем все дескрипторы синхронизации. После этого уничтожаем мьютекс, не освобождая его. Это не страшно, поскольку все действия ожидания мьютекса закончились, то мы можем быть уверены, что как поток чтения, так и поток записи в конечном счете разблокируются. Кроме того, так как имеется только по одному потоку - читателю и писателю, мы можем гарантировать, что никакие другие потоки в течение этого процесса не станут ожидать освобождения семафоров. Это означает, что одного действия по освобождению обоих семафоров было достаточно, чтобы возобновить все потоки, а поскольку мы уничтожаем дескрипторы семафоров, пока удерживаем мьютекс, то дальнейшие действия по записи или чтению обречены на неудачу, если при них будет попытка обращения к одному из семафоров.

Разрушение: тонкости остаются.

Этот код гарантированно работает только с одним потоком чтения, с одним потоком записи и одним управляющим. Почему?

Если существует более одного потока чтения или записи, то более, чем один поток может ожидать один из семафоров в любом момент времени. Следовательно, мы не могли бы активировать все ожидающие потоки-читатели или писатели, когда состояние буфера сброшено. Первой реакцией программиста на это может быть модификация подпрограммы очистки, чтобы продолжалось освобождение одного или другого семафора, пока все потоки не будут разблокированы, что можно сделать приблизительно так

Выделить всёРазвернуть кодСвернуть кодкод Pascal/Delphi

procedure TBoundedBuffer.ResetState;

{ Closes handles and deallocates memory.

Note that this must unblock threads in such a manner that they quit cleanly }

var

SemCount: integer;

begin

if FBufInit then

begin

WaitForSingleObject(FCriticalMutex, DefaultWaitTime);

FBufInit := false;

FBufSize := 0;

FreeMem(FBuf);

repeat

ReleaseSemaphore(FEntriesFree, 1, @SemCount);

until SemCount = 0;

repeat

ReleaseSemaphore(FEntriesUsed, 1, @SemCount);

until SemCount = 0;

CloseHandle(FEntriesFree);

CloseHandle(FEntriesUsed);

CloseHandle(FCriticalMutex);

end;

end;

К несчастью, этого все еще недостаточно, поскольку один из повторяющихся циклов в процедуре очистки может завершиться прямо перед тем, как другой поток приступит к действиям чтения или записи и станет ожидать семафор. Очевидно, нам нужно нечто атомарное, но мы не можем ввести действия с семафором в критическую секцию, поскольку потоки, блокирующие семафор, останутся в критической секции, и вся потоки придут к тупику.

Доступ к дескрипторам синхронизации должен быть синхронизован!

Следующая возможность - обнулить дескриптор семафора прямо перед "отключением" его, сделав нечто подобное

Выделить всёРазвернуть кодСвернуть кодкод Pascal/Delphi

procedure TBoundedBuffer.ResetState;

{ Closes handles and deallocates memory.

Note that this must unblock threads in such a manner that they quit cleanly }

var

SemCount: integer;

LocalHandle: THandle;

begin

if FBufInit then

begin

WaitForSingleObject(FCriticalMutex, DefaultWaitTime);

FBufInit := false;

FBufSize := 0;

FreeMem(FBuf);

LocalHandle := FEntriesFree;

FEntriesFree := 0;

repeat

ReleaseSemaphore(LocalHandle, 1, @SemCount);

until SemCount = 0;

CloseHandle(LocalHandle);

LocalHandle := FEntriesUsed;

FEntriesUsed := 0;

repeat

ReleaseSemaphore(LocalHandle, 1, @SemCount);

until SemCount = 0;

CloseHandle(LocalHandle);

CloseHandle(FCriticalMutex);

end;

end;

(Автор будет честен и признает, что это жалкое неполноценное решение пришло ему в голову). Однако это ничем не лучше. Вместо проблемы тупика мы получим конфликт потоков непростого типа. Этот конфликт представляет собой запись после чтения для самого дескриптора семафора! Да... Вы должны синхронизировать даже ваши объекты синхронизации! Вот что может случиться: рабочий поток читает значение дескриптора мьютекса из буферного объекта и приостанавливается, ожидая; в этот момент поток очистки, уничтожающий буфер, освобождает мьютекс необходимое число раз, и именно в этот момент рабочий поток возобновляется и обращается к мьютексу, который, как мы считаем, только что был уничтожен! Интервал, в котором это может случиться, очень небольшой, но тем не менее, это решение неприемлемо.

Управление дескрипторами в Win32.

Эта проблема достаточно сложна, так что имеет смысл рассмотреть что именно происходит, когда мы запускаем функцию Win32 для закрытия мьютекса или семафора. В частности, полезное знать вот что:


* Разблокирует ли закрытие дескриптора потоки, ожидающие данный мьютекс или семафор?


* В случае мьютексов, есть ли разница, кто владеет дескриптором при освобождении мьютекса?

Чтобы узнать это, мы можем использовать два текстовых приложения, для мьютекса

Выделить всёРазвернуть кодСвернуть кодкод Pascal/Delphi

unit HandleForm;

interface

uses

Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms, Dialogs,

StdCtrls;

type

THandleFrm = class(TForm)

CreateBtn: TButton;

CloseOwnerBtn: TButton;

CloseNonOwnerBtn: TButton;

procedure CreateBtnClick(Sender: TObject);

procedure CloseOwnerBtnClick(Sender: TObject);

procedure CloseNonOwnerBtnClick(Sender: TObject);

private

{ Private declarations }

public

{ Public declarations }

Mutex: THandle;

end;

var

HandleFrm: THandleFrm;

implementation

uses HandleThreads;

{$R *.DFM}

procedure THandleFrm.CreateBtnClick(Sender: TObject);

var

NewThread: THandleThread;

begin

Mutex := CreateMutex(nil, false, nil);

WaitForSingleObject(Mutex, INFINITE);

NewThread := THandleThread.Create(false);

NewThread := THandleThread.Create(false);

ShowMessage('Threads Created.');

end;

procedure THandleFrm.CloseOwnerBtnClick(Sender: TObject);

begin

CloseHandle(Mutex);

end;

procedure THandleFrm.CloseNonOwnerBtnClick(Sender: TObject);

begin

ReleaseMutex(Mutex);

CloseHandle(Mutex);

end;

end.

unit HandleThreads;

interface

uses

Classes, Windows, SysUtils, Dialogs;

type

THandleThread = class(TThread)

private

{ Private declarations }

protected

procedure Execute; override;

end;

implementation

uses HandleForm;

procedure THandleThread.Execute;

var

RetVal: integer;

begin

RetVal := WaitForSingleObject(HandleFrm.Mutex, INFINITE);

case RetVal of

WAIT_OBJECT_0: ShowMessage('Unblocked: WAIT_OBJECT_0');

WAIT_ABANDONED: ShowMessage('Unblocked: WAIT_ABANDONED');

WAIT_TIMEOUT: ShowMessage('Unblocked: WAIT_TIMEOUT');

else

ShowMessage('Unblocked. Unknown return code.');

end;

end;

end.

и для семафора

Выделить всёРазвернуть кодСвернуть кодкод Pascal/Delphi

unit HandleForm;

interface

uses

Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms, Dialogs,

StdCtrls;

type

THandleFrm = class(TForm)

CreateBtn: TButton;

CloseOwnerBtn: TButton;

CloseNonOwnerBtn: TButton;

RelBtn: TButton;

procedure CreateBtnClick(Sender: TObject);

procedure CloseOwnerBtnClick(Sender: TObject);

procedure CloseNonOwnerBtnClick(Sender: TObject);

procedure RelBtnClick(Sender: TObject);

private

{ Private declarations }

public

{ Public declarations }

Semaphore: THandle;

end;

var

HandleFrm: THandleFrm;

implementation

uses HandleThreads;

{$R *.DFM}

procedure THandleFrm.CreateBtnClick(Sender: TObject);

begin

Semaphore := CreateSemaphore(nil, 1, 1, nil);

WaitForSingleObject(Semaphore, INFINITE);

THandleThread.Create(false);

THandleThread.Create(false);

ShowMessage('Threads Created.');

end;

procedure THandleFrm.CloseOwnerBtnClick(Sender: TObject);

begin

CloseHandle(Semaphore);

end;

procedure THandleFrm.CloseNonOwnerBtnClick(Sender: TObject);

begin

ReleaseSemaphore(Semaphore, 1, nil);

CloseHandle(Semaphore);

end;

procedure THandleFrm.RelBtnClick(Sender: TObject);

begin

ReleaseSemaphore(Semaphore, 1, nil);

end;

end.

unit HandleThreads;

interface

uses

Classes, Windows, SysUtils, Dialogs;

type

THandleThread = class(TThread)

private

{ Private declarations }

protected

procedure Execute; override;

end;

implementation

uses HandleForm;

procedure THandleThread.Execute;

var

RetVal: integer;

begin

RetVal := WaitForSingleObject(HandleFrm.Semaphore, 10000);

case RetVal of

WAIT_OBJECT_0: ShowMessage('Unblocked: WAIT_OBJECT_0');

WAIT_ABANDONED: ShowMessage('Unblocked: WAIT_ABANDONED');

WAIT_TIMEOUT: ShowMessage('Unblocked: WAIT_TIMEOUT');

else

ShowMessage('Unblocked. Unknown return code.');

end;

end;

end.

С помощью этих программ можно определить, что при закрытии дескриптора объекта синхронизации Win32 не разблокирует потоки, ожидающие этот объект. Это, наиболее вероятно, происходит благодаря механизму подсчета ссылок, который Win32 использует, чтобы следить за дескрипторами: потоки, ожидающие объект синхронизации, могут поддерживать внутренний счетчик ссылок так, чтобы он не обнулялся, и закрывая дескриптор объекта для приложения, мы только лишаемся всякого управления этим объектом синхронизации. В нашей ситуации это серьезная проблема. В идеале при очистке хотелось бы надеяться, что попытка ожидания для закрытого дескриптора должна разблокировать потоки, ждущие освобождения данного объекта синхронизации через этот конкретный дескриптор. Это бы позволило программисту войти в критическую секцию, очистить данные в этой критической секции, затем закрыть дескриптор, таким образом разблокировав потоки, ожидающие данный объект с неким значением ошибки (возможно, WAIT_ABANDONED? (ждать, пока не исчезнет)).







© Copyright by Tregert, 2010
Сайт управляется системой uCoz