{ 10-05-1999 10:37:03 PM > [martin on MARTIN] checked out /Reformatting
   according to Delphi guidelines. }
{ 06-04-1999 7:49:40 PM > [martin on MARTIN] checked out /Modifying Class
   Names }
unit MCHPipeThreads;

{Martin Harvey 7/11/98}

{This unit gives us a base pipe thread type with some common support for
error tracking}

interface

uses Classes,MCHPipeTypes,Windows,MCHMemoryStream;

type
  { Common ancestor of the pipe reader and writer threads. It keeps the
    reason the thread stopped (TermReason) and offers an OnTerminate
    notification that is fired from its own Execute. }
  TMCHPipeThread = class(TThread)
  private
    FOnTerminate: TNotifyEvent;
  protected
    { Descendants store the result of their pipe operations here. }
    FTermReason: TMCHError;
  public
    { Calls the OnTerminate handler when one is assigned. Descendants reach
      it through "inherited Execute" at the end of their own Execute. }
    procedure Execute; override;
  published
    property TermReason: TMCHError read FTermReason;
    property OnTerminate: TNotifyEvent read FOnTerminate write FOnTerminate;
  end;

  { Thread that sends queued data to a pipe. Callers append data with
    WriteData; Execute sends it through PipeWriteHandle. }
  TMCHPipeWriterThread = class(TMCHPipeThread)
  private
    FDataMutex: THandle;       { held while FData is read or modified }
    FIdleSemaphore: THandle;   { released by WriteData and Destroy to wake Execute }
    FPipeWriteHandle: TMCHHandle;
    FData: TMCHMemoryStream;   { data queued by WriteData }
    FWriteIdx: Integer;        { offset in FData of the next byte to send }
  public
    constructor Create(CreateSuspended: Boolean);
    procedure Execute; override;
    destructor Destroy; override;
    { Queues the whole of InStream; returns bytes written = InStream.Size }
    function WriteData(InStream: TStream): Integer;
    property PipeWriteHandle: TMCHHandle read FPipeWriteHandle write FPipeWriteHandle;
  end;

  { Thread that waits for a peer on PipeReadHandle and then accumulates the
    incoming bytes in an internal stream, from which callers collect them
    with ReadData. }
  TMCHPipeReaderThread = class(TMCHPipeThread)
  private
    FDataMutex: THandle;       { held while FData is read or modified }
    FPipeReadHandle: TMCHHandle;
    FData: TMCHMemoryStream;   { bytes received but not yet collected }
    FOnDataRecieved: TNotifyEvent;
    FOnConnect: TNotifyEvent;
  public
    constructor Create(CreateSuspended: Boolean);
    procedure Execute; override;
    destructor Destroy; override;
    { Appends the buffered bytes to OutStream; returns bytes read }
    function ReadData(OutStream: TStream): Integer;
    { Fired from Execute when PeekData reports no bytes to read. }
    property OnDataRecieved: TNotifyEvent read FOnDataRecieved write FOnDataRecieved;
    property PipeReadHandle: TMCHHandle read FPipeReadHandle write FPipeReadHandle;
    { Fired from Execute once WaitForPeer has returned meOK. }
    property OnConnect: TNotifyEvent read FOnConnect write FOnConnect;
  end;

implementation

uses MCHPipeInterface2;

const
  { Largest number of bytes moved to or from the pipe in a single call. }
  BufSize = 4096;

type
  { Transfer buffer used by the reader and writer Execute methods. Every use
    in this unit passes a byte count of at most BufSize, so the elements are
    bytes: SizeOf(DataBuf) = BufSize rather than four times that on each
    thread's stack.
    NOTE(review): assumes the MCHPipeInterface2 read/write routines take an
    untyped buffer - confirm against that unit. }
  DataBuf = array[0..BufSize - 1] of Byte;

{ Base thread body: notifies the OnTerminate handler, if there is one. The
  handler is called directly from here, without Synchronize. }
procedure TMCHPipeThread.Execute;
begin
  if not Assigned(FOnTerminate) then
    Exit;
  FOnTerminate(Self);
end;

