unit BlockToAsyncBuf; { Martin Harvey 10/5/2000 } interface uses Classes, Forms, Messages, Windows, BiDirBuf; const InternalBufferSize = 4; WM_BLOCK_ASYNC = WM_USER + 2876; type { With this component, as with previous buffering schemes, one cannot read or write nil pointers. } TThreadNotify = (tnReaderDataFlow, tnWriterDataFlow); TBlockAsyncThread = class(TThread) private FDataSection: TRTLCriticalSection; FIdleSemaphore: THandle; FInterimBuf: Pointer; FOnDataFlow: TNotifyEvent; FBuffer: TBiDirBuf; protected procedure DataFlow; virtual; function GetItemsInTransit: integer; public constructor Create(CreateSuspended: boolean); destructor Destroy; override; published property OnDataFlow: TNotifyEvent read FOnDataFlow write FOnDataFlow; property Buffer: TBiDirBuf write FBuffer; property ItemsInTransit: integer read GetItemsInTransit; end; TBAWriterThread = class(TBlockAsyncThread) private protected procedure Execute; override; public function WriteItem(Item: Pointer): boolean; published end; TBAReaderThread = class(TBlockAsyncThread) private protected procedure Execute; override; public function ReadItem: pointer; published end; TBlockToAsyncBuf = class(TComponent) private FHWND: THandle; FBuffer: TBiDirBuf; FReaderThread: TBAReaderThread; FWriterThread: TBAWriterThread; FOnRead, FOnWrite: TNotifyEvent; protected procedure MessageHandler(var Msg: TMessage); procedure ReaderDataFlow(Sender: TObject); procedure WriterDataFlow(Sender: TObject); procedure Read; virtual; procedure Write; virtual; function GetItemsInTransit: integer; public constructor Create(AOwner: TComponent); override; destructor Destroy; override; function BlockingRead: pointer; function BlockingWrite(Item: pointer): boolean; function AsyncRead: pointer; function AsyncWrite(Item: pointer): boolean; procedure ResetState; published property OnRead: TNotifyEvent read FOnRead write FOnRead; property OnWrite: TNotifyEvent read FOnWrite write FOnWrite; property ItemsInTransit: integer read GetItemsInTransit; end; implementation procedure TBlockAsyncThread.DataFlow; begin if Assigned(FOnDataFlow) then FOnDataFlow(Self); end; constructor TBlockAsyncThread.Create(CreateSuspended: boolean); begin inherited Create(CreateSuspended); InitializeCriticalSection(FDataSection); FIdleSemaphore := CreateSemaphore(nil, 0, High(Integer), nil); end; destructor TBlockAsyncThread.Destroy; begin ReleaseSemaphore(FIdleSemaphore, 1, nil); WaitFor; DeleteCriticalSection(FDataSection); CloseHandle(FIdleSemaphore); end; function TBlockAsyncThread.GetItemsInTransit: integer; begin EnterCriticalSection(FDataSection); if Assigned(FInterimBuf) then result := 1 else result := 0; LeaveCriticalSection(FDataSection); end; { Buffer error handling needs to be discussed } procedure TBAWriterThread.Execute; var Temp: Pointer; begin while not Terminated do begin DataFlow; WaitForSingleObject(FIdleSemaphore, INFINITE); EnterCriticalSection(FDataSection); Temp := FInterimBuf; FInterimBuf := nil; LeaveCriticalSection(FDataSection); if not FBuffer.PutItem(bsSideA, Temp) then Terminate; end; end; function TBAWriterThread.WriteItem(Item: Pointer): boolean; begin result := false; EnterCriticalSection(FDataSection); if not Assigned(FInterimBuf) then begin FInterimBuf := Item; result := true; end; LeaveCriticalSection(FDataSection); if Result then ReleaseSemaphore(FIdleSemaphore, 1, nil); end; procedure TBAReaderThread.Execute; var Temp: Pointer; begin while not Terminated do begin Temp := FBuffer.GetItem(bsSideA); if Assigned(Temp) then begin EnterCriticalSection(FDataSection); FInterimBuf := Temp; LeaveCriticalSection(FDataSection); DataFlow; WaitForSingleObject(FIdleSemaphore, INFINITE); end else Terminate; end; end; function TBAReaderThread.ReadItem: pointer; begin EnterCriticalSection(FDataSection); result := FInterimBuf; LeaveCriticalSection(FDataSection); if Assigned(Result) then ReleaseSemaphore(FIdleSemaphore, 1, nil); end; procedure TBlockToAsyncBuf.MessageHandler(var Msg: TMessage); begin if (Msg.Msg = WM_BLOCK_ASYNC) then begin case TThreadNotify(Msg.LParam) of tnReaderDataflow: Read; tnWriterDataflow: Write; else Assert(false); end; end; end; procedure TBlockToAsyncBuf.ReaderDataFlow(Sender: TObject); begin PostMessage(FHWND, WM_BLOCK_ASYNC, 0, Integer(tnReaderDataflow)); end; procedure TBlockToAsyncBuf.WriterDataFlow(Sender: TObject); begin PostMessage(FHWND, WM_BLOCK_ASYNC, 0, Integer(tnWriterDataflow)); end; procedure TBlockToAsyncBuf.Read; begin if Assigned(FOnRead) then FOnRead(Self); end; procedure TBlockToAsyncBuf.Write; begin if Assigned(FOnWrite) then FOnWrite(Self); end; constructor TBlockToAsyncBuf.Create(AOwner: TComponent); begin inherited Create(AOwner); FHWND := AllocateHWnd(MessageHandler); FBuffer := TBiDirBuf.Create; FBuffer.Size := InternalBufferSize; FReaderThread := TBAReaderThread.Create(true); FReaderThread.Buffer := Self.FBuffer; FReaderThread.OnDataFlow := ReaderDataFlow; FWriterThread := TBAWriterThread.Create(true); FWriterThread.Buffer := Self.FBuffer; FWriterThread.OnDataFlow := WriterDataFlow; FReaderThread.Resume; FWriterThread.Resume; end; procedure TBlockToAsyncBuf.ResetState; begin if Assigned(FReaderThread) then FReaderThread.Terminate; if Assigned(FWriterThread) then FWriterThread.Terminate; FBuffer.ResetState; FReaderThread.Free; FWriterThread.Free; FReaderThread := nil; FWriterThread := nil; end; destructor TBlockToAsyncBuf.Destroy; begin { A few destruction subtleties here } ResetState; FBuffer.Free; DeallocateHWnd(FHWND); inherited Destroy; end; function TBlockToAsyncBuf.BlockingRead: pointer; begin result := FBuffer.GetItem(bsSideB); end; function TBlockToAsyncBuf.BlockingWrite(Item: pointer): boolean; begin result := FBuffer.PutItem(bsSideB, Item); end; function TBlockToAsyncBuf.AsyncRead: pointer; begin result := FReaderThread.ReadItem; end; function TBlockToAsyncBuf.AsyncWrite(Item: pointer): boolean; begin result := FWriterThread.WriteItem(Item); end; function TBlockToAsyncBuf.GetItemsInTransit: integer; var Entries: integer; begin result := FReaderThread.ItemsInTransit + FWriterThread.ItemsInTransit; if FBuffer.GetEntriesUsed(bsSideA, Entries) then Inc(result, Entries); if FBuffer.GetEntriesUsed(bsSideB, Entries) then Inc(result, Entries); end; end.