Skip to content

Commit

Permalink
Support reading block-wise from remote endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
NZSmartie committed Jan 1, 2018
1 parent fbf5878 commit a1aea35
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 15 deletions.
122 changes: 110 additions & 12 deletions src/CoAPNet/CoapBlockStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ public static int DefaultBlockSize
}

private readonly CoapClient _client;
private readonly ICoapEndpoint _endpoint;

private readonly ByteQueue _reader = new ByteQueue();
private readonly Task _readerTask;
private readonly AsyncAutoResetEvent _readerEvent = new AsyncAutoResetEvent(false);

//private readonly Task _readerTask;

private readonly ByteQueue _writer = new ByteQueue();
private readonly Task _writerTask;
Expand All @@ -52,6 +54,7 @@ public static int DefaultBlockSize
private bool _endOfStream;

private int _blockSize = DefaultBlockSize;
private int _readBlockNumber;

/// <summary>
/// Gets or sets the maximum amount of time spent writing to <see cref="CoapClient"/> during <see cref="Dispose(bool)"/>
Expand Down Expand Up @@ -80,25 +83,108 @@ public int BlockSize
}

/// <inheritdoc/>
public override bool CanRead => true;
public override bool CanRead => !_endOfStream && (_readerTask?.IsCompleted ?? false);

/// <inheritdoc/>
public override bool CanSeek => false;

/// <inheritdoc/>
public override bool CanWrite => !_endOfStream;
public override bool CanWrite => !_endOfStream && (_writerTask?.IsCompleted ?? false);

/// <summary>
/// Create a new <see cref="CoapBlockStream"/> using <paramref name="client"/> to read and write blocks of data. <paramref name="baseMessage"/> is required to base blocked messages off of.
/// </summary>
/// <param name="client"></param>
/// <param name="baseMessage"></param>
public CoapBlockStream(CoapClient client, CoapMessage baseMessage = null)
/// <param name="endpoint"></param>
public CoapBlockStream(CoapClient client, CoapMessage baseMessage, ICoapEndpoint endpoint = null)
{
_client = client;
_baseMessage = baseMessage;
_client = client ?? throw new ArgumentNullException(nameof(client));
_endpoint = endpoint;

if (!_baseMessage.Code.IsRequest())
throw new InvalidOperationException($"Can not create a {nameof(CoapBlockStream)} with a {nameof(baseMessage)}.{nameof(baseMessage.Type)} of {baseMessage.Type}");

_baseMessage = baseMessage?.Clone()
?? throw new ArgumentNullException(nameof(baseMessage));

_writerTask = WriteBlocksAsync();


}

/// <summary>
/// Create a new <see cref="CoapBlockStream"/> using <paramref name="client"/> to read and write blocks of data. <paramref name="baseMessage"/> is required to base blocked messages off of.
/// </summary>
/// <param name="client"></param>
/// <param name="baseMessage"></param>
/// <param name="endpoint"></param>
public CoapBlockStream(CoapClient client, CoapMessage baseMessage, CoapMessage requestMessage, ICoapEndpoint endpoint = null)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
_endpoint = endpoint;

_baseMessage = requestMessage?.Clone()
?? throw new ArgumentNullException(nameof(requestMessage));

var payload = baseMessage.Payload;

var block2 = baseMessage.Options.Get<Block2>();
_blockSize = block2.BlockSize;
_readBlockNumber = block2.BlockNumber;
_endOfStream = !block2.IsMoreFollowing;

_reader.Enqueue(payload, 0, payload.Length);

_readBlockNumber += payload.Length / _blockSize;

_readerTask = ReadBlocksAsync();
}

private async Task ReadBlocksAsync()
{
var cancellationToken = _cancellationTokenSource.Token;
//var meessageToken = new Random().Next();

try
{
while (!_endOfStream && !cancellationToken.IsCancellationRequested)
{
var message = _baseMessage.Clone();
message.Id = 0;

message.Options.Add(new Block2(_readBlockNumber, _blockSize));

var messageId = await _client.SendAsync(message, _endpoint, cancellationToken);

var response = await _client.GetResponseAsync(messageId, cancellationToken);

if (!response.Code.IsSuccess())
throw new CoapBlockException("Error occured while reading blocks from remote endpoint",
CoapException.FromCoapMessage(response), response.Code);

var block2 = response.Options.Get<Block2>();

if (block2.BlockNumber != _readBlockNumber)
throw new CoapBlockException("Received incorrect block number from remote host");

_readBlockNumber++;

_reader.Enqueue(response.Payload, 0, response.Payload.Length);
_readerEvent.Set();
_endOfStream = !response.Options.Get<Block2>().IsMoreFollowing;
}
}
catch (Exception ex)
{
// Hold onto the exception to throw it from a synchronous call.
_caughtException = ex;
}
finally
{
_endOfStream = true;
_readerEvent.Set();
}
}

private async Task WriteBlocksAsync()
Expand All @@ -123,6 +209,7 @@ private async Task WriteBlocksAsync()
_writer.Peek(message.Payload, 0, _blockSize);

