unit BoundedBuf;
{Martin Harvey 24/4/2000}
interface
uses Windows, SysUtils;
const
{ Timeout, in milliseconds, passed to WaitForSingleObject whenever this unit
waits on the critical-section mutex. The semaphore waits in PutItem and
GetItem do not use it; they wait with INFINITE. }
DefaultWaitTime = 5000; { Five second wait on mutexes }
type
{ I don't particularly like dynamic arrays, so I'm going to do things
the "C" way here, explicitly allocating memory
Think of TBufferEntries as ^(array of pointer) }
TBufferEntries = ^Pointer;
{ Fixed-capacity queue of untyped pointers, intended to be shared between a
producer thread (PutItem) and a consumer thread (GetItem).
Flow control is done with two Win32 semaphores and the read/write indices
are guarded by a Win32 mutex.
The buffer is unusable until Size has been assigned: SetSize is the routine
that allocates the storage and creates the synchronisation handles. }
TBoundedBuffer = class
private
{ Set to true by SetSize, cleared by ResetState }
FBufInit: boolean;
{ Number of pointer slots allocated in FBuf; set to 0 by ResetState }
FBufSize: integer;
{ Start of the slot array allocated by SetSize and freed by ResetState }
FBuf: TBufferEntries;
FReadPtr, { Index of the next used entry in the buffer; advanced by GetItem }
FWritePtr: integer; { Index of the next free entry in the buffer; advanced by PutItem }
{ Flow control semaphores.
FEntriesFree is waited on by PutItem and signalled by GetItem; SetSize
creates it with an initial count of FBufSize - 1.
FEntriesUsed is waited on by GetItem and signalled by PutItem; SetSize
creates it with an initial count of 0. }
FEntriesFree, FEntriesUsed: THandle; { Flow control semaphores }
{ Mutex held while the indices and slot contents are read or written }
FCriticalMutex: THandle; { Critical section mutex }
protected
{ Write accessor for Size. Resets any existing state, clamps NewSize to a
minimum of 2, then allocates the buffer and creates the handles. }
procedure SetSize(NewSize: integer);
public
{ Frees the buffer and closes the handles if the buffer is initialised;
does nothing otherwise. }
procedure ResetState;
{ Calls ResetState before the inherited destructor. }
destructor Destroy; override;
{ Stores NewItem in the next free slot. Returns true on success and false
if either wait fails or the buffer is not initialised. }
function PutItem(NewItem: Pointer): boolean;
{ Returns the pointer in the next used slot, or nil if either wait fails or
the buffer is not initialised. }
function GetItem: Pointer;
published
{ NOTE(review): the class is not declared with the M+ directive and descends
directly from TObject, so it is unclear whether published RTTI is really
wanted here or whether public was intended - confirm. }
property Size: integer read FBufSize write SetSize;
end;
{ No constructor required because default values of 0, false etc acceptable }
implementation
const
{ Message texts describing the two failure categories of the buffer.
NOTE(review): neither constant is referenced by any routine in this unit -
confirm whether they are still needed or were meant to be raised/logged. }
FailMsg1 = 'Flow control failed, or buffer not initialised';
FailMsg2 = 'Critical section failed, or buffer not initialised';
procedure TBoundedBuffer.SetSize(NewSize: integer);
{ Write accessor for the Size property.
  Allocates the slot array and creates the mutex and both semaphores.
  If the buffer has already been initialised, the existing state is torn
  down first via ResetState, so any queued items are discarded.
  NewSize values below 2 are silently raised to 2.
  If any handle cannot be created, everything is released again through
  ResetState and the buffer is left uninitialised (Size reads back as 0). }
begin
  if FBufInit then ResetState;
  if NewSize < 2 then NewSize := 2;
  FBufSize := NewSize;
  { The indices must restart at zero for the new allocation. Without this,
    indices left over from a previous, larger buffer would make the next
    PutItem or GetItem address memory beyond the end of the new array. }
  FReadPtr := 0;
  FWritePtr := 0;
  GetMem(FBuf, Sizeof(Pointer) * FBufSize);
  FillMemory(FBuf, Sizeof(Pointer) * FBufSize, 0);
  FBufInit := true;
  FCriticalMutex := CreateMutex(nil, false, nil); { anonymous, initially unowned }
  { Both semaphores are anonymous and have a maximum count of FBufSize.
    FEntriesFree starts at FBufSize - 1, so the producer can never fill the
    very last slot; FEntriesUsed starts at 0 because nothing is queued yet. }
  FEntriesFree := CreateSemaphore(nil, FBufSize - 1, FBufSize, nil);
  FEntriesUsed := CreateSemaphore(nil, 0, FBufSize, nil);
  { A zero handle means creation failed: undo the partial initialisation }
  if (FCriticalMutex = 0)
    or (FEntriesFree = 0)
    or (FEntriesUsed = 0) then ResetState
