unit BoundedBuf; {Martin Harvey 24/4/2000} interface uses Windows, SysUtils; const DefaultWaitTime = 1000; { One second wait on all synchronisation primitives } type { I don't particularly like dynamic arrays, so I'm going to do things the "C" way here, explicitly allocating memory Think of TBufferEntries as ^(array of pointer) } TBufferEntries = ^Pointer; TBoundedBuffer = class private FBufInit: boolean; FBufSize: integer; FBuf: TBufferEntries; FReadPtr, { ReadPtr points to next used entry in buffer} FWritePtr: integer; { WritePtr points to next free entry in buffer} FEntriesFree, FEntriesUsed: THandle; { Flow control semaphores } FCriticalMutex: THandle; { Critical section mutex } FEntryCountFree, FEntryCountUsed: integer; { Used for peeking operations } protected procedure SetSize(NewSize: integer); function ControlledWait(Semaphore: THandle): boolean; { Returns whether wait returned OK, or an error occurred } public procedure ResetState; destructor Destroy; override; function PutItem(NewItem: Pointer): boolean; function GetItem: Pointer; { New peeking operations. Note that we can't use simple properties, since we have to communicate success or failure of the operation, in addition to providing a result } function GetEntriesFree(var Free: integer): boolean; function GetEntriesUsed(var Used: integer): boolean; published property Size: integer read FBufSize write SetSize; end; { No constructor required because default values of 0, false etc acceptable } implementation procedure TBoundedBuffer.SetSize(NewSize: integer); { Initialises handles and allocates memory. If the buffer size has previously been set, then this may invoke a buffer reset } begin if FBufInit then ResetState; if NewSize < 2 then NewSize := 2; FBufSize := NewSize; GetMem(FBuf, Sizeof(Pointer) * FBufSize); FillMemory(FBuf, Sizeof(Pointer) * FBufSize, 0); FCriticalMutex := CreateMutex(nil, false, nil); { note lack of name } WaitForSingleObject(FCriticalMutex, INFINITE); FBufInit := true; { The initial count on the semaphores requires some thought, The maximum count requires more thought. Again, all synchronisation objects are anonymous } FEntriesFree := CreateSemaphore(nil, FBufSize - 1, FBufSize, nil); FEntriesUsed := CreateSemaphore(nil, 0, FBufSize, nil); FEntryCountFree := FBufSize - 1; FEntryCountUsed := 0; ReleaseMutex(FCriticalMutex); if (FCriticalMutex = 0) or (FEntriesFree = 0) or (FEntriesUsed = 0) then ResetState end; procedure TBoundedBuffer.ResetState; { Closes handles and deallocates memory. Note that this must unblock threads in such a manner that they quit cleanly } begin if FBufInit then begin WaitForSingleObject(FCriticalMutex, DefaultWaitTime); FBufInit := false; FBufSize := 0; FreeMem(FBuf); ReleaseSemaphore(FEntriesUsed, 1, nil); ReleaseSemaphore(FEntriesFree, 1, nil); CloseHandle(FEntriesFree); CloseHandle(FEntriesUsed); ReleaseMutex(FCriticalMutex); CloseHandle(FCriticalMutex); end; end; function TBoundedBuffer.ControlledWait(Semaphore: THandle): boolean; var ErrCode: integer; begin repeat ErrCode := WaitForSingleObject(Semaphore, DefaultWaitTime); if (ErrCode = WAIT_OBJECT_0) or (ErrCode = WAIT_ABANDONED) then begin { If wait abandoned, return failure. Buffer not properly cleaned up } result := ErrCode = WAIT_OBJECT_0; exit; end; { Wait timed out. Check whether buffer state initialised } if WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0 then begin result := false; exit; end else begin result := FBufInit; ReleaseMutex(FCriticalMutex); end; until not Result; end; function TBoundedBuffer.PutItem(NewItem: Pointer): boolean; { Called by producer thread } var NthItem: TBufferEntries; begin result := false; { WAIT(EntriesFree) } if not ControlledWait(FEntriesFree) then exit; if (WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0) or not FBufInit then { NB.This condition depends on L -> R lazy evaluation } exit; NthItem := FBuf; Inc(NthItem, FWritePtr); NthItem^ := NewItem; FWritePtr := (FWritePtr + 1) mod FBufSize; Inc(FEntryCountUsed); Dec(FEntryCountFree); ReleaseMutex(FCriticalMutex); { SIGNAL(EntriesUsed) } ReleaseSemaphore(FEntriesUsed, 1, nil); result := true; end; function TBoundedBuffer.GetItem: Pointer; { Called by consumer thread } var NthItem: TBufferEntries; begin result := nil; { WAIT(EntriesUsed) } if not ControlledWait(FEntriesUsed) then exit; if (WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0) or not FBufInit then { NB.This condition depends on L -> R lazy evaluation } exit; NthItem := FBuf; Inc(NthItem, FReadPtr); Result := NthItem^; FReadPtr := (FReadPtr + 1) mod FBufSize; Inc(FEntryCountFree); Dec(FEntryCountUsed); ReleaseMutex(FCriticalMutex); { SIGNAL(EntriesFree) } ReleaseSemaphore(FEntriesFree, 1, nil); end; destructor TBoundedBuffer.Destroy; begin ResetState; inherited Destroy; end; function TBoundedBuffer.GetEntriesFree(var Free: integer): boolean; begin result := false; if (WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0) or not FBufInit then exit; Free := FEntryCountFree; result := true; ReleaseMutex(FCriticalMutex); end; function TBoundedBuffer.GetEntriesUsed(var Used: integer): boolean; begin result := false; if (WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0) or not FBufInit then exit; Used := FEntryCountUsed; result := true; ReleaseMutex(FCriticalMutex); end; end.