{ Creates the reader thread together with the mutex and the stream it uses.
  CreateSuspended is passed unchanged to TThread.Create.

  The mutex and the stream are set up BEFORE the inherited constructor is
  called: with CreateSuspended = False, older VCL versions start the thread
  inside TThread.Create, so Execute could otherwise run while FData and
  FDataMutex are still unassigned. }
constructor TMCHPipeReaderThread.Create(CreateSuspended:boolean);
begin
  FDataMutex := CreateMutex(nil,false,nil);
  FData := TMCHMemoryStream.Create;
  inherited Create(CreateSuspended);
end;

{ Asks the thread to stop, waits for Execute to return, then releases the
  mutex and the buffer stream.
  NOTE(review): WaitFor only returns once Execute has left any pipe call it
  is waiting in - confirm the owner unblocks the pipe before freeing. }
destructor TMCHPipeReaderThread.Destroy;
begin
  { Set the Terminated flag before a suspended thread is allowed to run. }
  Terminate;
  if Suspended then
  begin
    Resume;
  end;
  WaitFor;
  { The thread has finished, so its resources can be released. }
  CloseHandle(FDataMutex);
  FData.Free;
  inherited Destroy;
end;

{ Appends everything received so far to the end of OutStream and empties
  the internal buffer. Returns the number of bytes handed over. The data
  mutex is held for the whole transfer. }
function TMCHPipeReaderThread.ReadData(OutStream:TStream):integer;
var
  Pending: Integer;
begin
  WaitForSingleObject(FDataMutex, INFINITE);
  try
    Pending := FData.Size;
    { Source is read from its start, destination is appended to. }
    FData.Seek(0, soFromBeginning);
    OutStream.Seek(0, soFromEnd);
    OutStream.CopyFrom(FData, Pending);
    FData.Clear;
    Result := Pending;
  finally
    ReleaseMutex(FDataMutex);
  end;
end;

{ Thread body of the reader.

  Waits for the peer with WaitForPeer and, on success, fires OnConnect. It
  then loops until terminated: PeekData reports how many bytes are pending,
  at most BufSize of them are fetched with MCHPipeInterface2.ReadData and
  appended to FData while FDataMutex is held. Any result other than meOK is
  left in FTermReason and terminates the thread. Ends with inherited
  Execute. }
procedure TMCHPipeReaderThread.Execute;

var
  Buffer:DataBuf;
  BytesToRead,BytesThisTime:integer;

begin
  FTermReason := WaitForPeer(FPipeReadHandle);
  if FTermReason <> meOK then
    Terminate
  else
    if Assigned(FOnConnect) then FOnConnect(Self);
  while not Terminated do
  begin
    FTermReason := PeekData(FPipeReadHandle,BytesToRead);
    if FTermReason <> meOK then
      Terminate;
    if not Terminated then
    begin
      if BytesToRead <= 0 then
      begin
        {Callback handler should implement lazy async notification}
        if Assigned(FOnDataRecieved) then FOnDataRecieved(Self);
        { Nothing pending: request a single byte. NOTE(review): presumably
          this makes the read below wait until data arrives - confirm in
          MCHPipeInterface2. }
        BytesToRead := 1;
      end;
      { Never ask for more than the local buffer can hold. }
      if BytesToRead > BufSize then
        BytesThisTime := BufSize
      else
        BytesThisTime := BytesToRead;
      FTermReason := MCHPipeInterface2.ReadData(FPipeReadHandle,Buffer,BytesThisTime);
      if FTermReason <> meOK then
        Terminate
      else
      begin
        WaitForSingleObject(FDataMutex,INFINITE);
        try
          FData.Seek(0,soFromEnd);
          FData.WriteBuffer(Buffer,BytesThisTime);
        finally
          { Release even if the stream write raises, as ReadData does. }
          ReleaseMutex(FDataMutex);
        end;
      end;
    end;
  end;
  inherited Execute;
end;

{ Creates the writer thread together with its mutex, idle semaphore and
  queue stream. CreateSuspended is passed unchanged to TThread.Create.

  The synchronisation objects and the stream are set up BEFORE the inherited
  constructor is called: with CreateSuspended = False, older VCL versions
  start the thread inside TThread.Create, so Execute could otherwise run
  while these fields are still unassigned. }
