-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Concurrent simulation runner #946
Changes from 22 commits
78c4569
2a52a3d
4a0910b
542b3e5
a016b3d
6f1d416
ce7482a
b969f2f
ed6faca
3641618
df9662d
0c5b0f7
3c6beda
804de61
acd19e4
2193ff1
1b5306a
39c3de1
a1c5480
236bad2
fbddd1f
2213af2
c7d663b
71bc753
02bc8fa
03d1cf9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
using System; | ||
using System.Collections.Concurrent; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
namespace OSPSuite.Core.Domain.Services | ||
{ | ||
public interface IConcurrencyManager | ||
{ | ||
/// <summary> | ||
/// | ||
/// </summary> | ||
/// <typeparam name="TData">Data type to consume by the worker function</typeparam> | ||
/// <typeparam name="TResult">Data produced by the worker function</typeparam> | ||
/// <param name="numberOfCoresToUse">Number of cores to use. Use 0 or negative to take all cores</param> | ||
/// <param name="cancellationToken">Cancellation token to cancel the threads</param> | ||
/// <param name="data">List of data to consume by the workers</param> | ||
/// <param name="action">A function to run on each worker on each piece of data</param> | ||
/// <returns>Dictionary binding a result for each input data after running the action on it</returns> | ||
Task<Dictionary<TData, TResult>> RunAsync<TData, TResult>(int numberOfCoresToUse, CancellationToken cancellationToken, IEnumerable<TData> data, Func<int, CancellationToken, TData, Task<TResult>> action); | ||
} | ||
|
||
public class ConcurrencyManager : IConcurrencyManager | ||
{ | ||
private readonly ICoreUserSettings _coreUserSettings; | ||
|
||
public ConcurrencyManager(ICoreUserSettings coreUserSettings) | ||
{ | ||
_coreUserSettings = coreUserSettings; | ||
} | ||
|
||
public async Task<Dictionary<TData, TResult>> RunAsync<TData, TResult>(int numberOfCoresToUse, CancellationToken cancellationToken, IEnumerable<TData> data, Func<int, CancellationToken, TData, Task<TResult>> action) | ||
{ | ||
if (numberOfCoresToUse <= 0) | ||
numberOfCoresToUse = _coreUserSettings.MaximumNumberOfCoresToUse; | ||
var concurrentData = new ConcurrentQueue<TData>(data); | ||
numberOfCoresToUse = Math.Min(numberOfCoresToUse, concurrentData.Count); | ||
|
||
var results = new ConcurrentDictionary<TData, TResult>(); | ||
//Starts one task per core | ||
var tasks = Enumerable.Range(0, numberOfCoresToUse).Select(coreIndex => | ||
Task.Run(async () => | ||
{ | ||
//While there is data left | ||
while (concurrentData.TryDequeue(out var datum)) | ||
{ | ||
if (cancellationToken.IsCancellationRequested) return; | ||
|
||
//Invoke the action on it and store the result | ||
var result = await action.Invoke(coreIndex, cancellationToken, datum); | ||
results.TryAdd(datum, result); | ||
} | ||
}, cancellationToken) | ||
).ToList(); | ||
|
||
await Task.WhenAll(tasks); | ||
//all tasks are completed. Can return results | ||
|
||
return results.ToDictionary(kvp => kvp.Key, kvp => kvp.Value); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not return results? Just curious There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So we return a Task you could await. Again, from R you cannot await so the ConcurrentSimulationRunner does not use this feature but from other parts of the code you could have async/await There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not what I mean.results is already a dictionary. Why are your returning another dictionary based of this? The async/await is created for you automatically because you are using await /async in your code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To the best of my knowledge, ConcurrentDictionary and Dictionary are different classes. They do not share a common interface or anything like that. We need a ConcurrentDictionary to avoid concurrent access issues but beyond ConcurrencyManager class, we do not need it anymore so we should turn it into a Dictionary. Similarly, we use a ConcurrentQueue. I have just updated the signature so it accepts a IEnumerable instead of a ConcurrentQueue since this data structure is only needed as a robust implementation but should not be affecting the signature of the method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
oh yeah they share IDictionary or IReadOnlyDictionary. IReadOnlyDictionary is what we should return in that case anyways so that the caller does not mutate (at least not without explicit casting) the results |
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
using OSPSuite.Core.Domain; | ||
using OSPSuite.Core.Domain.Data; | ||
using OSPSuite.Core.Domain.Services; | ||
using OSPSuite.R.Domain; | ||
using System; | ||
using System.Collections.Concurrent; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using SimulationRunOptions = OSPSuite.R.Domain.SimulationRunOptions; | ||
|
||
namespace OSPSuite.R.Services | ||
{ | ||
public interface IConcurrentSimulationRunner | ||
{ | ||
/// <summary> | ||
/// General simulation options | ||
/// </summary> | ||
SimulationRunOptions SimulationRunOptions { get; set; } | ||
|
||
/// <summary> | ||
/// Number of cores to use concurrently. Use 0 or a negative value for using the maximum available. | ||
/// </summary> | ||
int NumberOfCores { get; set; } | ||
|
||
/// <summary> | ||
/// Adds a simulation to the list of Simulations | ||
/// </summary> | ||
void AddSimulation(IModelCoreSimulation simulation); | ||
msevestre marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/// <summary> | ||
/// Adds a SimulationBatch to the list of SimulationBatches | ||
/// </summary> | ||
/// <param name="simulationBatch"></param> | ||
void AddSimulationBatch(SimulationBatch simulationBatch); | ||
|
||
void Clear(); | ||
|
||
/// <summary> | ||
/// Runs all preset settings concurrently | ||
/// </summary> | ||
/// <returns></returns> | ||
SimulationResults[] RunConcurrently(); | ||
} | ||
|
||
public class ConcurrentSimulationRunner : IConcurrentSimulationRunner | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would make this guy umplement IDisposable and ensure that Dispose called Clear if not done already This is the snippet we use
|
||
{ | ||
private IConcurrencyManager _concurrentManager; | ||
|
||
public ConcurrentSimulationRunner(IConcurrencyManager concurrentManager) | ||
{ | ||
_concurrentManager = concurrentManager; | ||
} | ||
|
||
public SimulationRunOptions SimulationRunOptions { get; set; } | ||
|
||
public int NumberOfCores { get; set; } | ||
|
||
private List<IModelCoreSimulation> _simulations = new List<IModelCoreSimulation>(); | ||
private List<SimulationBatch> _simulationBatches = new List<SimulationBatch>(); | ||
|
||
public void AddSimulation(IModelCoreSimulation simulation) | ||
{ | ||
_simulations.Add(simulation); | ||
} | ||
|
||
public void AddSimulationBatch(SimulationBatch simulationBatch) | ||
{ | ||
_simulationBatches.Add(simulationBatch); | ||
} | ||
|
||
public void Clear() | ||
{ | ||
_simulations.Clear(); | ||
_simulationBatches.Clear(); | ||
} | ||
|
||
public SimulationResults[] RunConcurrently() | ||
{ | ||
//Currently we only allow for running simulations or simulation batches, but not both | ||
if (_simulationBatches.Count > 0 && _simulations.Count > 0) | ||
//Temporal Exception. We should allow for mixing both use cases but we need to discuss first | ||
throw new Exception("You already have Simulation and SimulationBatch objects and should not mix, please invoke Clear to start adding objects from a fresh start"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Never throw Exception when coming from us. Use OSPSuiteException (unless you intend to see the whole call stack etc... Here this is clearly more for the user to see Also error in Error files? |
||
|
||
if (_simulations.Count > 0) | ||
msevestre marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
var results = _concurrentManager.RunAsync( | ||
NumberOfCores, | ||
new CancellationToken(false), | ||
new List<IModelCoreSimulation>(_simulations), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you pass a New List? |
||
runSimulation | ||
).Result; | ||
return _simulations.Select(s => results[s]).ToArray(); | ||
} | ||
|
||
if (_simulationBatches.Count > 0) | ||
return null; | ||
|
||
return Enumerable.Empty<SimulationResults>().ToArray(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Array.Empty :) |
||
} | ||
|
||
private async Task<SimulationResults> runSimulation(int coreIndex, CancellationToken cancellationToken, IModelCoreSimulation simulation) | ||
{ | ||
return await Api.GetSimulationRunner().RunAsync(new SimulationRunArgs() { Simulation = simulation, SimulationRunOptions = SimulationRunOptions }); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. np need to Async away. Just return the Task |
||
} | ||
} | ||
|
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
using NUnit.Framework; | ||
using OSPSuite.BDDHelper; | ||
using OSPSuite.Core; | ||
using OSPSuite.Core.Domain.Data; | ||
using OSPSuite.Core.Domain.Services; | ||
using System.Linq; | ||
|
||
namespace OSPSuite.R.Services | ||
{ | ||
class CoreUserSettings : ICoreUserSettings | ||
{ | ||
public int MaximumNumberOfCoresToUse { get; set; } = 4; | ||
public int NumberOfBins { get; set; } | ||
public int NumberOfIndividualsPerBin { get; set; } | ||
} | ||
|
||
public class When_running_simulations_concurrently : ContextForIntegration<IConcurrentSimulationRunner> | ||
{ | ||
private ISimulationPersister _simulationPersister; | ||
private SimulationResults[] _results; | ||
|
||
protected override void Context() | ||
{ | ||
base.Context(); | ||
|
||
_simulationPersister = Api.GetSimulationPersister(); | ||
sut = new ConcurrentSimulationRunner(new ConcurrencyManager(new CoreUserSettings())); | ||
sut.AddSimulation(_simulationPersister.LoadSimulation(HelperForSpecs.DataFile("S1.pkml"))); | ||
sut.AddSimulation(_simulationPersister.LoadSimulation(HelperForSpecs.DataFile("simple.pkml"))); | ||
sut.AddSimulation(_simulationPersister.LoadSimulation(HelperForSpecs.DataFile("simple.pkml"))); | ||
sut.AddSimulation(_simulationPersister.LoadSimulation(HelperForSpecs.DataFile("multiple_dosing.pkml"))); | ||
} | ||
|
||
protected override void Because() | ||
{ | ||
_results = sut.RunConcurrently(); | ||
} | ||
|
||
[Observation] | ||
public void should_run_the_simulations() | ||
{ | ||
Assert.IsNotNull(_results); | ||
Assert.IsTrue(_results.All(r => r.ElementAt(0).AllValues.SelectMany(v => v.Values).Count() > 0)); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the cancellationToken needs to be checked here I believe. It is the responsibility of the caller to throw if the action has been cancelled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right... adding the fix now