end;
procedure TBoundedBuffer.ResetState;
{ Frees the slot array and closes all three handles.
  Does nothing when the buffer is not initialised.
  The teardown is written so that blocked threads get a chance to leave
  PutItem / GetItem cleanly. It assumes at most one producer and one
  consumer, since each semaphore is released only once.
  After the call every field is back at its freshly-constructed value, so no
  dangling pointer, stale index or closed handle value is left behind. }
begin
  if not FBufInit then
    exit;
  { Best effort: try to own the mutex so nobody is inside the critical
    section while the buffer disappears. The result is deliberately not
    checked; teardown proceeds even if the wait times out. }
  WaitForSingleObject(FCriticalMutex, DefaultWaitTime);
  FBufInit := false;
  FBufSize := 0;
  FreeMem(FBuf);
  FBuf := nil; { do not leave a dangling pointer to the freed array }
  FReadPtr := 0;
  FWritePtr := 0;
  { Wake a producer and a consumer that may be blocked on flow control.
    They will then either fail to get the mutex or see FBufInit = false. }
  ReleaseSemaphore(FEntriesFree, 1, nil);
  ReleaseSemaphore(FEntriesUsed, 1, nil);
  { Handle fields are zeroed after closing: a closed handle value may be
    recycled by the system for an unrelated object, whereas waiting on or
    releasing handle 0 simply fails. }
  CloseHandle(FEntriesFree);
  FEntriesFree := 0;
  CloseHandle(FEntriesUsed);
  FEntriesUsed := 0;
  { Threads that were woken above may already be waiting on the mutex.
    The mutex is intentionally never released here; the handle is closed
    and those threads are left to time out. }
  CloseHandle(FCriticalMutex);
  FCriticalMutex := 0;
end;
function TBoundedBuffer.PutItem(NewItem: Pointer): boolean;
{ Called by the producer thread.
  Blocks until a slot is free, stores NewItem in it and signals the consumer.
  Returns true when the item was stored.
  Returns false when the flow-control wait fails, when the mutex cannot be
  acquired within DefaultWaitTime, or when the buffer is found to be
  uninitialised after the mutex has been acquired. }
var
  Slot: TBufferEntries;
begin
  Result := false;
  { WAIT(EntriesFree) }
  if WaitForSingleObject(FEntriesFree, INFINITE) <> WAIT_OBJECT_0 then
    exit;
  if WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0 then
    exit;
  { From here on this thread owns the mutex. The finally clause guarantees it
    is released on every path, including the early exit taken when the
    buffer has been reset in the meantime. }
  try
    if not FBufInit then
      exit;
    Slot := FBuf;
    Inc(Slot, FWritePtr); { typed pointer arithmetic: advance FWritePtr slots }
    Slot^ := NewItem;
    FWritePtr := (FWritePtr + 1) mod FBufSize;
  finally
    ReleaseMutex(FCriticalMutex);
  end;
  { SIGNAL(EntriesUsed) }
  ReleaseSemaphore(FEntriesUsed, 1, nil);
  Result := true;
end;
function TBoundedBuffer.GetItem: Pointer;
{ Called by the consumer thread.
  Blocks until an item is available, removes it from the buffer and signals
  the producer that a slot has become free.
  Returns the stored pointer on success.
  Returns nil when the flow-control wait fails, when the mutex cannot be
  acquired within DefaultWaitTime, or when the buffer is found to be
  uninitialised after the mutex has been acquired. A nil result is therefore
  indistinguishable from a nil item that was queued deliberately. }
var
  Slot: TBufferEntries;
begin
  Result := nil;
  { WAIT(EntriesUsed) }
  if WaitForSingleObject(FEntriesUsed, INFINITE) <> WAIT_OBJECT_0 then
    exit;
  if WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0 then
    exit;
  { From here on this thread owns the mutex. The finally clause guarantees it
    is released on every path, including the early exit taken when the
    buffer has been reset in the meantime. }
  try
    if not FBufInit then
      exit;
    Slot := FBuf;
    Inc(Slot, FReadPtr); { typed pointer arithmetic: advance FReadPtr slots }
    Result := Slot^;
    FReadPtr := (FReadPtr + 1) mod FBufSize;
  finally
    ReleaseMutex(FCriticalMutex);
  end;
  { SIGNAL(EntriesFree) }
  ReleaseSemaphore(FEntriesFree, 1, nil);
end;
destructor TBoundedBuffer.Destroy;
{ Tears the buffer down before the object itself goes away.
  ResetState is a no-op when the buffer was never sized, so destroying an
  unused instance is safe. }
begin
  ResetState;
  inherited;
end;
end.