Skip to content

Commit

Permalink
fix: reading bytes from a stream #51
Browse files Browse the repository at this point in the history
  • Loading branch information
richardschneider committed Sep 1, 2019
1 parent f2bbd51 commit 21c8e83
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 25 deletions.
6 changes: 1 addition & 5 deletions src/Multiplex/Muxer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,7 @@ public bool Receiver

// Read the payload.
var payload = new byte[length];
int offset = 0;
while (offset < length)
{
offset += await Channel.ReadAsync(payload, offset, length - offset, cancel).ConfigureAwait(false);
}
await Channel.ReadExactAsync(payload, 0, length, cancel).ConfigureAwait(false);

// Process the packet
Substreams.TryGetValue(header.StreamId, out Substream substream);
Expand Down
4 changes: 1 addition & 3 deletions src/ProtoBufHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ public static class ProtoBufHelper
{
var length = await stream.ReadVarint32Async(cancel).ConfigureAwait(false);
var bytes = new byte[length];
for (int offset = 0; offset < length;) {
offset += await stream.ReadAsync(bytes, offset, length - offset, cancel).ConfigureAwait(false);
}
await stream.ReadExactAsync(bytes, 0, length, cancel).ConfigureAwait(false);

using (var ms = new MemoryStream(bytes, false))
{
Expand Down
4 changes: 2 additions & 2 deletions src/Protocols/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public static class Message
var eol = new byte[1];
var length = await stream.ReadVarint32Async(cancel).ConfigureAwait(false);
var buffer = new byte[length - 1];
await stream.ReadAsync(buffer, 0, length - 1, cancel).ConfigureAwait(false);
await stream.ReadAsync(eol, 0, 1, cancel).ConfigureAwait(false);
await stream.ReadExactAsync(buffer, 0, length - 1, cancel).ConfigureAwait(false);
await stream.ReadExactAsync(eol, 0, 1, cancel).ConfigureAwait(false);
if (eol[0] != newline[0])
{
log.Error($"length: {length}, bytes: {buffer.ToHexString()}");
Expand Down
10 changes: 2 additions & 8 deletions src/Protocols/Ping1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ public override string ToString()
{
// Read the message.
var request = new byte[PingSize];
for (int offset = 0; offset < PingSize;)
{
offset += await stream.ReadAsync(request, offset, PingSize - offset, cancel).ConfigureAwait(false);
}
await stream.ReadExactAsync(request, 0, PingSize, cancel).ConfigureAwait(false);
log.Debug($"got ping from {connection.RemotePeer}");

// Echo the message
Expand Down Expand Up @@ -147,10 +144,7 @@ async Task<IEnumerable<PingResult>> PingAsync(Peer peer, int count, Cancellation
await stream.FlushAsync(cancel).ConfigureAwait(false);

var response = new byte[PingSize];
for (int offset = 0; offset < PingSize;)
{
offset += await stream.ReadAsync(response, offset, PingSize - offset, cancel).ConfigureAwait(false);
}
await stream.ReadExactAsync(response, 0, PingSize, cancel).ConfigureAwait(false);

var result = new PingResult
{
Expand Down
2 changes: 1 addition & 1 deletion src/SecureCommunication/Secio1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public override string ToString()

// Receive our nonce.
var verification = new byte[localNonce.Length];
await secureStream.ReadAsync(verification, 0, verification.Length, cancel);
await secureStream.ReadExactAsync(verification, 0, verification.Length, cancel);
if (!localNonce.SequenceEqual(verification))
{
throw new Exception($"SECIO verification message failure.");
Expand Down
7 changes: 1 addition & 6 deletions src/SecureCommunication/Secio1Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,7 @@ async Task<byte[]> ReadPacketAsync(CancellationToken cancel)
async Task<byte[]> ReadPacketBytesAsync(int count, CancellationToken cancel)
{
byte[] buffer = new byte[count];
for (int i = 0, n; i < count; i += n)
{
n = await stream.ReadAsync(buffer, i, count - i, cancel).ConfigureAwait(false);
if (n < 1)
throw new EndOfStreamException();
}
await stream.ReadExactAsync(buffer, 0, count, cancel).ConfigureAwait(false);
return buffer;
}

Expand Down
95 changes: 95 additions & 0 deletions src/StreamExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace PeerTalk
{
/// <summary>
///
/// </summary>
public static class StreamExtensions
{
/// <summary>
/// Asynchronously reads a sequence of bytes from the stream and advances
/// the position within the stream by the number of bytes read.
/// </summary>
/// <param name="stream">
/// The stream to read from.
/// </param>
/// <param name="buffer">
/// The buffer to write the data into.
/// </param>
/// <param name="offset">
/// The byte offset in <paramref name="buffer"/> at which to begin
/// writing data from the <paramref name="stream"/>.
/// </param>
/// <param name="length">
/// The number of bytes to read.
/// </param>
/// <returns>
/// A task that represents the asynchronous operation.
/// </returns>
/// <exception cref="EndOfStreamException">
/// When the <paramref name="stream"/> does not have
/// <paramref name="length"/> bytes.
/// </exception>
public static async Task ReadExactAsync(this Stream stream, byte[] buffer, int offset, int length)
{
while (0 < length)
{
var n = await stream.ReadAsync(buffer, offset, length);
if (n == 0)
{
throw new EndOfStreamException();
}
offset += n;
length -= n;
}
}

/// <summary>
/// Asynchronously reads a sequence of bytes from the stream and advances
/// the position within the stream by the number of bytes read.
/// </summary>
/// <param name="stream">
/// The stream to read from.
/// </param>
/// <param name="buffer">
/// The buffer to write the data into.
/// </param>
/// <param name="offset">
/// The byte offset in <paramref name="buffer"/> at which to begin
/// writing data from the <paramref name="stream"/>.
/// </param>
/// <param name="length">
/// The number of bytes to read.
/// </param>
/// <param name="cancel">
/// Is used to stop the task.
/// </param>
/// <returns>
/// A task that represents the asynchronous operation.
/// </returns>
/// <exception cref="EndOfStreamException">
/// When the <paramref name="stream"/> does not have
/// <paramref name="length"/> bytes.
/// </exception>
public static async Task ReadExactAsync(this Stream stream, byte[] buffer, int offset, int length, CancellationToken cancel)
{
while (0 < length)
{
var n = await stream.ReadAsync(buffer, offset, length, cancel);
if (n == 0)
{
throw new EndOfStreamException();
}
offset += n;
length -= n;
}
}
}
}
73 changes: 73 additions & 0 deletions test/StreamExtensionsTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Ipfs;
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Sockets;

namespace PeerTalk
{
[TestClass]
public class StreamExtensionsTest
{
[TestMethod]
public async Task ReadAsync()
{
var expected = new byte[] { 1, 2, 3, 4 };
using (var ms = new MemoryStream(expected))
{
var actual = new byte[expected.Length];
await ms.ReadExactAsync(actual, 0, actual.Length);
CollectionAssert.AreEqual(expected, actual);
}
}

[TestMethod]
public void ReadAsync_EOS()
{
var expected = new byte[] { 1, 2, 3, 4 };
var actual = new byte[expected.Length + 1];

using (var ms = new MemoryStream(expected))
{
ExceptionAssert.Throws<EndOfStreamException>(() =>
{
ms.ReadExactAsync(actual, 0, actual.Length).Wait();
});
}

var cancel = new CancellationTokenSource();
using (var ms = new MemoryStream(expected))
{
ExceptionAssert.Throws<EndOfStreamException>(() =>
{
ms.ReadExactAsync(actual, 0, actual.Length, cancel.Token).Wait();
});
}
}

[TestMethod]
public async Task ReadAsync_Cancel()
{
var expected = new byte[] { 1, 2, 3, 4 };
var actual = new byte[expected.Length];
var cancel = new CancellationTokenSource();
using (var ms = new MemoryStream(expected))
{
await ms.ReadExactAsync(actual, 0, actual.Length, cancel.Token);
CollectionAssert.AreEqual(expected, actual);
}

cancel.Cancel();
using (var ms = new MemoryStream(expected))
{
ExceptionAssert.Throws<TaskCanceledException>(() =>
{
ms.ReadExactAsync(actual, 0, actual.Length, cancel.Token).Wait();
});
}
}
}
}

0 comments on commit 21c8e83

Please sign in to comment.