Skip to content

Commit

Permalink
Add new overloads for `GlobalExclusiveDeviceAccess.CommunicateWithDev…
Browse files Browse the repository at this point in the history
…ice` (#377)
  • Loading branch information
frobijn authored Oct 9, 2024
1 parent c5b0e01 commit d84256c
Show file tree
Hide file tree
Showing 6 changed files with 460 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,98 +11,294 @@ namespace nanoFramework.Tools.Debugger.NFDevice
/// <summary>
/// Code that wants to access a device should use this system-wide exclusive access while
/// communicating to a device to prevent that another nanoFramework tool also wants to
/// communicate with the device.
/// communicate with the device. The public methods return an instance of <see cref="GlobalExclusiveDeviceAccess"/>
/// if the access has been granted; that instance has to be disposed of when exclusive
/// access is no longer needed.
/// </summary>
public static class GlobalExclusiveDeviceAccess
public sealed class GlobalExclusiveDeviceAccess : IDisposable
{
#region Fields

/// <summary>
/// Base name for the system-wide mutex that controls access to a device connected to a COM port.
/// </summary>
private const string MutexBaseName = "276545121198496AADD346A60F14EF8D_";
private const string MutexBaseName = @"276545121198496AADD346A60F14EF8D_";
private static readonly Dictionary<string, (AsyncLocal<GlobalExclusiveDeviceAccess> instance, Semaphore mutex)> s_locks = [];
private readonly Semaphore _mutex;
private int _lockCount;
private readonly string _portInstanceId;

#endregion

#region Methods

/// <summary>
/// Get exclusive access to a connected device to communicate with that device.
/// </summary>
/// <param name="device">The connected device.</param>
/// <param name="millisecondsTimeout">Maximum time in milliseconds to wait for exclusive access</param>
/// <param name="cancellationToken">Cancellation token that can be cancelled to stop/abort waiting for the exclusive access.</param>
/// <returns>Returns an instance of <see cref="GlobalExclusiveDeviceAccess"/> if exclusive access has been granted.
/// Returns <c>null</c> if exclusive access cannot be obtained within <paramref name="millisecondsTimeout"/>,
/// or if <paramref name="cancellationToken"/> was cancelled.</returns>
public static GlobalExclusiveDeviceAccess TryGet(
NanoDeviceBase device,
int millisecondsTimeout = Timeout.Infinite,
CancellationToken? cancellationToken = null)
{
return GetOrCreate(
device.ConnectionPort.InstanceId,
millisecondsTimeout,
cancellationToken);
}

/// <summary>
/// Communicate with a serial device and ensure the code to be executed as exclusive access to the device.
/// Get exclusive access to a device connected to the specified port, to communicate with that device.
/// </summary>
/// <param name="port">The port the device is connected to.</param>
/// <param name="millisecondsTimeout">Maximum time in milliseconds to wait for exclusive access</param>
/// <param name="cancellationToken">Cancellation token that can be cancelled to stop/abort waiting for the exclusive access.</param>
/// <returns>Returns an instance of <see cref="GlobalExclusiveDeviceAccess"/> if exclusive access has been granted.
/// Returns <c>null</c> if exclusive access cannot be obtained within <paramref name="millisecondsTimeout"/>,
/// or if <paramref name="cancellationToken"/> was cancelled.</returns>
public static GlobalExclusiveDeviceAccess TryGet(
IPort port,
int millisecondsTimeout = Timeout.Infinite,
CancellationToken? cancellationToken = null)
{
return GetOrCreate(
port.InstanceId,
millisecondsTimeout,
cancellationToken);
}

/// <summary>
/// Get exclusive access to a device connected to the specified serial port, to communicate with that device.
/// </summary>
/// <param name="serialPort">The serial port the device is connected to.</param>
/// <param name="communication">Code to execute while having exclusive access to the device</param>
/// <param name="millisecondsTimeout">Maximum time in milliseconds to wait for exclusive access</param>
/// <param name="cancellationToken">Cancellation token that can be cancelled to stop/abort running the <paramref name="communication"/>.
/// This method does not stop/abort execution of <paramref name="communication"/> after it has been started.</param>
/// <returns>Indicates whether the <paramref name="communication"/> has been executed. Returns <c>false</c> if exclusive access
/// cannot be obtained within <paramref name="millisecondsTimeout"/>, or if <paramref name="cancellationToken"/> was cancelled
/// before the <paramref name="communication"/> has been started.</returns>
public static bool CommunicateWithDevice(string serialPort, Action communication, int millisecondsTimeout = Timeout.Infinite, CancellationToken? cancellationToken = null)
/// <param name="cancellationToken">Cancellation token that can be cancelled to stop/abort waiting for the exclusive access.</param>
/// <returns>Returns an instance of <see cref="GlobalExclusiveDeviceAccess"/> if exclusive access has been granted.
/// Returns <c>null</c> if exclusive access cannot be obtained within <paramref name="millisecondsTimeout"/>,
/// or if <paramref name="cancellationToken"/> was cancelled.</returns>
public static GlobalExclusiveDeviceAccess TryGet(
string serialPort,
int millisecondsTimeout = Timeout.Infinite,
CancellationToken? cancellationToken = null)
{
return DoCommunicateWithDevice(serialPort, communication, millisecondsTimeout, cancellationToken);
return GetOrCreate(
serialPort,
millisecondsTimeout,
cancellationToken);
}

/// <summary>
/// Communicate with a device accessible via the network and ensure the code to be executed as exclusive access to the device.
/// Get exclusive access to a device at the specified network address, to communicate with that device.
/// </summary>
/// <param name="address">The network address the device is connected to.</param>
/// <param name="communication">Code to execute while having exclusive access to the device</param>
/// <param name="millisecondsTimeout">Maximum time in milliseconds to wait for exclusive access</param>
/// <param name="cancellationToken">Cancellation token that can be cancelled to stop/abort running the <paramref name="communication"/>.
/// This method does not stop/abort execution of <paramref name="communication"/> after it has been started.</param>
/// <returns>Indicates whether the <paramref name="communication"/> has been executed. Returns <c>false</c> if exclusive access
/// cannot be obtained within <paramref name="millisecondsTimeout"/>, or if <paramref name="cancellationToken"/> was cancelled
/// before the <paramref name="communication"/> has been started.</returns>
public static bool CommunicateWithDevice(NetworkDeviceInformation address, Action communication, int millisecondsTimeout = Timeout.Infinite, CancellationToken? cancellationToken = null)
/// <param name="cancellationToken">Cancellation token that can be cancelled to stop/abort waiting for the exclusive access.</param>
/// <returns>Returns an instance of <see cref="GlobalExclusiveDeviceAccess"/> if exclusive access has been granted.
/// Returns <c>null</c> if exclusive access cannot be obtained within <paramref name="millisecondsTimeout"/>,
/// or if <paramref name="cancellationToken"/> was cancelled.</returns>
public static GlobalExclusiveDeviceAccess TryGet(
NetworkDeviceInformation address,
int millisecondsTimeout = Timeout.Infinite,
CancellationToken? cancellationToken = null)
{
return DoCommunicateWithDevice($"{address.Host}:{address.Port}", communication, millisecondsTimeout, cancellationToken);
return GetOrCreate(
$"{address.Host}:{address.Port}",
millisecondsTimeout,
cancellationToken);
}

#endregion

#region Implementation
private static bool DoCommunicateWithDevice(string connectionKey, Action communication, int millisecondsTimeout, CancellationToken? cancellationToken)

private static GlobalExclusiveDeviceAccess GetOrCreate(
string portInstanceId,
int millisecondsTimeout,
CancellationToken? cancellationToken)
{
for (bool retry = true; retry;)
if (cancellationToken?.IsCancellationRequested == true)
{
retry = false;
return null;
}

var waitHandles = new List<WaitHandle>();
var mutex = new Mutex(false, $"{MutexBaseName}{connectionKey}");
waitHandles.Add(mutex);
// If the access lock has been created earlier (in previous statements) and has not yet been disposed,
// use that lock.
GlobalExclusiveDeviceAccess result = null;

CancellationTokenSource timeOutToken = null;
if (millisecondsTimeout > 0 && millisecondsTimeout != Timeout.Infinite)
lock (s_locks)
{
if (s_locks.TryGetValue(
portInstanceId,
out (AsyncLocal<GlobalExclusiveDeviceAccess>, Semaphore) instance))
{
timeOutToken = new CancellationTokenSource(millisecondsTimeout);
waitHandles.Add(timeOutToken.Token.WaitHandle);
// Note that the result can still be null, in case the exclusive access was obtained by
// code that is not a statement previously executed in the context of the current async-thread.
result = instance.Item1?.Value;
}
}

if (cancellationToken.HasValue)
if (result is not null)
{
// Use the same lock as in Dispose
lock (result._mutex)
{
waitHandles.Add(cancellationToken.Value.WaitHandle);
if (result._lockCount == 0)
{
// It should already have been disposed of. Fix that for good measure.
lock (s_locks)
{
s_locks.Remove(portInstanceId);
}

result = null;
}
else
{
result._lockCount++;
return result;
}
}
}

CancellationTokenSource timeOutToken = null;

if (millisecondsTimeout != Timeout.Infinite)
{
timeOutToken = new CancellationTokenSource(millisecondsTimeout);
}

try
try
{
while (result is null)
{
if (WaitHandle.WaitAny(waitHandles.ToArray()) == 0)
// Cannot use Mutex as the Mutex must be released on the same thread as it is created.
// That may not be the case if this is used in async code.
var mutex = new Semaphore(
0,
1,
$"{MutexBaseName}{portInstanceId}",
out bool createdNew);

if (createdNew)
{
communication();
return true;
// This code has created the semaphore, so it has exclusive access
result = new GlobalExclusiveDeviceAccess(portInstanceId, mutex);
}
else
{
// Wait for the semaphore created elsewhere
var waitHandles = new List<WaitHandle>()
{
// Mutex must be added first
mutex
};

// The problem with a semaphore it that, while waiting, it does not detect if the application
// with exclusive access is aborted without releasing the semaphore. The semaphore
// has to be re-created for that. If the other application just releases the semaphore,
// the wait is ended.
var iterationToken = new CancellationTokenSource(1000);
waitHandles.Add(iterationToken.Token.WaitHandle);

// Add the other tokens as well
if (timeOutToken is not null)
{
waitHandles.Add(timeOutToken.Token.WaitHandle);
}

if (cancellationToken.HasValue)
{
waitHandles.Add(cancellationToken.Value.WaitHandle);
}

try
{
// Try to get exclusive access to the device.
int handleIndex = WaitHandle.WaitAny([.. waitHandles]);

if (handleIndex == 0)
{
// Exclusive access granted as the wait ended because of the mutex
result = new GlobalExclusiveDeviceAccess(portInstanceId, mutex);
}
else if (handleIndex != 1)
{
// timeOutToken or cancellationToken are cancelled
break;
}
}
finally
{
iterationToken.Dispose();

if (result is null)
{
mutex.Dispose();
}
}
}
}
catch (AbandonedMutexException)
}
finally
{
timeOutToken?.Dispose();
}

return result;
}

private GlobalExclusiveDeviceAccess(
string portInstanceId,
Semaphore mutex)
{
_mutex = mutex;
_lockCount = 1;
_portInstanceId = portInstanceId;

var instance = new AsyncLocal<GlobalExclusiveDeviceAccess>
{
Value = this
};

lock (s_locks)
{
s_locks[portInstanceId] = (instance, mutex);
}
}

/// <summary>
/// Dispose of the exclusive access.
/// </summary>
public void Dispose()
{
bool removeFromLocks = false;

lock (_mutex)
{
_lockCount--;

if (_lockCount == 0)
{
// While this process is waiting on a mutex, the process that owned the mutex has been terminated
// without properly releasing the mutex.
// Try again, if this is the only remaining process it will re-create the mutex and get exclusive access.
retry = true;
_mutex.Release();
_mutex.Dispose();
removeFromLocks = true;
}
finally
}

if (removeFromLocks)
{
lock (s_locks)
{
mutex.ReleaseMutex();
timeOutToken?.Dispose();
s_locks.Remove(_portInstanceId);
}
}

return false;
}

#endregion
}
}
Loading

0 comments on commit d84256c

Please sign in to comment.