Skip to content

Commit

Permalink
Merge pull request #185 from Ralf1108/dev
Browse files Browse the repository at this point in the history
fixed issue #40 regarding partial streams
  • Loading branch information
Arkatufus authored Oct 29, 2020
2 parents 4f16c3e + e31c339 commit 81ff792
Show file tree
Hide file tree
Showing 18 changed files with 220 additions and 20 deletions.
84 changes: 84 additions & 0 deletions src/Hyperion.Tests/IncompleteStreamTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using System.IO;
using Xunit;

namespace Hyperion.Tests
{
public class IncompleteStreamTests : TestBase
{
private const int IncompleteBytes = 4;

public IncompleteStreamTests()
: base(x => new IncompleteReadStream(x, IncompleteBytes))
{
}

[Fact]
public void ThrowsOnEOF()
{
double data = 4; //double has 8 bytes
Serialize(data);
Reset();

// manifest requires 1 byte
// incomplete returned bytes are then (IncompleteBytes)4 - 1 = 3 => EOF
Assert.Throws<EndOfStreamException>(() => Deserialize<int>());
}

private class IncompleteReadStream : Stream
{
private readonly Stream _baseStream;
private readonly int _maxReadBytes;

private int _totalReadBytes;

public IncompleteReadStream(Stream baseStream, int maxReadBytes)
{
_baseStream = baseStream;
_maxReadBytes = maxReadBytes;
}

public override void Flush()
{
_baseStream.Flush();
}

public override long Seek(long offset, SeekOrigin origin)
{
return _baseStream.Seek(offset, origin);
}

public override void SetLength(long value)
{
_baseStream.SetLength(value);
}

public override int Read(byte[] buffer, int offset, int count)
{
var allBytes = _totalReadBytes + count;
var bytesToRead = allBytes > _maxReadBytes
? _maxReadBytes - _totalReadBytes
: count;

var readBytes = _baseStream.Read(buffer, offset, bytesToRead);
_totalReadBytes += readBytes;
return readBytes;
}

public override void Write(byte[] buffer, int offset, int count)
{
_baseStream.Write(buffer, offset, count);
}

public override bool CanRead => _baseStream.CanRead;
public override bool CanSeek => _baseStream.CanSeek;
public override bool CanWrite => _baseStream.CanWrite;
public override long Length => _baseStream.Length;

public override long Position
{
get => _baseStream.Position;
set => _baseStream.Position = value;
}
}
}
}
58 changes: 58 additions & 0 deletions src/Hyperion.Tests/PartialStreamTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System.IO;

namespace Hyperion.Tests
{
public class PartialStreamTests : PrimitivesTest
{
public PartialStreamTests()
: base(x => new OneBytePerReadStream(x))
{
}

private class OneBytePerReadStream : Stream
{
private readonly Stream _baseStream;

public OneBytePerReadStream(Stream baseStream)
{
_baseStream = baseStream;
}

public override void Flush()
{
_baseStream.Flush();
}

public override long Seek(long offset, SeekOrigin origin)
{
return _baseStream.Seek(offset, origin);
}

public override void SetLength(long value)
{
_baseStream.SetLength(value);
}

public override int Read(byte[] buffer, int offset, int count)
{
return _baseStream.Read(buffer, offset, count > 0 ? 1 : 0);
}

public override void Write(byte[] buffer, int offset, int count)
{
_baseStream.Write(buffer, offset, count);
}

public override bool CanRead => _baseStream.CanRead;
public override bool CanSeek => _baseStream.CanSeek;
public override bool CanWrite => _baseStream.CanWrite;
public override long Length => _baseStream.Length;

public override long Position
{
get => _baseStream.Position;
set => _baseStream.Position = value;
}
}
}
}
10 changes: 10 additions & 0 deletions src/Hyperion.Tests/PrimitivesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,23 @@
#endregion

using System;
using System.IO;
using Xunit;

