unit BoundedBuf;

{Martin Harvey 24/4/2000}

interface

uses Windows, SysUtils;

const
  DefaultWaitTime = 1000; { One second wait on all synchronisation primitives }

type
  { I don't particularly like dynamic arrays, so I'm going to do things
    the "C" way here, explicitly allocating memory
    Think of TBufferEntries as ^(array of pointer) }

  TBufferEntries = ^Pointer;

  TBoundedBuffer = class
  private
    FBufInit: boolean;
    FBufSize: integer;
    FBuf: TBufferEntries;
    FReadPtr, { ReadPtr points to next used entry in buffer}
    FWritePtr: integer; { WritePtr points to next free entry in buffer}
    FEntriesFree, FEntriesUsed: THandle; { Flow control semaphores }
    FCriticalMutex: THandle; { Critical section mutex }
    FEntryCountFree, FEntryCountUsed: integer; { Used for peeking operations }
  protected
    procedure SetSize(NewSize: integer);
    function ControlledWait(Semaphore: THandle): boolean;
    { Returns whether wait returned OK, or an error occurred }
  public
    procedure ResetState;
    destructor Destroy; override;
    function PutItem(NewItem: Pointer): boolean;
    function GetItem: Pointer;
    { New peeking operations. Note that we can't use simple properties, since
      we have to communicate success or failure of the operation, in addition
      to providing a result }
    function GetEntriesFree(var Free: integer): boolean;
    function GetEntriesUsed(var Used: integer): boolean;
  published
    property Size: integer read FBufSize write SetSize;
  end;

  { No constructor required because default values of 0, false etc acceptable }

implementation

procedure TBoundedBuffer.SetSize(NewSize: integer);

{ Initialises handles and allocates memory.
  If the buffer size has previously been set, then this may invoke a buffer
  reset }

begin
  if FBufInit then ResetState;
  if NewSize < 2 then NewSize := 2;
  FBufSize := NewSize;
  GetMem(FBuf, Sizeof(Pointer) * FBufSize);
  FillMemory(FBuf, Sizeof(Pointer) * FBufSize, 0);
  FCriticalMutex := CreateMutex(nil, false, nil); { note lack of name }
  WaitForSingleObject(FCriticalMutex, INFINITE);
  FBufInit := true;
  { The initial count on the semaphores requires some thought,
    The maximum count requires more thought.
    Again, all synchronisation objects are anonymous }
  FEntriesFree := CreateSemaphore(nil, FBufSize - 1, FBufSize, nil);
  FEntriesUsed := CreateSemaphore(nil, 0, FBufSize, nil);
  FEntryCountFree := FBufSize - 1;
  FEntryCountUsed := 0;
  ReleaseMutex(FCriticalMutex);
  if (FCriticalMutex = 0)
    or (FEntriesFree = 0)
    or (FEntriesUsed = 0) then ResetState
end;

procedure TBoundedBuffer.ResetState;

{ Closes handles and deallocates memory.
  Note that this must unblock threads in such a manner that they quit cleanly }

begin
  if FBufInit then
  begin
    WaitForSingleObject(FCriticalMutex, DefaultWaitTime);
    FBufInit := false;
    FBufSize := 0;
    FreeMem(FBuf);
    ReleaseSemaphore(FEntriesUsed, 1, nil);
    ReleaseSemaphore(FEntriesFree, 1, nil);
    CloseHandle(FEntriesFree);
    CloseHandle(FEntriesUsed);
    ReleaseMutex(FCriticalMutex);
    CloseHandle(FCriticalMutex);
  end;
end;

function TBoundedBuffer.ControlledWait(Semaphore: THandle): boolean;

var
  ErrCode: integer;

begin
  repeat
    ErrCode := WaitForSingleObject(Semaphore, DefaultWaitTime);
    if (ErrCode = WAIT_OBJECT_0) or (ErrCode = WAIT_ABANDONED) then
    begin
      { If wait abandoned, return failure. Buffer not properly cleaned up }
      result := ErrCode = WAIT_OBJECT_0;
      exit;
    end;
    { Wait timed out. Check whether buffer state initialised }
    if WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0 then
    begin
      result := false;
      exit;
    end
    else
    begin
      result := FBufInit;
      ReleaseMutex(FCriticalMutex);
    end;
  until not Result;
end;

function TBoundedBuffer.PutItem(NewItem: Pointer): boolean;

{ Called by producer thread }
var
  NthItem: TBufferEntries;

begin
  result := false;
  { WAIT(EntriesFree) }
  if not ControlledWait(FEntriesFree) then
    exit;
  if (WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0)
    or not FBufInit then { NB.This condition depends on L -> R lazy evaluation }
    exit;
  NthItem := FBuf;
  Inc(NthItem, FWritePtr);
  NthItem^ := NewItem;
  FWritePtr := (FWritePtr + 1) mod FBufSize;
  Inc(FEntryCountUsed);
  Dec(FEntryCountFree);
  ReleaseMutex(FCriticalMutex);
  { SIGNAL(EntriesUsed) }
  ReleaseSemaphore(FEntriesUsed, 1, nil);
  result := true;
end;

function TBoundedBuffer.GetItem: Pointer;

{ Called by consumer thread }
var
  NthItem: TBufferEntries;

begin
  result := nil;
  { WAIT(EntriesUsed) }
  if not ControlledWait(FEntriesUsed) then
    exit;
  if (WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0)
    or not FBufInit then { NB.This condition depends on L -> R lazy evaluation }
    exit;
  NthItem := FBuf;
  Inc(NthItem, FReadPtr);
  Result := NthItem^;
  FReadPtr := (FReadPtr + 1) mod FBufSize;
  Inc(FEntryCountFree);
  Dec(FEntryCountUsed);
  ReleaseMutex(FCriticalMutex);
  { SIGNAL(EntriesFree) }
  ReleaseSemaphore(FEntriesFree, 1, nil);
end;

destructor TBoundedBuffer.Destroy;
begin
  ResetState;
  inherited Destroy;
end;

function TBoundedBuffer.GetEntriesFree(var Free: integer): boolean;
begin
  result := false;
  if (WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0)
    or not FBufInit then
    exit;
  Free := FEntryCountFree;
  result := true;
  ReleaseMutex(FCriticalMutex);
end;

function TBoundedBuffer.GetEntriesUsed(var Used: integer): boolean;
begin
  result := false;
  if (WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0)
    or not FBufInit then
    exit;
  Used := FEntryCountUsed;
  result := true;
  ReleaseMutex(FCriticalMutex);
end;

end.