Skip to content

Commit

Permalink
Add CancellationToken argument to ICoapTransport Async methods
Browse files Browse the repository at this point in the history
Fixes #12
  • Loading branch information
NZSmartie committed Jan 20, 2018
1 parent 89d5c94 commit d7c9d9c
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 74 deletions.
2 changes: 1 addition & 1 deletion appveyor.release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ image: Visual Studio 2017
configuration: Release

environment:
package_version: '0.5.3'
package_version: '0.6.0'

git_access_token:
secure: kFNrVQrkAApceG8qiKcsgXAFx50qkY4Lfl2OLczSxwhEYbHh78AdPqyBn3P/JEjo
Expand Down
2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ skip_tags: true
image: Visual Studio 2017

environment:
package_version: '0.5.3'
package_version: '0.6.0'

COVERALLS_REPO_TOKEN:
secure: o3Oj2doUP9AbvI5Phn28o+JHMk9W9P39yiyq1b3d96g0FoJ1loUm7PJ7GytUzxyO
Expand Down
2 changes: 1 addition & 1 deletion src/CoAPNet.Server/CoapHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ await connection.LocalEndpoint.SendAsync(
{
Endpoint = connection.RemoteEndpoint,
Payload = result.ToBytes()
});
}, CancellationToken.None);
}
}
}
Expand Down
52 changes: 39 additions & 13 deletions src/CoAPNet.Udp/CoapUdpEndPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,36 @@ public void Dispose()
Client?.Dispose();
}

public async Task<CoapPacket> ReceiveAsync()
public async Task<CoapPacket> ReceiveAsync(CancellationToken token)
{
if (Client == null)
await BindAsync();

var result = await Client.ReceiveAsync();
return new CoapPacket
try
{
Payload = result.Buffer,
Endpoint = new CoapUdpEndPoint(result.RemoteEndPoint) {Bindable = false},
};
var tcs = new TaskCompletionSource<bool>();
using (token.Register(() => tcs.SetResult(false)))
{
var receiveTask = Client.ReceiveAsync();
await Task.WhenAny(receiveTask, tcs.Task);

token.ThrowIfCancellationRequested();

return new CoapPacket
{
Payload = receiveTask.Result.Buffer,
Endpoint = new CoapUdpEndPoint(receiveTask.Result.RemoteEndPoint) {Bindable = false},
};
}
}
catch (OperationCanceledException)
{
Client.Dispose(); // Since UdpClient doesn't provide a mechanism for cancelling an async task. the safest way is to dispose the whole object
throw;
}
}

public async Task SendAsync(CoapPacket packet)
public async Task SendAsync(CoapPacket packet, CancellationToken token)
{
if (Client == null)
await BindAsync();
Expand Down Expand Up @@ -173,14 +189,24 @@ public async Task SendAsync(CoapPacket packet)
throw new CoapUdpEndpointException($"Unsupported {nameof(CoapPacket)}.{nameof(CoapPacket.Endpoint)} type ({packet.Endpoint.GetType().FullName})");
}

try
{
await Client.SendAsync(packet.Payload, packet.Payload.Length, udpDestEndpoint.Endpoint);
}
catch (SocketException se)
token.ThrowIfCancellationRequested();

var tcs = new TaskCompletionSource<bool>();
using (token.Register(() => tcs.SetResult(false)))
{
_logger?.LogInformation($"Failed to send data. {se.GetType().FullName} (0x{se.HResult:x}): {se.Message}", se);
try
{
await Task.WhenAny(Client.SendAsync(packet.Payload, packet.Payload.Length, udpDestEndpoint.Endpoint), tcs.Task);
if(token.IsCancellationRequested)
Client.Dispose(); // Since UdpClient doesn't provide a mechanism for cancelling an async task. the safest way is to dispose the whole object
}
catch (SocketException se)
{
_logger?.LogInformation($"Failed to send data. {se.GetType().FullName} (0x{se.HResult:x}): {se.Message}", se);
}
}

token.ThrowIfCancellationRequested();
}

public override string ToString()
Expand Down
13 changes: 11 additions & 2 deletions src/CoAPNet.Udp/CoapUdpTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -44,6 +45,7 @@ public class CoapUdpTransport : ICoapTransport
private readonly ILogger<CoapUdpTransport> _logger;

