unit BlockToAsyncBuf;
{ Martin Harvey 10/5/2000 }
interface
uses Classes, Forms, Messages, Windows, BiDirBuf;
const
InternalBufferSize = 4;
WM_BLOCK_ASYNC = WM_USER + 2876;
type
{ With this component, as with previous buffering schemes, one cannot read
or write nil pointers. }
TThreadNotify = (tnReaderDataFlow, tnWriterDataFlow);
TBlockAsyncThread = class(TThread)
private
FDataSection: TRTLCriticalSection;
FIdleSemaphore: THandle;
FInterimBuf: Pointer;
FOnDataFlow: TNotifyEvent;
FBuffer: TBiDirBuf;
protected
procedure DataFlow; virtual;
function GetItemsInTransit: integer;
public
constructor Create(CreateSuspended: boolean);
destructor Destroy; override;
published
property OnDataFlow: TNotifyEvent read FOnDataFlow write FOnDataFlow;
property Buffer: TBiDirBuf write FBuffer;
property ItemsInTransit: integer read GetItemsInTransit;
end;
TBAWriterThread = class(TBlockAsyncThread)
private
protected
procedure Execute; override;
public
function WriteItem(Item: Pointer): boolean;
published
end;
TBAReaderThread = class(TBlockAsyncThread)
private
protected
procedure Execute; override;
public
function ReadItem: pointer;
published
end;
TBlockToAsyncBuf = class(TComponent)
private
FHWND: THandle;
FBuffer: TBiDirBuf;
FReaderThread: TBAReaderThread;
FWriterThread: TBAWriterThread;
FOnRead, FOnWrite: TNotifyEvent;
protected
procedure MessageHandler(var Msg: TMessage);
procedure ReaderDataFlow(Sender: TObject);
procedure WriterDataFlow(Sender: TObject);
procedure Read; virtual;
procedure Write; virtual;
function GetItemsInTransit: integer;
public
constructor Create(AOwner: TComponent); override;
destructor Destroy; override;
function BlockingRead: pointer;
function BlockingWrite(Item: pointer): boolean;
function AsyncRead: pointer;
function AsyncWrite(Item: pointer): boolean;
procedure ResetState;
published
property OnRead: TNotifyEvent read FOnRead write FOnRead;
property OnWrite: TNotifyEvent read FOnWrite write FOnWrite;
property ItemsInTransit: integer read GetItemsInTransit;
end;
implementation
procedure TBlockAsyncThread.DataFlow;
begin
if Assigned(FOnDataFlow) then FOnDataFlow(Self);
end;
constructor TBlockAsyncThread.Create(CreateSuspended: boolean);
begin
inherited Create(CreateSuspended);
InitializeCriticalSection(FDataSection);
FIdleSemaphore := CreateSemaphore(nil, 0, High(Integer), nil);
end;
destructor TBlockAsyncThread.Destroy;
begin
ReleaseSemaphore(FIdleSemaphore, 1, nil);
WaitFor;
DeleteCriticalSection(FDataSection);
CloseHandle(FIdleSemaphore);
end;
function TBlockAsyncThread.GetItemsInTransit: integer;
begin
EnterCriticalSection(FDataSection);
if Assigned(FInterimBuf) then
result := 1
else
result := 0;
LeaveCriticalSection(FDataSection);
end;
{ Buffer error handling needs to be discussed }
procedure TBAWriterThread.Execute;
var
Temp: Pointer;
begin
while not Terminated do
begin
DataFlow;
WaitForSingleObject(FIdleSemaphore, INFINITE);
EnterCriticalSection(FDataSection);
Temp := FInterimBuf;
FInterimBuf := nil;
LeaveCriticalSection(FDataSection);
if not FBuffer.PutItem(bsSideA, Temp) then Terminate;
end;
end;
function TBAWriterThread.WriteItem(Item: Pointer): boolean;
begin
result := false;
EnterCriticalSection(FDataSection);
if not Assigned(FInterimBuf) then
begin
FInterimBuf := Item;
result := true;
end;
LeaveCriticalSection(FDataSection);
if Result then ReleaseSemaphore(FIdleSemaphore, 1, nil);
end;
procedure TBAReaderThread.Execute;
var
Temp: Pointer;
begin
while not Terminated do
begin
Temp := FBuffer.GetItem(bsSideA);
if Assigned(Temp) then
begin
EnterCriticalSection(FDataSection);
FInterimBuf := Temp;
LeaveCriticalSection(FDataSection);
DataFlow;
WaitForSingleObject(FIdleSemaphore, INFINITE);
end
else Terminate;
end;
end;
function TBAReaderThread.ReadItem: pointer;
begin
EnterCriticalSection(FDataSection);
result := FInterimBuf;
LeaveCriticalSection(FDataSection);
if Assigned(Result) then ReleaseSemaphore(FIdleSemaphore, 1, nil);
end;
procedure TBlockToAsyncBuf.MessageHandler(var Msg: TMessage);
begin
if (Msg.Msg = WM_BLOCK_ASYNC) then
begin
case TThreadNotify(Msg.LParam) of
tnReaderDataflow: Read;
tnWriterDataflow: Write;
else
Assert(false);
end;
end;
end;
procedure TBlockToAsyncBuf.ReaderDataFlow(Sender: TObject);
begin
PostMessage(FHWND, WM_BLOCK_ASYNC, 0, Integer(tnReaderDataflow));
end;
procedure TBlockToAsyncBuf.WriterDataFlow(Sender: TObject);
begin
PostMessage(FHWND, WM_BLOCK_ASYNC, 0, Integer(tnWriterDataflow));
end;
procedure TBlockToAsyncBuf.Read;
begin
if Assigned(FOnRead) then FOnRead(Self);
end;
procedure TBlockToAsyncBuf.Write;
begin
if Assigned(FOnWrite) then FOnWrite(Self);
end;
constructor TBlockToAsyncBuf.Create(AOwner: TComponent);
begin
inherited Create(AOwner);
FHWND := AllocateHWnd(MessageHandler);
FBuffer := TBiDirBuf.Create;
FBuffer.Size := InternalBufferSize;
FReaderThread := TBAReaderThread.Create(true);
FReaderThread.Buffer := Self.FBuffer;
FReaderThread.OnDataFlow := ReaderDataFlow;
FWriterThread := TBAWriterThread.Create(true);
FWriterThread.Buffer := Self.FBuffer;
FWriterThread.OnDataFlow := WriterDataFlow;
FReaderThread.Resume;
FWriterThread.Resume;
end;
procedure TBlockToAsyncBuf.ResetState;
begin
if Assigned(FReaderThread) then FReaderThread.Terminate;
if Assigned(FWriterThread) then FWriterThread.Terminate;
FBuffer.ResetState;
FReaderThread.Free;
FWriterThread.Free;
FReaderThread := nil;
FWriterThread := nil;
end;
destructor TBlockToAsyncBuf.Destroy;
begin
{ A few destruction subtleties here }
ResetState;
FBuffer.Free;
DeallocateHWnd(FHWND);
inherited Destroy;
end;
function TBlockToAsyncBuf.BlockingRead: pointer;
begin
result := FBuffer.GetItem(bsSideB);
end;
function TBlockToAsyncBuf.BlockingWrite(Item: pointer): boolean;
begin
result := FBuffer.PutItem(bsSideB, Item);
end;
function TBlockToAsyncBuf.AsyncRead: pointer;
begin
result := FReaderThread.ReadItem;
end;
function TBlockToAsyncBuf.AsyncWrite(Item: pointer): boolean;
begin
result := FWriterThread.WriteItem(Item);
end;
function TBlockToAsyncBuf.GetItemsInTransit: integer;
var
Entries: integer;
begin
result := FReaderThread.ItemsInTransit + FWriterThread.ItemsInTransit;
if FBuffer.GetEntriesUsed(bsSideA, Entries) then
Inc(result, Entries);
if FBuffer.GetEntriesUsed(bsSideB, Entries) then
Inc(result, Entries);
end;
end.