unit BoundedBuf;
{Martin Harvey 24/4/2000}
interface
uses Windows, SysUtils;
const
DefaultWaitTime = 1000; { Timeout in milliseconds (one second) passed to every WaitForSingleObject call in this unit }
type
  { Dynamic arrays are deliberately avoided: the entry table is a manually
    allocated block of Pointer-sized slots, addressed "C"-style through
    pointer arithmetic. Read TBufferEntries as ^(array of pointer). }
  TBufferEntries = ^Pointer;

  { Fixed-capacity circular buffer of untyped pointers, guarded by a mutex
    and two flow-control semaphores. }
  TBoundedBuffer = class
  private
    FBufInit: boolean;       { True while the memory and handles are set up }
    FBufSize: integer;       { Number of slots allocated in FBuf }
    FBuf: TBufferEntries;    { Base address of the slot block }
    FReadPtr: integer;       { Index of the next used entry in the buffer }
    FWritePtr: integer;      { Index of the next free entry in the buffer }
    FEntriesFree: THandle;   { Flow-control semaphore waited on by PutItem }
    FEntriesUsed: THandle;   { Flow-control semaphore waited on by GetItem }
    FCriticalMutex: THandle; { Mutex protecting the buffer state }
  protected
    procedure SetSize(NewSize: integer);
    { Reports whether the wait completed OK (true) or an error occurred (false) }
    function ControlledWait(Semaphore: THandle): boolean;
  public
    procedure ResetState;
    destructor Destroy; override;
    function PutItem(NewItem: Pointer): boolean;
    function GetItem: Pointer;
  published
    property Size: integer read FBufSize write SetSize;
  end;
{ No constructor required because default values of 0, false etc acceptable }
implementation
procedure TBoundedBuffer.SetSize(NewSize: integer);
{ Allocates the slot memory and creates the synchronisation handles for a
  buffer of NewSize entries. Requests smaller than 2 are raised to 2.
  If the buffer has been set up before, it is first torn down with
  ResetState, so anything still queued is discarded.
  If any handle cannot be created, ResetState is called again and the buffer
  is left uninitialised (FBufInit false, Size 0). }
begin
  if FBufInit then ResetState;
  if NewSize < 2 then NewSize := 2;
  FBufSize := NewSize;
  GetMem(FBuf, Sizeof(Pointer) * FBufSize);
  FillMemory(FBuf, Sizeof(Pointer) * FBufSize, 0);
  { The freshly allocated buffer is empty, so both cursors must restart at
    slot 0. Without this, indices left over from a previous (possibly
    larger) buffer could point outside the new allocation, and a read
    cursor that differs from the write cursor would return the wrong slots. }
  FReadPtr := 0;
  FWritePtr := 0;
  FBufInit := true;
  FCriticalMutex := CreateMutex(nil, false, nil); { anonymous, initially unowned }
  { All synchronisation objects are anonymous.
    FEntriesFree starts at FBufSize - 1 and FEntriesUsed starts at 0; both
    are capped at FBufSize. Presumably the cap leaves headroom for the single
    extra release of each semaphore that ResetState performs to unblock
    waiting threads. }
  FEntriesFree := CreateSemaphore(nil, FBufSize - 1, FBufSize, nil);
  FEntriesUsed := CreateSemaphore(nil, 0, FBufSize, nil);
  { Any failed creation leaves a zero handle: undo everything in that case }
  if (FCriticalMutex = 0)
    or (FEntriesFree = 0)
    or (FEntriesUsed = 0) then ResetState
end;
procedure TBoundedBuffer.ResetState;
{ Closes the handles and frees the slot memory, returning the object to its
  uninitialised state. Does nothing if the buffer is not initialised.
  Each semaphore is released once before being closed so that a thread
  blocked on it wakes up, finds FBufInit false and quits cleanly.
  After closing, every handle field is zeroed and FBuf is set to nil so that
  later calls cannot operate on a stale handle value (which the system may
  have reassigned to an unrelated object) or on freed memory. }
