Skip to content

Commit

Permalink
THRIFT-5851 Promote known total stream sizes for seekable stream tran…
Browse files Browse the repository at this point in the history
…sports properly

Client: Delphi
Patch: Jens Geyer
  • Loading branch information
Jens-G committed Feb 5, 2025
1 parent c854f64 commit 5a781c2
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 54 deletions.
2 changes: 1 addition & 1 deletion lib/delphi/src/Thrift.Protocol.pas
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ function TProtocolImpl.Configuration : IThriftConfiguration;

procedure TProtocolImpl.Reset;
begin
FTrans.ResetConsumedMessageSize;
FTrans.ResetMessageSizeAndConsumedBytes;
end;

function TProtocolImpl.ReadString: string;
Expand Down
26 changes: 25 additions & 1 deletion lib/delphi/src/Thrift.Stream.pas
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ interface

type
IThriftStream = interface
['{3A61A8A6-3639-4B91-A260-EFCA23944F3A}']
['{67801A9F-3B85-41CF-9025-D18AC6849B58}']
procedure Write( const buffer: TBytes; offset: Integer; count: Integer); overload;
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); overload;
function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; overload;
Expand All @@ -47,6 +47,7 @@ interface
procedure Flush;
function IsOpen: Boolean;
function ToArray: TBytes;
function CanSeek : Boolean;
function Size : Int64;
function Position : Int64;
end;
Expand All @@ -66,6 +67,7 @@ TThriftStreamImpl = class abstract( TInterfacedObject, IThriftStream)
procedure Flush; virtual; abstract;
function IsOpen: Boolean; virtual; abstract;
function ToArray: TBytes; virtual; abstract;
function CanSeek : Boolean; virtual;
function Size : Int64; virtual;
function Position : Int64; virtual;
end;
Expand All @@ -83,6 +85,7 @@ TThriftStreamAdapterDelphi = class( TThriftStreamImpl)
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
function CanSeek : Boolean; override;
function Size : Int64; override;
function Position : Int64; override;
public
Expand All @@ -102,6 +105,7 @@ TThriftStreamAdapterCOM = class( TThriftStreamImpl)
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
function CanSeek : Boolean; override;
function Size : Int64; override;
function Position : Int64; override;
public
Expand Down Expand Up @@ -176,6 +180,12 @@ procedure TThriftStreamAdapterCOM.Flush;
end;
end;

function TThriftStreamAdapterCOM.CanSeek : Boolean;
var statstg: TStatStg;
begin
result := IsOpen and Succeeded( FStream.Stat( statstg, STATFLAG_NONAME));
end;

function TThriftStreamAdapterCOM.Size : Int64;
var statstg: TStatStg;
begin
Expand Down Expand Up @@ -290,6 +300,11 @@ procedure TThriftStreamImpl.Write( const pBuf : Pointer; offset: Integer; count:
CheckSizeAndOffset( pBuf, offset+count, offset, count);
end;

function TThriftStreamImpl.CanSeek : Boolean;
begin
result := FALSE; // TRUE indicates Size and Position are implemented
end;

function TThriftStreamImpl.Size : Int64;
begin
ASSERT(FALSE);
Expand Down Expand Up @@ -332,6 +347,15 @@ procedure TThriftStreamAdapterDelphi.Flush;
// nothing to do
end;

function TThriftStreamAdapterDelphi.CanSeek : Boolean;
begin
try
result := IsOpen and (FStream.Size >= 0); // throws if not implemented
except
result := FALSE; // seek not implemented
end;
end;

function TThriftStreamAdapterDelphi.Size : Int64;
begin
result := FStream.Size;
Expand Down
2 changes: 1 addition & 1 deletion lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ procedure TMsxmlHTTPClientImpl.SendRequest;
xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));
FInputStream := nil;
FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);
ResetConsumedMessageSize;
ResetMessageSizeAndConsumedBytes;
UpdateKnownMessageSize( FInputStream.Size);
finally
ms.Free;
Expand Down
24 changes: 12 additions & 12 deletions lib/delphi/src/Thrift.Transport.Pipes.pas
Original file line number Diff line number Diff line change
Expand Up @@ -679,22 +679,22 @@ procedure THandlePipeStreamImpl.Open;

function TPipeTransportBase.GetIsOpen: Boolean;
begin
result := (FInputStream <> nil) and (FInputStream.IsOpen)
and (FOutputStream <> nil) and (FOutputStream.IsOpen);
result := (InputStream <> nil) and (InputStream.IsOpen)
and (OutputStream <> nil) and (OutputStream.IsOpen);
end;


procedure TPipeTransportBase.Open;
begin
FInputStream.Open;
FOutputStream.Open;
InputStream.Open;
OutputStream.Open;
end;


procedure TPipeTransportBase.Close;
begin
FInputStream.Close;
FOutputStream.Close;
InputStream.Close;
OutputStream.Close;
end;


Expand All @@ -709,8 +709,8 @@ constructor TNamedPipeTransportClientEndImpl.Create( const aPipeName : string;
// Named pipe constructor
begin
inherited Create( nil, nil, aConfig);
FInputStream := TNamedPipeStreamImpl.Create( aPipeName, TRUE, aShareMode, aSecurityAttributes, aTimeOut, aOpenTimeOut);
FOutputStream := FInputStream; // true for named pipes
SetInputStream( TNamedPipeStreamImpl.Create( aPipeName, TRUE, aShareMode, aSecurityAttributes, aTimeOut, aOpenTimeOut));
SetOutputStream( InputStream); // true for named pipes
end;


Expand All @@ -721,8 +721,8 @@ constructor TNamedPipeTransportClientEndImpl.Create( const aPipe : THandle;
// Named pipe constructor
begin
inherited Create( nil, nil, aConfig);
FInputStream := THandlePipeStreamImpl.Create( aPipe, aOwnsHandle, TRUE, aTimeOut);
FOutputStream := FInputStream; // true for named pipes
SetInputStream( THandlePipeStreamImpl.Create( aPipe, aOwnsHandle, TRUE, aTimeOut));
SetOutputStream( InputStream); // true for named pipes
end;


Expand Down Expand Up @@ -761,8 +761,8 @@ constructor TAnonymousPipeTransportImpl.Create( const aPipeRead, aPipeWrite : TH
begin
inherited Create( nil, nil, aConfig);
// overlapped is not supported with AnonPipes, see MSDN
FInputStream := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE, aTimeout);
FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE, aTimeout);
SetInputStream( THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE, aTimeout));
SetOutputStream( THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE, aTimeout));
end;


Expand Down
2 changes: 1 addition & 1 deletion lib/delphi/src/Thrift.Transport.WinHTTP.pas
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ procedure TWinHTTPClientImpl.SendRequest;
end;

// we're about to receive a new message, so reset everyting
ResetConsumedMessageSize(-1);
ResetMessageSizeAndConsumedBytes(-1);
FInputStream := THTTPResponseStream.Create( http);
if http.QueryTotalResponseSize( dwSize) // FALSE indicates "no info available"
then UpdateKnownMessageSize( dwSize);
Expand Down
Loading

0 comments on commit 5a781c2

Please sign in to comment.