unit BlockToAsyncBuf;

{ Martin Harvey 10/5/2000 }

interface

uses Classes, Forms, Messages, Windows, BiDirBuf;

const
  { Value assigned to the Size property of the internal TBiDirBuf when a
    TBlockToAsyncBuf is constructed. }
  InternalBufferSize = 4;
  { Private window message posted by the worker-thread callbacks to the
    window obtained from AllocateHWnd. LParam carries a TThreadNotify value
    identifying which thread raised the notification. }
  WM_BLOCK_ASYNC = WM_USER + 2876;

type

  { With this component, as with previous buffering schemes, one cannot read
    or write nil pointers. }

  { Identifies the origin of a WM_BLOCK_ASYNC message. The ordinal value is
    sent as the message's LParam and decoded again in
    TBlockToAsyncBuf.MessageHandler: tnReaderDataFlow leads to Read,
    tnWriterDataFlow leads to Write. }
  TThreadNotify = (tnReaderDataFlow, tnWriterDataFlow);

  { Common base for the two worker threads. It owns a one-item hand-over
    slot (FInterimBuf), the critical section that guards that slot, and a
    semaphore on which the worker thread sleeps until the other side has
    acted on the slot. }
  TBlockAsyncThread = class(TThread)
  private
    { Guards every access to FInterimBuf. }
    FDataSection: TRTLCriticalSection;
    { Created with an initial count of zero; the Execute loops of the
      descendants wait on it, and WriteItem / ReadItem / Destroy release it. }
    FIdleSemaphore: THandle;
    { Single-item hand-over slot; nil means "empty". }
    FInterimBuf: Pointer;
    { Handler invoked by DataFlow. }
    FOnDataFlow: TNotifyEvent;
    { Buffer the descendants call GetItem / PutItem on (side A). Not owned by
      the thread: it is only assigned through the Buffer property and never
      freed here. }
    FBuffer: TBiDirBuf;
  protected
    { Fires OnDataFlow if a handler is assigned. It is called from the
      descendants' Execute methods, i.e. on the worker thread. }
    procedure DataFlow; virtual;
    { Returns 1 when FInterimBuf holds an item, otherwise 0. }
    function GetItemsInTransit: integer;
  public
    { Creates the thread together with its critical section and semaphore. }
    constructor Create(CreateSuspended: boolean);
    { Releases the semaphore, waits for the thread to finish and then
      destroys the synchronisation objects. }
    destructor Destroy; override;
  published
    { Notification raised from the worker thread via DataFlow. }
    property OnDataFlow: TNotifyEvent read FOnDataFlow write FOnDataFlow;
    { Write-only: the buffer this thread operates on. }
    property Buffer: TBiDirBuf write FBuffer;
    { 0 or 1, depending on whether the hand-over slot is occupied. }
    property ItemsInTransit: integer read GetItemsInTransit;
  end;

  { Worker thread that moves items from the hand-over slot into the buffer
    by calling PutItem on side A. }
  TBAWriterThread = class(TBlockAsyncThread)
  private
  protected
    { Loop: raise DataFlow, wait on the idle semaphore, take whatever is in
      the slot and pass it to the buffer's PutItem. The thread terminates
      itself when PutItem returns false. }
    procedure Execute; override;
  public
    { Stores Item in the slot if the slot is empty and wakes the thread.
      Returns false, leaving the slot untouched, when it is still occupied. }
    function WriteItem(Item: Pointer): boolean;
  published
  end;

  { Worker thread that fetches items from the buffer by calling GetItem on
    side A and parks each one in the hand-over slot. }
  TBAReaderThread = class(TBlockAsyncThread)
  private
  protected
    { Loop: call the buffer's GetItem, store the result in the slot, raise
      DataFlow and wait on the idle semaphore. The thread terminates itself
      when GetItem returns nil. }
    procedure Execute; override;
  public
    { Returns the content of the slot (nil when it is empty). A non-nil
      result also releases the idle semaphore so the thread can continue. }
    function ReadItem: pointer;
  published
  end;

  { Component that couples a blocking interface (BlockingRead /
    BlockingWrite, side B of the internal TBiDirBuf) with a non-blocking,
    event-driven interface (AsyncRead / AsyncWrite plus the OnRead / OnWrite
    events). Two worker threads operate on side A of the buffer and notify
    the component by posting WM_BLOCK_ASYNC to a window allocated with
    AllocateHWnd. }
  TBlockToAsyncBuf = class(TComponent)
  private
    { Window handle returned by AllocateHWnd; target of WM_BLOCK_ASYNC. }
    FHWND: THandle;
    { Internal bidirectional buffer, created and freed by this component. }
    FBuffer: TBiDirBuf;
    { Worker threads; both are set to nil by ResetState. }
    FReaderThread: TBAReaderThread;
    FWriterThread: TBAWriterThread;
    FOnRead, FOnWrite: TNotifyEvent;
  protected
    { Window procedure passed to AllocateHWnd; dispatches WM_BLOCK_ASYNC to
      Read or Write according to the TThreadNotify value in LParam. }
    procedure MessageHandler(var Msg: TMessage);
    { OnDataFlow handlers installed on the worker threads. They run on the
      worker thread and only post a message to FHWND. }
    procedure ReaderDataFlow(Sender: TObject);
    procedure WriterDataFlow(Sender: TObject);
    { Fire OnRead / OnWrite if a handler is assigned. }
    procedure Read; virtual;
    procedure Write; virtual;
    { Sums the in-transit counts of both threads and the entries used on
      both sides of the buffer. }
    function GetItemsInTransit: integer;
  public
    { Allocates the window, the buffer and both threads, then starts the
      threads. }
    constructor Create(AOwner: TComponent); override;
    { Stops the threads via ResetState, then frees the buffer and window. }
    destructor Destroy; override;
    { Forward to GetItem / PutItem on side B of the buffer. }
    function BlockingRead: pointer;
    function BlockingWrite(Item: pointer): boolean;
    { Forward to the reader thread's ReadItem / writer thread's WriteItem. }
    function AsyncRead: pointer;
    function AsyncWrite(Item: pointer): boolean;
    { Terminates and frees both worker threads and resets the buffer. The
      threads are not recreated. }
    procedure ResetState;
  published
    { Raised (via a posted window message) after the reader thread has
      placed an item in its hand-over slot. }
    property OnRead: TNotifyEvent read FOnRead write FOnRead;
    { Raised (via a posted window message) at the start of each writer
      thread loop iteration. }
    property OnWrite: TNotifyEvent read FOnWrite write FOnWrite;
    property ItemsInTransit: integer read GetItemsInTransit;
  end;

implementation

{ Raises the OnDataFlow notification, passing the thread object as Sender.
  Does nothing when no handler has been assigned. }
procedure TBlockAsyncThread.DataFlow;
begin
  if not Assigned(FOnDataFlow) then
    Exit;
  FOnDataFlow(Self);
end;

{ Creates the worker thread together with the critical section that guards
  the hand-over slot and the semaphore (initial count 0) the thread sleeps on.

  CreateSuspended is passed unchanged to TThread.Create. }
constructor TBlockAsyncThread.Create(CreateSuspended: boolean);
begin
  { The synchronisation objects are set up BEFORE the inherited constructor
    runs. On RTL versions where TThread.Create(False) starts the thread from
    inside the constructor, Execute could otherwise touch FDataSection and
    FIdleSemaphore before they exist. The instance memory is already
    allocated and zero-filled at this point, so accessing the fields here is
    safe. }
  InitializeCriticalSection(FDataSection);
  FIdleSemaphore := CreateSemaphore(nil, 0, High(Integer), nil);
  inherited Create(CreateSuspended);
end;

{ Stops the worker thread and releases everything the object owns: the
  critical section, the semaphore and, through the inherited destructor,
  the resources held by TThread itself. }
destructor TBlockAsyncThread.Destroy;
begin
  { Set the terminate flag before waking the thread so that its Execute loop
    cannot begin another iteration and go back to sleep. Calling Terminate
    again is harmless if the owner has already done so. }
  Terminate;
  { Wake the thread in case it is sleeping on the idle semaphore. }
  ReleaseSemaphore(FIdleSemaphore, 1, nil);
  { The synchronisation objects may only be destroyed once the thread can no
    longer touch them. }
  WaitFor;
  DeleteCriticalSection(FDataSection);
  CloseHandle(FIdleSemaphore);
  { Required: without this call TThread's own destructor never runs and the
    thread's handle and bookkeeping are leaked. }
  inherited Destroy;
end;

{ Reports how many items are parked in the hand-over slot: 1 when the slot
  is occupied, 0 when it is empty. The slot is inspected while holding the
  data critical section. }
function TBlockAsyncThread.GetItemsInTransit: integer;
begin
  EnterCriticalSection(FDataSection);
  { Ord(False) = 0, Ord(True) = 1 }
  Result := Ord(FInterimBuf <> nil);
  LeaveCriticalSection(FDataSection);
end;

{ Buffer error handling needs to be discussed }

{ Writer thread body. Each iteration announces itself through DataFlow,
  sleeps until the idle semaphore is released, empties the hand-over slot
  and hands its content to the buffer's PutItem on side A. The thread
  terminates itself as soon as PutItem reports failure. }
procedure TBAWriterThread.Execute;

var
  Pending: Pointer;
  Accepted: Boolean;

begin
  while not Terminated do
  begin
    DataFlow;
    WaitForSingleObject(FIdleSemaphore, INFINITE);

    { Take ownership of the slot content and mark the slot empty in one
      protected step. }
    EnterCriticalSection(FDataSection);
    Pending := FInterimBuf;
    FInterimBuf := nil;
    LeaveCriticalSection(FDataSection);

    Accepted := FBuffer.PutItem(bsSideA, Pending);
    if not Accepted then
      Terminate;
  end;
end;

{ Offers Item to the writer thread.

  Returns true when the hand-over slot was empty: Item is stored there and
  the idle semaphore is released so the thread wakes up. Returns false, and
  changes nothing, when the slot is still occupied. }
function TBAWriterThread.WriteItem(Item: Pointer): boolean;
begin
  EnterCriticalSection(FDataSection);
  Result := FInterimBuf = nil;
  if Result then
    FInterimBuf := Item;
  LeaveCriticalSection(FDataSection);

  { Signal outside the critical section, and only when something was stored. }
  if Result then
    ReleaseSemaphore(FIdleSemaphore, 1, nil);
end;

{ Reader thread body. Each iteration fetches an item from side A of the
  buffer. A nil result ends the thread; otherwise the item is parked in the
  hand-over slot, DataFlow is raised, and the thread sleeps on the idle
  semaphore until it is released. }
procedure TBAReaderThread.Execute;

var
  Fetched: Pointer;

begin
  while not Terminated do
  begin
    Fetched := FBuffer.GetItem(bsSideA);
    if Fetched = nil then
      Terminate
    else
    begin
      EnterCriticalSection(FDataSection);
      FInterimBuf := Fetched;
      LeaveCriticalSection(FDataSection);

      DataFlow;
      WaitForSingleObject(FIdleSemaphore, INFINITE);
    end;
  end;
end;

{ Takes the item currently parked in the hand-over slot.

  Returns the item, or nil when the slot is empty. When an item is returned
  the slot is emptied and the idle semaphore is released so the reader
  thread can fetch the next item from the buffer. }
function TBAReaderThread.ReadItem: pointer;
begin
  EnterCriticalSection(FDataSection);
  result := FInterimBuf;
  { Empty the slot so the same item is not handed out twice, the semaphore
    is not released more than once per item, and ItemsInTransit drops back
    to 0. This mirrors how the writer thread drains its slot. }
  FInterimBuf := nil;
  LeaveCriticalSection(FDataSection);
  if Assigned(Result) then ReleaseSemaphore(FIdleSemaphore, 1, nil);
end;

{ Window procedure for the window allocated with AllocateHWnd.

  WM_BLOCK_ASYNC is dispatched to Read or Write according to the
  TThreadNotify value carried in LParam. Every other message is handed to
  DefWindowProc so that it receives the default processing and Msg.Result
  is filled in properly. }
procedure TBlockToAsyncBuf.MessageHandler(var Msg: TMessage);
begin
  if (Msg.Msg = WM_BLOCK_ASYNC) then
  begin
    case TThreadNotify(Msg.LParam) of
      tnReaderDataflow: Read;
      tnWriterDataflow: Write;
    else
      Assert(false);
    end;
  end
  else
    { Do not swallow messages we do not handle: leaving Result at 0 changes
      the meaning of messages such as WM_QUERYENDSESSION. }
    Msg.Result := DefWindowProc(FHWND, Msg.Msg, Msg.WParam, Msg.LParam);
end;

{ OnDataFlow handler installed on the reader thread. It only posts
  WM_BLOCK_ASYNC, tagged tnReaderDataFlow in LParam, to the component's
  window; the notification is then processed by MessageHandler. }
procedure TBlockToAsyncBuf.ReaderDataFlow(Sender: TObject);
begin
  PostMessage(FHWND, WM_BLOCK_ASYNC, 0, Ord(tnReaderDataFlow));
end;

{ OnDataFlow handler installed on the writer thread. It only posts
  WM_BLOCK_ASYNC, tagged tnWriterDataFlow in LParam, to the component's
  window; the notification is then processed by MessageHandler. }
procedure TBlockToAsyncBuf.WriterDataFlow(Sender: TObject);
begin
  PostMessage(FHWND, WM_BLOCK_ASYNC, 0, Ord(tnWriterDataFlow));
end;

{ Raises OnRead with the component as Sender; no-op when no handler is
  assigned. Called from MessageHandler for tnReaderDataFlow. }
procedure TBlockToAsyncBuf.Read;
begin
  if not Assigned(FOnRead) then
    Exit;
  FOnRead(Self);
end;

{ Raises OnWrite with the component as Sender; no-op when no handler is
  assigned. Called from MessageHandler for tnWriterDataFlow. }
procedure TBlockToAsyncBuf.Write;
begin
  if not Assigned(FOnWrite) then
    Exit;
  FOnWrite(Self);
end;

{ Builds the component: a message window, the internal buffer and the two
  worker threads. Both threads are created suspended, wired to the buffer
  and to their notification handlers, and only then resumed. }
constructor TBlockToAsyncBuf.Create(AOwner: TComponent);
begin
  inherited Create(AOwner);
  { The window must exist before the threads run, because their OnDataFlow
    handlers post WM_BLOCK_ASYNC to FHWND. }
  FHWND := AllocateHWnd(MessageHandler);
  FBuffer := TBiDirBuf.Create;
  FBuffer.Size := InternalBufferSize;
  { Created suspended so Buffer and OnDataFlow can be assigned before
    Execute starts. }
  FReaderThread := TBAReaderThread.Create(true);
  FReaderThread.Buffer := Self.FBuffer;
  FReaderThread.OnDataFlow := ReaderDataFlow;
  FWriterThread := TBAWriterThread.Create(true);
  FWriterThread.Buffer := Self.FBuffer;
  FWriterThread.OnDataFlow := WriterDataFlow;
  { Everything is wired up; let both threads run. }
  FReaderThread.Resume;
  FWriterThread.Resume;
end;

{ Shuts down and frees both worker threads and resets the internal buffer.
  The threads are not recreated here; FReaderThread and FWriterThread are
  left nil when the procedure returns. }
procedure TBlockToAsyncBuf.ResetState;
begin
  { Flag both threads first so their Execute loops stop once they wake up. }
  if Assigned(FReaderThread) then FReaderThread.Terminate;
  if Assigned(FWriterThread) then FWriterThread.Terminate;
  { NOTE(review): presumably this releases a thread that is blocked inside
    GetItem / PutItem; TBiDirBuf is declared in another unit, so confirm
    there. It has to happen before the threads are freed, because freeing
    waits for each thread to finish. }
  FBuffer.ResetState;
  { Free is nil-safe. Each thread's destructor releases its idle semaphore
    and waits for the thread to end. }
  FReaderThread.Free;
  FWriterThread.Free;
  FReaderThread := nil;
  FWriterThread := nil;
end;

{ Destroys the component. The order matters: the worker threads use FBuffer
  and post messages to FHWND, so they are stopped and freed (ResetState)
  before the buffer is freed and the window is deallocated. }
destructor TBlockToAsyncBuf.Destroy;
begin
  { A few destruction subtleties here }
  ResetState;
  FBuffer.Free;
  DeallocateHWnd(FHWND);
  inherited Destroy;
end;

{ Reads one item from side B of the internal buffer and returns whatever
  the buffer's GetItem returns.
  NOTE(review): presumably blocks until an item is available; TBiDirBuf is
  declared in another unit, so confirm there. }
function TBlockToAsyncBuf.BlockingRead: pointer;
begin
  Result := FBuffer.GetItem(bsSideB);
end;

{ Writes Item to side B of the internal buffer and returns the result of
  the buffer's PutItem.
  NOTE(review): presumably blocks while the buffer is full; TBiDirBuf is
  declared in another unit, so confirm there. }
function TBlockToAsyncBuf.BlockingWrite(Item: pointer): boolean;
begin
  Result := FBuffer.PutItem(bsSideB, Item);
end;

{ Non-blocking read: returns the item currently held by the reader thread,
  or nil when nothing is available.

  nil is also returned when the reader thread no longer exists (ResetState
  frees it and sets the field to nil), instead of dereferencing a nil
  thread reference. }
function TBlockToAsyncBuf.AsyncRead: pointer;
begin
  if Assigned(FReaderThread) then
    result := FReaderThread.ReadItem
  else
    result := nil;
end;

{ Non-blocking write: hands Item to the writer thread. Returns true when the
  thread accepted it and false when its hand-over slot is still occupied.

  false is also returned when the writer thread no longer exists (ResetState
  frees it and sets the field to nil), instead of dereferencing a nil
  thread reference. }
function TBlockToAsyncBuf.AsyncWrite(Item: pointer): boolean;
begin
  if Assigned(FWriterThread) then
    result := FWriterThread.WriteItem(Item)
  else
    result := false;
end;

{ Returns the number of items currently inside the component: the items
  parked in the reader and writer hand-over slots plus the entries used on
  both sides of the internal buffer. A buffer side whose GetEntriesUsed call
  returns false contributes nothing.

  A worker thread that no longer exists (ResetState sets both fields to nil)
  is counted as holding no item rather than being dereferenced. }
function TBlockToAsyncBuf.GetItemsInTransit: integer;

var
  Entries: integer;

begin
  result := 0;
  if Assigned(FReaderThread) then
    Inc(result, FReaderThread.ItemsInTransit);
  if Assigned(FWriterThread) then
    Inc(result, FWriterThread.ItemsInTransit);
  if FBuffer.GetEntriesUsed(bsSideA, Entries) then
    Inc(result, Entries);
  if FBuffer.GetEntriesUsed(bsSideB, Entries) then
    Inc(result, Entries);
end;

end.