var messageId = await _client.SendAsync(message, token);

var result = await _client.GetResponseAsync(messageId, token);

if (result.Code.IsSuccess())
Expand Down Expand Up @@ -206,7 +293,17 @@ public override async Task FlushAsync(CancellationToken cancellationToken)
/// <inheritdoc/>
public override int Read(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
// TODO: implement timeout logic here
_readerEvent.WaitAsync(CancellationToken.None).Wait();

return _reader.Dequeue(buffer, offset, count);
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await _readerEvent.WaitAsync(cancellationToken);

return _reader.Dequeue(buffer, offset, count);
}

/// <inheritdoc/>
Expand All @@ -222,10 +319,12 @@ protected override void Dispose(bool disposing)
{
if (disposing)
{
if (_caughtException == null && !_writerTask.IsCompleted)
{
_endOfStream = true;
ThrowCaughtException();

_endOfStream = true;

if(_writerTask != null && !_writerTask.IsCompleted)
{
// Write any/all data to the output
if (_writer.Length > 0)
_writerEvent.Set();
Expand All @@ -241,9 +340,8 @@ protected override void Dispose(bool disposing)
ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
}
}

ThrowCaughtException();
}

base.Dispose(disposing);
}

Expand Down
100 changes: 97 additions & 3 deletions tests/CoAPNet.Tests/CoapBlockMessageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,104 @@ public void Write_BlockWiseCoapMessage_BlockSizeTooLarge(int initialBlockSize, i
}

[Test]
[Category("Blocks")]
public void ReadCoapMessageBlocks()
[Category("[RFC7959] Section 2.4"), Category("Blocks")]
public async Task Read_BlockWiseCoapMessage(
[Values(16, 32, 64, 128, 256, 512, 1024)] int blockSize,
[Range(1, 2)] int blocks,
[Values] bool lastHalfblock)
{
Assert.Inconclusive("Not Implemented");
// Arrange
var mockClientEndpoint = new Mock<MockEndpoint>() { CallBase = true };

var messageId = 1;

// "lambda" for generating our pseudo payload
byte[] byteRange(int a, int b) => Enumerable.Range(a, b).Select(i => Convert.ToByte(i % (byte.MaxValue + 1))).ToArray();

int totalBytes = (blocks * blockSize) + (lastHalfblock ? blockSize / 2 : 0);
int totalBlocks = ((totalBytes - 1) / blockSize) + 1;

var baseRequestMessage = new CoapMessage
{
Code = CoapMessageCode.Get,
Type = CoapMessageType.Confirmable,
};

baseRequestMessage.SetUri("/status", UriComponents.Path);

var baseResponseMessage = new CoapMessage
{
Code = CoapMessageCode.Content,
Type = CoapMessageType.Acknowledgement,
};

{
var block = 0;
var bytes = Math.Min(totalBytes - (block * blockSize), blockSize);

var expected = baseRequestMessage.Clone();
expected.Id = messageId;

var response = baseResponseMessage.Clone();
response.Id = messageId++;
response.Options.Add(new Options.Block2(block, blockSize, block != (totalBlocks - 1)));
response.Payload = byteRange(blockSize * block, bytes);

Debug.WriteLine($"Expecting: {expected}");
Debug.WriteLine($" Response: {response}");

mockClientEndpoint
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes()))))
.Callback(() => mockClientEndpoint.Object.EnqueueReceivePacket(new CoapPacket { Payload = response.ToBytes() }))
.Returns(Task.CompletedTask)
.Verifiable($"Did not send block: {block}");
}

// Generate an expected packet and response for all block-wise requests
for (var block = 1; block < totalBlocks; block++)
{
var bytes = Math.Min(totalBytes - (block * blockSize), blockSize);

var expected = baseRequestMessage.Clone();
expected.Id = messageId;
expected.Options.Add(new Options.Block2(block, blockSize));

var response = baseResponseMessage.Clone();
response.Id = messageId++;
response.Options.Add(new Options.Block2(block, blockSize, block != (totalBlocks - 1)));
response.Payload = byteRange(blockSize * block, bytes);

Debug.WriteLine($"Expecting: {expected}");
Debug.WriteLine($" Response: {response}");

mockClientEndpoint
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes()))))
.Callback(() => mockClientEndpoint.Object.EnqueueReceivePacket(new CoapPacket { Payload = response.ToBytes() }))
.Returns(Task.CompletedTask)
.Verifiable($"Did not send block: {block}");
}

var result = new byte[totalBytes];

// Act
using (var client = new CoapClient(mockClientEndpoint.Object))
{
var ct = new CancellationTokenSource(MaxTaskTimeout);

client.SetNextMessageId(1);

var identifier = await client.SendAsync(baseRequestMessage, ct.Token);

var response = await client.GetResponseAsync(identifier, ct.Token);

using (var reader = new CoapBlockStream(client, response, baseRequestMessage))
{
reader.Read(result, 0, totalBytes);
};
}

// Assert
mockClientEndpoint.Verify();
}
}
}

0 comments on commit a1aea35

Please sign in to comment.