Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent simulation runner #946

Merged
merged 26 commits into from
Apr 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
78c4569
Adding support for parallel execution of Simulations and simulationBa…
abdelr Feb 12, 2021
2a52a3d
Renaming to RunConcurrently
abdelr Feb 15, 2021
4a0910b
SimulationRunner can also run populations
abdelr Feb 22, 2021
542b3e5
Update SimulationRunner.cs
abdelr Feb 22, 2021
a016b3d
turn collections into object[] for compatibility with rclr
abdelr Mar 15, 2021
6f1d416
Update SimulationBatchSpecs.cs
abdelr Mar 15, 2021
ce7482a
Update SimulationRunner.cs
abdelr Mar 15, 2021
b969f2f
Defining a settings class for running simulations concurrently
abdelr Mar 16, 2021
ed6faca
Update SimulationRunner.cs
abdelr Mar 16, 2021
3641618
Renamed settings to options
PavelBal Mar 18, 2021
df9662d
Adding failing tests to recreate Pavel's problem
abdelr Mar 31, 2021
0c5b0f7
Remove Population
abdelr Apr 7, 2021
3c6beda
Fix: each thread runs with a different SimModelManager so they do not…
abdelr Apr 7, 2021
804de61
Fixing rebase
abdelr Apr 8, 2021
acd19e4
SimulationBatch also accepts an unique argument
abdelr Apr 8, 2021
2193ff1
Make inner list readonly and add Add method for populating
abdelr Apr 8, 2021
1b5306a
typo fix
abdelr Apr 8, 2021
39c3de1
Adding ConcurrencyManager and ConcurrentSimulationRunner as a wrapper…
abdelr Apr 16, 2021
a1c5480
Quick fixes
abdelr Apr 16, 2021
236bad2
checking cancelation token before starting a new simulation
abdelr Apr 17, 2021
fbddd1f
Removing clones and some misc fixes
abdelr Apr 19, 2021
2213af2
Some missing cleaning up
abdelr Apr 19, 2021
c7d663b
Some updates based on Code REview with @abdelr
msevestre Apr 19, 2021
71bc753
Proper use of cancellation token
abdelr Apr 19, 2021
02bc8fa
Remove task duplication
abdelr Apr 19, 2021
03d1cf9
Error text from constant
abdelr Apr 19, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/OSPSuite.Assets/UIConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,7 @@ public static class Error
public static readonly string NaNOnData = "Data contains NaN values at imported columns. Select a different action for NaN values or clean your data.";
public static readonly string UnsupportedFileFormat = "The file format is not supported";
public static readonly string InconsistenMoleculeAndMoleWeightException = "Molecule and Molecular Weight do not match. Please either edit your Molecule or your Molecular Weight in your project or remove the Molecular Weight from your mappings";
public static readonly string InvalidMixOfSimulationAndSimulationBatch = "You already have Simulation and SimulationBatch objects and should not mix, please invoke Clear to start adding objects from a fresh start";

public static string LinkedParameterIsNotValidInIdentificationParameter(string identificationParameterName) => $"At least one linked parameter is invalid in identification paramter '{identificationParameterName}'";