begin
  if not FBufInit then
    exit;
  { Best effort: the result is deliberately ignored so that teardown still
    proceeds if the mutex cannot be obtained within the timeout }
  WaitForSingleObject(FCriticalMutex, DefaultWaitTime);
  FBufInit := false;
  FBufSize := 0;
  FreeMem(FBuf);
  FBuf := nil;
  { Wake one waiter on each semaphore before the handles go away }
  ReleaseSemaphore(FEntriesUsed, 1, nil);
  ReleaseSemaphore(FEntriesFree, 1, nil);
  { A handle may be zero here when SetSize failed to create it; skip those }
  if FEntriesFree <> 0 then
  begin
    CloseHandle(FEntriesFree);
    FEntriesFree := 0;
  end;
  if FEntriesUsed <> 0 then
  begin
    CloseHandle(FEntriesUsed);
    FEntriesUsed := 0;
  end;
  if FCriticalMutex <> 0 then
  begin
    ReleaseMutex(FCriticalMutex);
    CloseHandle(FCriticalMutex);
    FCriticalMutex := 0;
  end;
end;
function TBoundedBuffer.ControlledWait(Semaphore: THandle): boolean;
{ Waits on Semaphore in DefaultWaitTime slices until it is signalled or the
  buffer is found to be uninitialised.
  Returns true only when the wait on Semaphore succeeded (WAIT_OBJECT_0).
  Returns false when the wait was abandoned or failed, when the mutex could
  not be obtained, or when a timeout occurred and FBufInit is false. }
var
  WaitResult: DWORD; { WaitForSingleObject returns a DWORD; WAIT_FAILED does
                       not fit in a signed integer }
begin
  repeat
    WaitResult := WaitForSingleObject(Semaphore, DefaultWaitTime);
    if WaitResult <> WAIT_TIMEOUT then
    begin
      { Signalled: success. Abandoned or failed (e.g. invalid handle):
        report failure straight away rather than retrying, since a failed
        wait returns immediately and retrying would spin. }
      result := WaitResult = WAIT_OBJECT_0;
      exit;
    end;
    { Wait timed out. Check under the mutex whether the buffer is still
      initialised; if it is, go round and wait again. }
    if WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0 then
    begin
      result := false;
      exit;
    end;
    result := FBufInit;
    ReleaseMutex(FCriticalMutex);
  until not result;
end;
function TBoundedBuffer.PutItem(NewItem: Pointer): boolean;
{ Called by the producer thread.
  Waits for a free entry, stores NewItem at the write position, advances the
  write position (wrapping at FBufSize) and signals FEntriesUsed.
  Returns true if the item was stored. Returns false if either wait did not
  succeed or the buffer is not initialised. }
var
  Slot: TBufferEntries;
begin
  result := false;
  { WAIT(EntriesFree) }
  if not ControlledWait(FEntriesFree) then
    exit;
  if WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0 then
    exit;
  { The mutex is owned from here on; the finally clause guarantees it is
    released on every path, including the early exit when the buffer has
    been reset while this thread was waiting. }
  try
    if not FBufInit then
      exit;
    Slot := FBuf;
    Inc(Slot, FWritePtr); { typed pointer arithmetic: advances by whole slots }
    Slot^ := NewItem;
    FWritePtr := (FWritePtr + 1) mod FBufSize;
  finally
    ReleaseMutex(FCriticalMutex);
  end;
  { SIGNAL(EntriesUsed) }
  ReleaseSemaphore(FEntriesUsed, 1, nil);
  result := true;
end;
function TBoundedBuffer.GetItem: Pointer;
{ Called by the consumer thread.
  Waits for a used entry, reads the pointer at the read position, advances
  the read position (wrapping at FBufSize) and signals FEntriesFree.
  Returns the stored pointer. Returns nil if either wait did not succeed or
  the buffer is not initialised, so a stored nil cannot be told apart from
  failure by the return value alone. }
var
  Slot: TBufferEntries;
begin
  result := nil;
  { WAIT(EntriesUsed) }
  if not ControlledWait(FEntriesUsed) then
    exit;
  if WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0 then
    exit;
  { The mutex is owned from here on; the finally clause guarantees it is
    released on every path, including the early exit when the buffer has
    been reset while this thread was waiting. }
  try
    if not FBufInit then
      exit;
    Slot := FBuf;
    Inc(Slot, FReadPtr); { typed pointer arithmetic: advances by whole slots }
    result := Slot^;
    FReadPtr := (FReadPtr + 1) mod FBufSize;
  finally
    ReleaseMutex(FCriticalMutex);
  end;
  { SIGNAL(EntriesFree) }
  ReleaseSemaphore(FEntriesFree, 1, nil);
end;
destructor TBoundedBuffer.Destroy;
{ Tears the buffer down through ResetState (a no-op when it was never
  initialised) before handing over to the ancestor destructor. }
begin
  ResetState;
  inherited;
end;
end.