private Task _listenTask;
private readonly CancellationTokenSource _listenTaskCTS = new CancellationTokenSource();

public CoapUdpTransport(CoapUdpEndPoint endPoint, ICoapHandler coapHandler, ILogger<CoapUdpTransport> logger = null)
{
Expand Down Expand Up @@ -80,7 +82,14 @@ public async Task UnbindAsync()
_endPoint = null;

endPoint.Dispose();
await _listenTask.ConfigureAwait(false);
try
{
_listenTaskCTS.Cancel();
await _listenTask.ConfigureAwait(false);
}
catch (OperationCanceledException)
{ }

_listenTask = null;
}

Expand All @@ -96,7 +105,7 @@ private async Task RunRequestsLoopAsync()
{
while (true)
{
var request = await _endPoint.ReceiveAsync();
var request = await _endPoint.ReceiveAsync(_listenTaskCTS.Token);
_logger?.LogDebug(CoapUdpLoggingEvents.TransportRequestsLoop, "Received message");

_ = ProcessRequestAsync(new CoapConnectionInformation
Expand Down
8 changes: 5 additions & 3 deletions src/CoAPNet/CoapClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public CoapClient(ICoapEndpoint endpoint)

private readonly ConcurrentQueue<Task<CoapReceiveResult>> _receiveQueue = new ConcurrentQueue<Task<CoapReceiveResult>>();
private Task _receiveTask = Task.CompletedTask;
private readonly CancellationTokenSource _receiveTaskCTS = new CancellationTokenSource();

private readonly AsyncAutoResetEvent _receiveEvent = new AsyncAutoResetEvent(false);

Expand Down Expand Up @@ -208,7 +209,7 @@ private async Task ReceiveAsyncInternal()
if (Endpoint == null)
return;

payloadTask = Endpoint.ReceiveAsync();
payloadTask = Endpoint.ReceiveAsync(_receiveTaskCTS.Token);
}

var payload = await payloadTask;
Expand Down Expand Up @@ -316,6 +317,7 @@ public void Dispose()
Endpoint = null;

endpoint?.Dispose();
_receiveTaskCTS.Cancel();

if (!_receiveTask.IsCompleted && !_receiveTask.IsCanceled && !_receiveTask.IsFaulted && !_receiveTask.Wait(5000))
throw new CoapClientException($"Took too long to dispose of the enclosed {nameof(Endpoint)}");
Expand Down Expand Up @@ -491,10 +493,10 @@ private async Task SendAsyncInternal(CoapMessage message, ICoapEndpoint remoteEn
{
Payload = message.ToBytes(),
Endpoint = remoteEndpoint
});
}, token);
}

await Task.Run(async () => await task, token).ConfigureAwait(false);
await task.ConfigureAwait(false);
}

internal void SetNextMessageId(int value)
Expand Down
9 changes: 5 additions & 4 deletions src/CoAPNet/ICoapEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ public interface ICoapEndpoint : IDisposable
/// Called by [Service] to send a <see cref="CoapPacket.Payload"/> to the specified <see cref="CoapPacket.Endpoint"/> using the endpoint layer provided by the Application Layer
/// </summary>
/// <param name="packet"></param>
/// <param name="token"></param>
/// <returns></returns>
Task SendAsync(CoapPacket packet);
Task SendAsync(CoapPacket packet, CancellationToken token);

/// <summary>
/// Called by [service] to receive data from the endpoint layer
/// </summary>
/// <returns></returns>
Task<CoapPacket> ReceiveAsync();
Task<CoapPacket> ReceiveAsync(CancellationToken tokens);
}

/// <summary>
Expand All @@ -84,12 +85,12 @@ public void Dispose()
public bool IsSecure { get; internal set; }
public bool IsMulticast { get; internal set; }
public Uri BaseUri { get; internal set; }
public Task SendAsync(CoapPacket packet)
public Task SendAsync(CoapPacket packet, CancellationToken token)
{
throw new InvalidOperationException($"{nameof(CoapEndpoint)} can not be used to send and receive");
}