namespace Hyperion.Tests
{

public class PrimitivesTest : TestBase
{
public PrimitivesTest()
{
}

protected PrimitivesTest(Func<Stream, Stream> streamFacade)
: base(streamFacade)
{
}

[Fact]
public void CanSerializeTuple1()
{
Expand Down
10 changes: 8 additions & 2 deletions src/Hyperion.Tests/TestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// -----------------------------------------------------------------------
#endregion

using System;
using System.IO;
using Xunit;

Expand All @@ -15,12 +16,17 @@ namespace Hyperion.Tests
public abstract class TestBase
{
private Serializer _serializer;
private readonly MemoryStream _stream;
private readonly Stream _stream;

protected TestBase()
: this(x => x)
{
}

protected TestBase(Func<Stream, Stream> streamFacade)
{
_serializer = new Serializer();
_stream = new MemoryStream();
_stream = streamFacade(new MemoryStream());
}

protected void CustomInit(Serializer serializer)
Expand Down
41 changes: 36 additions & 5 deletions src/Hyperion/Extensions/StreamEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ namespace Hyperion.Extensions
{
internal static class StreamEx
{

public static uint ReadVarint32(this Stream stream)
{
int result = 0;
Expand Down Expand Up @@ -75,15 +74,15 @@ public static void WriteVarint64(this Stream stream, ulong value)
public static uint ReadUInt16(this Stream self, DeserializerSession session)
{
var buffer = session.GetBuffer(2);
self.Read(buffer, 0, 2);
self.ReadFull(buffer, 0, 2);
var res = BitConverter.ToUInt16(buffer, 0);
return res;
}

public static int ReadInt32(this Stream self, DeserializerSession session)
{
var buffer = session.GetBuffer(4);
self.Read(buffer, 0, 4);
self.ReadFull(buffer, 0, 4);
var res = BitConverter.ToInt32(buffer, 0);
return res;
}
Expand All @@ -92,7 +91,7 @@ public static byte[] ReadLengthEncodedByteArray(this Stream self, DeserializerSe
{
var length = self.ReadInt32(session);
var buffer = new byte[length];
self.Read(buffer, 0, length);
self.ReadFull(buffer, 0, length);
return buffer;
}

Expand Down Expand Up @@ -189,10 +188,42 @@ public static string ReadString(this Stream stream, DeserializerSession session)
}

var buffer = session.GetBuffer(length);
stream.ReadFull(buffer, 0, length);

stream.Read(buffer, 0, length);
var res = StringEx.FromUtf8Bytes(buffer, 0, length);
return res;
}

/// <summary>
/// Repeats reading from stream until requested bytes were read.
/// Returns with partial result if stream can't provide enough bytes
/// Fixes issue: https://github.com/akkadotnet/Hyperion/issues/40
/// Reference for allowed partial streams: https://docs.microsoft.com/en-us/dotnet/api/system.io.stream.read?redirectedfrom=MSDN&view=netcore-3.1#System_IO_Stream_Read_System_Byte___System_Int32_System_Int32_
/// -> "An implementation is free to return fewer bytes than requested even if the end of the stream has not been reached."
/// </summary>
public static int ReadFull(this Stream stream, byte[] buffer, int offset, int count)
{
// fast path for streams which doesn't deliver partial results
var totalReadBytes = stream.Read(buffer, offset, count);
if (totalReadBytes == count)
return totalReadBytes;

// support streams with partial results
do
{
var readBytes = stream.Read(buffer, offset + totalReadBytes, count - totalReadBytes);
if (readBytes == 0)
break; // EOF

totalReadBytes += readBytes;
}
while (totalReadBytes < count);

// received enough bytes?
if (totalReadBytes != count)
throw new EndOfStreamException("Stream didn't returned sufficient bytes");

return totalReadBytes;
}
}
}
3 changes: 2 additions & 1 deletion src/Hyperion/ValueSerializers/CharSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

using System;
using System.IO;
using Hyperion.Extensions;

namespace Hyperion.ValueSerializers
{
Expand All @@ -24,7 +25,7 @@ public CharSerializer() : base(Manifest, () => WriteValueImpl, () => ReadValueIm

public static char ReadValueImpl(Stream stream, byte[] bytes)
{
stream.Read(bytes, 0, Size);
stream.ReadFull(bytes, 0, Size);
return BitConverter.ToChar(bytes, 0);
}

Expand Down
2 changes: 1 addition & 1 deletion src/Hyperion/ValueSerializers/ConsistentArraySerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public override object ReadValue(Stream stream, DeserializerSession session)
var size = elementType.GetTypeSize();
var totalSize = size*length;
var buffer = session.GetBuffer(totalSize);
stream.Read(buffer, 0, totalSize);
stream.ReadFull(buffer, 0, totalSize);
Buffer.BlockCopy(buffer, 0, array, 0, totalSize);
}
else
Expand Down
3 changes: 2 additions & 1 deletion src/Hyperion/ValueSerializers/DateTimeOffsetSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

using System;
using System.IO;
using Hyperion.Extensions;

namespace Hyperion.ValueSerializers
{
Expand Down Expand Up @@ -36,7 +37,7 @@ public static DateTimeOffset ReadValueImpl(Stream stream, byte[] bytes)

private static DateTimeOffset ReadDateTimeOffset(Stream stream, byte[] bytes)
{
stream.Read(bytes, 0, Size);
stream.ReadFull(bytes, 0, Size);
var dateTimeTicks = BitConverter.ToInt64(bytes, 0);
var offsetTicks = BitConverter.ToInt64(bytes, sizeof(long));
var dateTimeOffset = new DateTimeOffset(dateTimeTicks, TimeSpan.FromTicks(offsetTicks));
Expand Down
3 changes: 2 additions & 1 deletion src/Hyperion/ValueSerializers/DateTimeSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

using System;
using System.IO;
using Hyperion.Extensions;

namespace Hyperion.ValueSerializers
{
Expand Down Expand Up @@ -36,7 +37,7 @@ public static DateTime ReadValueImpl(Stream stream, byte[] bytes)

private static DateTime ReadDateTime(Stream stream, byte[] bytes)
{
stream.Read(bytes, 0, Size);
stream.ReadFull(bytes, 0, Size);
var ticks = BitConverter.ToInt64(bytes, 0);
var kind = (DateTimeKind) bytes[Size - 1]; //avoid reading a single byte from the stream
var dateTime = new DateTime(ticks, kind);
Expand Down
3 changes: 2 additions & 1 deletion src/Hyperion/ValueSerializers/DoubleSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

using System;
using System.IO;
using Hyperion.Extensions;

namespace Hyperion.ValueSerializers
{
Expand All @@ -30,7 +31,7 @@ public static void WriteValueImpl(Stream stream, double d, byte[] bytes)

public static double ReadValueImpl(Stream stream, byte[] bytes)
{
stream.Read(bytes, 0, Size);
stream.ReadFull(bytes, 0, Size);
return BitConverter.ToDouble(bytes, 0);
}

Expand Down
3 changes: 2 additions & 1 deletion src/Hyperion/ValueSerializers/FloatSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

using System;
using System.IO;
using Hyperion.Extensions;

namespace Hyperion.ValueSerializers
{
Expand All @@ -30,7 +31,7 @@ public static void WriteValueImpl(Stream stream, float f, byte[] bytes)

public static float ReadValueImpl(Stream stream, byte[] bytes)
{
stream.Read(bytes, 0, Size);
stream.ReadFull(bytes, 0, Size);
return BitConverter.ToSingle(bytes, 0);
}

Expand Down
2 changes: 1 addition & 1 deletion src/Hyperion/ValueSerializers/GuidSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static void WriteValueImpl(Stream stream, Guid g)
public static Guid ReadValueImpl(Stream stream)
{
var buffer = new byte[16];
stream.Read(buffer, 0, 16);
stream.ReadFull(buffer, 0, 16);
return new Guid(buffer);
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/Hyperion/ValueSerializers/Int16Serializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

using System;
using System.IO;
using Hyperion.Extensions;

namespace Hyperion.ValueSerializers
{
Expand All @@ -30,7 +31,7 @@ public static void WriteValueImpl(Stream stream, short sh, byte[] bytes)

public static short ReadValueImpl(Stream stream, byte[] bytes)
{
stream.Read(bytes, 0, Size);
stream.ReadFull(bytes, 0, Size);
return BitConverter.ToInt16(bytes, 0);
}

Expand Down
Loading

0 comments on commit 81ff792

Please sign in to comment.