constructor TMCHPipeWriterThread.Create(CreateSuspended:boolean);
begin
  FDataMutex := CreateMutex(nil,false,nil);
  { Initial count 0: Execute stays idle until WriteData or Destroy releases. }
  FIdleSemaphore := CreateSemaphore(nil,0,High(Integer),nil);
  FData := TMCHMemoryStream.Create;
  inherited Create(CreateSuspended);
end;

{ Asks the thread to stop, wakes it if it is idle, waits for Execute to
  return, then releases the queue stream and both synchronisation handles. }
destructor TMCHPipeWriterThread.Destroy;
begin
  Terminate;
  { Wake Execute if it is waiting on the idle semaphore so that it can see
    the Terminated flag. }
  ReleaseSemaphore(FIdleSemaphore,1,nil);
  if Suspended then Resume;
  WaitFor;
  FData.Free;
  CloseHandle(FDataMutex);
  { The semaphore was created in Create and must be closed here as well;
    otherwise every writer thread leaks one handle. }
  CloseHandle(FIdleSemaphore);
  inherited Destroy;
end;

{ Appends the whole of InStream to the outgoing queue and releases the idle
  semaphore once. Returns InStream.Size. The data mutex is held only while
  the queue is being extended. }
function TMCHPipeWriterThread.WriteData(InStream:TStream):integer;
var
  Incoming: Integer;
begin
  InStream.Seek(0, soFromBeginning);
  WaitForSingleObject(FDataMutex, INFINITE);
  try
    Incoming := InStream.Size;
    Result := Incoming;
    FData.Seek(0, soFromEnd);
    FData.CopyFrom(InStream, Incoming);
  finally
    ReleaseMutex(FDataMutex);
  end;
  { Signalled after the mutex is released. }
  ReleaseSemaphore(FIdleSemaphore, 1, nil);
end;

{ Thread body of the writer.

  Loops until terminated. Each pass measures how much of FData has not been
  sent yet, copies it out in chunks of at most BufSize while holding
  FDataMutex, and sends each chunk with MCHPipeInterface2.WriteData after
  the mutex has been released. A result other than meOK is left in
  FTermReason and terminates the thread. When everything queued has been
  sent the stream is cleared, and the thread then waits on the idle
  semaphore. Ends with inherited Execute. }
procedure TMCHPipeWriterThread.Execute;

var
  Buf:DataBuf;
  BytesThisTime,BytesToWrite:integer;

begin
  while not Terminated do
  begin
    WaitForSingleObject(FDataMutex,INFINITE);
    try
      BytesToWrite := FData.Size - FWriteIdx;
    finally
      ReleaseMutex(FDataMutex);
    end;
    while (BytesToWrite > 0) and (not Terminated) do
    begin
      if BytesToWrite > BufSize then
        BytesThisTime := BufSize
      else
        BytesThisTime := BytesToWrite;
      WaitForSingleObject(FDataMutex,INFINITE);
      try
        FData.Seek(FWriteIdx,soFromBeginning);
        FData.ReadBuffer(Buf,BytesThisTime);
      finally
        { Release even if the stream read raises, as WriteData does. }
        ReleaseMutex(FDataMutex);
      end;
      {Note that we should not block when we have the mutex!}
      FTermReason := MCHPipeInterface2.WriteData(FPipeWriteHandle,Buf,BytesThisTime);
      if FTermReason = meOK then
      begin
        BytesToWrite := BytesToWrite - BytesThisTime;
        FWriteIdx := FWriteIdx + BytesThisTime;
      end
      else
        Terminate;
    end;
    if not Terminated then
    begin
      WaitForSingleObject(FDataMutex,INFINITE);
      try
        {Cannot be sure that the stream hasn't been written to in the meantime!}
        {When the expression below is false, the wait on the idle semaphore
         will not block, so the stream should not get unnecessarily large}
        if FWriteIdx = FData.Size then
        begin
          FData.Clear;
          FWriteIdx := 0;
        end;
      finally
        ReleaseMutex(FDataMutex);
      end;
      WaitForSingleObject(FIdleSemaphore,INFINITE);
    end;
  end;
  inherited Execute;
end;

end.