Expand Down
61 changes: 61 additions & 0 deletions src/OSPSuite.Core/Domain/Services/ConcurrencyManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace OSPSuite.Core.Domain.Services
{
public interface IConcurrencyManager
{
/// <summary>
/// </summary>
/// <typeparam name="TData">Data type to consume by the worker function</typeparam>
/// <typeparam name="TResult">Data produced by the worker function</typeparam>
/// <param name="numberOfCoresToUse">Number of cores to use. Use 0 or negative to take all cores</param>
/// <param name="cancellationToken">Cancellation token to cancel the threads</param>
/// <param name="data">List of data to consume by the workers</param>
/// <param name="action">A function to run on each worker on each piece of data</param>
/// <returns>Dictionary binding a result for each input data after running the action on it</returns>
Task<IReadOnlyDictionary<TData, TResult>> RunAsync<TData, TResult>(int numberOfCoresToUse, CancellationToken cancellationToken, IReadOnlyList<TData> data, Func<int, CancellationToken, TData, Task<TResult>> action);
}

public class ConcurrencyManager : IConcurrencyManager
{
private readonly ICoreUserSettings _coreUserSettings;

public ConcurrencyManager(ICoreUserSettings coreUserSettings)
{
_coreUserSettings = coreUserSettings;
}

public async Task<IReadOnlyDictionary<TData, TResult>> RunAsync<TData, TResult>(int numberOfCoresToUse, CancellationToken cancellationToken, IReadOnlyList<TData> data, Func<int, CancellationToken, TData, Task<TResult>> action)
{
if (numberOfCoresToUse <= 0)
numberOfCoresToUse = _coreUserSettings.MaximumNumberOfCoresToUse;
var concurrentData = new ConcurrentQueue<TData>(data);
numberOfCoresToUse = Math.Min(numberOfCoresToUse, concurrentData.Count);

var results = new ConcurrentDictionary<TData, TResult>();
//Starts one task per core
var tasks = Enumerable.Range(0, numberOfCoresToUse).Select(async coreIndex =>
{
//While there is data left
while (concurrentData.TryDequeue(out var datum))
{
cancellationToken.ThrowIfCancellationRequested();

//Invoke the action on it and store the result
var result = await action.Invoke(coreIndex, cancellationToken, datum);
results.TryAdd(datum, result);
}
}).ToList();

await Task.WhenAll(tasks);
//all tasks are completed. Can return results

return results;
}
}
}
11 changes: 11 additions & 0 deletions src/OSPSuite.R/Domain/SimulationBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ public class SimulationBatchRunValues
public double[] MoleculeValues => InitialValues.ToNetArray(InitialValue);
}

public class SimulationBatchRunConcurrentlyOptions
{
private List<SimulationBatchRunValues> _simulationBatchRunValues = new List<SimulationBatchRunValues>();
public IReadOnlyList<SimulationBatchRunValues> SimulationBatchRunValues { get => _simulationBatchRunValues; }

public void AddSimulationBatchRunValues(SimulationBatchRunValues runValues)
{
_simulationBatchRunValues.Add(runValues);
}
}

public class SimulationBatch : IDisposable
{
private readonly ISimModelBatch _simModelBatch;
Expand Down
139 changes: 139 additions & 0 deletions src/OSPSuite.R/Services/ConcurrentSimulationRunner.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using OSPSuite.Assets;
using OSPSuite.Core.Domain;
using OSPSuite.Core.Domain.Data;
using OSPSuite.Core.Domain.Services;
using OSPSuite.R.Domain;
using OSPSuite.Utility.Exceptions;
using SimulationRunOptions = OSPSuite.R.Domain.SimulationRunOptions;

namespace OSPSuite.R.Services
{
public interface IConcurrentSimulationRunner : IDisposable
{
/// <summary>
/// General simulation options
/// </summary>
SimulationRunOptions SimulationRunOptions { get; set; }

/// <summary>
/// Number of cores to use concurrently. Use 0 or a negative value for using the maximum available.
/// </summary>
int NumberOfCores { get; set; }

/// <summary>
/// Adds a simulation to the list of Simulations
/// </summary>
void AddSimulation(IModelCoreSimulation simulation);

/// <summary>
/// Adds a SimulationBatch to the list of SimulationBatches
/// </summary>
/// <param name="simulationBatch"></param>
void AddSimulationBatch(SimulationBatch simulationBatch);

void Clear();

/// <summary>
/// Runs all preset settings concurrently
/// </summary>
/// <returns></returns>
SimulationResults[] RunConcurrently();
}

public class ConcurrentSimulationRunner : IConcurrentSimulationRunner
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would make this guy umplement IDisposable and ensure that Dispose called Clear if not done already

This is the snippet we use


      private bool _disposed;

      public void Dispose()
      {
         if (_disposed) return;

         Cleanup();
         GC.SuppressFinalize(this);
         _disposed = true;
      }

      ~ConcurrentSimulationRunner ()
      {
         Cleanup();
      }