public Task<CoapPacket> ReceiveAsync()
public Task<CoapPacket> ReceiveAsync(CancellationToken token)
{
throw new InvalidOperationException($"{nameof(CoapEndpoint)} can not be used to send and receive");
}
Expand Down
14 changes: 7 additions & 7 deletions tests/CoAPNet.Tests/CoapBlockMessageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void Write_BlockWiseCoapMessage(
response.Options.Add(new Options.Block1(block, blockSize, block != (totalBlocks - 1)));

mockClientEndpoint
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes()))))
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes())),It.IsAny<CancellationToken>()))
.Callback(() => mockClientEndpoint.Object.EnqueueReceivePacket(new CoapPacket { Payload = response.ToBytes() }))
.Returns(Task.CompletedTask)
.Verifiable($"Did not send block: {block}");
Expand Down Expand Up @@ -141,7 +141,7 @@ public void Write_BlockWiseCoapMessage_RemoteReduceBlockSize(int initialBlockSiz
response.Options.Add(new Options.Block1(0, reducetoBlockSize, initialBlockSize < totalBytes));

mockClientEndpoint
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes()))))
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes())), It.IsAny<CancellationToken>()))
.Callback(() => mockClientEndpoint.Object.EnqueueReceivePacket(new CoapPacket { Payload = response.ToBytes() }))
.Returns(Task.CompletedTask)
.Verifiable($"Did not send block: 0");
Expand All @@ -162,7 +162,7 @@ public void Write_BlockWiseCoapMessage_RemoteReduceBlockSize(int initialBlockSiz
response.Options.Add(new Options.Block1(block, reducetoBlockSize, block != (totalBlocks - 1)));

mockClientEndpoint
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes()))))
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes())), It.IsAny<CancellationToken>()))
.Callback(() => mockClientEndpoint.Object.EnqueueReceivePacket(new CoapPacket { Payload = response.ToBytes() }))
.Returns(Task.CompletedTask)
.Verifiable($"Did not send block: {block}");
Expand Down Expand Up @@ -227,7 +227,7 @@ public void Write_BlockWiseCoapMessage_BlockSizeTooLarge(int initialBlockSize, i
};

mockClientEndpoint
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes()))))
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes())), It.IsAny<CancellationToken>()))
.Callback(() => mockClientEndpoint.Object.EnqueueReceivePacket(new CoapPacket { Payload = response.ToBytes() }))
.Returns(Task.CompletedTask)
.Verifiable($"Did not send blockSize ({blockSize}) attempt");
Expand All @@ -248,7 +248,7 @@ public void Write_BlockWiseCoapMessage_BlockSizeTooLarge(int initialBlockSize, i
response.Options.Add(new Options.Block1(block, reducetoBlockSize, block != (totalBlocks - 1)));

mockClientEndpoint
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes()))))
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes())), It.IsAny<CancellationToken>()))
.Callback(() => mockClientEndpoint.Object.EnqueueReceivePacket(new CoapPacket { Payload = response.ToBytes() }))
.Returns(Task.CompletedTask)
.Verifiable($"Did not send block: {block}");
Expand Down Expand Up @@ -297,7 +297,7 @@ public void Setup_Read_BlockWiseCorrespondance(
Debug.WriteLine($" Response: {response}");

mockClientEndpoint
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes()))))
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes())), It.IsAny<CancellationToken>()))
.Callback(() => mockClientEndpoint.Object.EnqueueReceivePacket(new CoapPacket { Payload = response.ToBytes() }))
.Returns(Task.CompletedTask)
.Verifiable($"Did not send block: {block}");
Expand All @@ -321,7 +321,7 @@ public void Setup_Read_BlockWiseCorrespondance(
Debug.WriteLine($" Response: {response}");

mockClientEndpoint
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes()))))
.Setup(c => c.MockSendAsync(It.Is<CoapPacket>(p => p.Payload.SequenceEqual(expected.ToBytes())), It.IsAny<CancellationToken>()))
.Callback(() => mockClientEndpoint.Object.EnqueueReceivePacket(new CoapPacket { Payload = response.ToBytes() }))
.Returns(Task.CompletedTask)
.Verifiable($"Did not send block: {block}");
Expand Down
Loading

0 comments on commit d7c9d9c

Please sign in to comment.