      protected virtual void Cleanup()
      {
      }

{
private readonly IConcurrencyManager _concurrentManager;

public ConcurrentSimulationRunner(IConcurrencyManager concurrentManager)
{
_concurrentManager = concurrentManager;
}

public SimulationRunOptions SimulationRunOptions { get; set; }

public int NumberOfCores { get; set; }

private readonly List<IModelCoreSimulation> _simulations = new List<IModelCoreSimulation>();
private readonly List<SimulationBatch> _simulationBatches = new List<SimulationBatch>();
private CancellationTokenSource _cancellationTokenSource;

public void AddSimulation(IModelCoreSimulation simulation)
{
_simulations.Add(simulation);
}

public void AddSimulationBatch(SimulationBatch simulationBatch)
{
_simulationBatches.Add(simulationBatch);
}

public void Clear()
{
_simulations.Clear();
_simulationBatches.Clear();
if (_cancellationTokenSource != null)
_cancellationTokenSource.Cancel();
}

public SimulationResults[] RunConcurrently()
{
//Currently we only allow for running simulations or simulation batches, but not both
if (_simulationBatches.Count > 0 && _simulations.Count > 0)
//Temporal Exception. We should allow for mixing both use cases but we need to discuss first
throw new OSPSuiteException(Error.InvalidMixOfSimulationAndSimulationBatch);

_cancellationTokenSource = new CancellationTokenSource();
if (_simulations.Count > 0)
{
var results = _concurrentManager.RunAsync(
NumberOfCores,
_cancellationTokenSource.Token,
_simulations,
runSimulation
).Result;
return _simulations.Select(s => results[s]).ToArray();
}

if (_simulationBatches.Count > 0)
return null;

return Array.Empty<SimulationResults>();
}

private Task<SimulationResults> runSimulation(int coreIndex, CancellationToken cancellationToken, IModelCoreSimulation simulation)
{
//We want a new instance every time that's why we are not injecting SimulationRunner in constructor
return Api.GetSimulationRunner().RunAsync(new SimulationRunArgs {Simulation = simulation, SimulationRunOptions = SimulationRunOptions});
}

#region Disposable properties

private bool _disposed;

public void Dispose()
{
if (_disposed) return;

Cleanup();
GC.SuppressFinalize(this);
_disposed = true;
}

~ConcurrentSimulationRunner()
{
Cleanup();
}

protected virtual void Cleanup()
{
Clear();
}

#endregion
}
}
46 changes: 46 additions & 0 deletions tests/OSPSuite.R.Tests/Services/ConcurrentSimulationRunnerSpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using NUnit.Framework;
using OSPSuite.BDDHelper;
using OSPSuite.Core;
using OSPSuite.Core.Domain.Data;
using OSPSuite.Core.Domain.Services;
using System.Linq;

namespace OSPSuite.R.Services
{
class CoreUserSettings : ICoreUserSettings
{
public int MaximumNumberOfCoresToUse { get; set; } = 4;
public int NumberOfBins { get; set; }
public int NumberOfIndividualsPerBin { get; set; }
}

public class When_running_simulations_concurrently : ContextForIntegration<IConcurrentSimulationRunner>
{
private ISimulationPersister _simulationPersister;
private SimulationResults[] _results;

protected override void Context()
{
base.Context();

_simulationPersister = Api.GetSimulationPersister();
sut = new ConcurrentSimulationRunner(new ConcurrencyManager(new CoreUserSettings()));
sut.AddSimulation(_simulationPersister.LoadSimulation(HelperForSpecs.DataFile("S1.pkml")));
sut.AddSimulation(_simulationPersister.LoadSimulation(HelperForSpecs.DataFile("simple.pkml")));
sut.AddSimulation(_simulationPersister.LoadSimulation(HelperForSpecs.DataFile("simple.pkml")));
sut.AddSimulation(_simulationPersister.LoadSimulation(HelperForSpecs.DataFile("multiple_dosing.pkml")));
}

protected override void Because()
{
_results = sut.RunConcurrently();
}

[Observation]
public void should_run_the_simulations()
{
Assert.IsNotNull(_results);
Assert.IsTrue(_results.All(r => r.ElementAt(0).AllValues.SelectMany(v => v.Values).Count() > 0));
}
}
}