Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Exception handling #970

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 74 additions & 4 deletions src/OSPSuite.Core/Domain/Services/ConcurrencyManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,30 @@

namespace OSPSuite.Core.Domain.Services
{
public class ConcurrencyManagerResult<TResult> where TResult : class
{
public string Id { get; }
public bool Succeeded { get; }
public string ErrorMessage { get; }
public TResult Result { get; }

public ConcurrencyManagerResult(string id, TResult result)
{
Id = id;
Succeeded = true;
ErrorMessage = "";
Result = result;
}

public ConcurrencyManagerResult(string id, string errorMessage)
{
Id = id;
Succeeded = false;
ErrorMessage = errorMessage;
Result = null;
}
}

public interface IConcurrencyManager
{
/// <summary>
Expand All @@ -18,20 +42,34 @@ public interface IConcurrencyManager
/// <param name="data">List of data to consume by the workers</param>
/// <param name="action">A function to run on each worker on each piece of data</param>
/// <returns>Dictionary binding a result for each input data after running the action on it</returns>
Task<IReadOnlyDictionary<TData, TResult>> RunAsync<TData, TResult>(int numberOfCoresToUse, CancellationToken cancellationToken, IReadOnlyList<TData> data, Func<int, CancellationToken, TData, Task<TResult>> action);
Task<IReadOnlyDictionary<TData, ConcurrencyManagerResult<TResult>>> RunAsync<TData, TResult>
(
int numberOfCoresToUse,
CancellationToken cancellationToken,
IReadOnlyList<TData> data,
Func<TData, string> id,
Func<int, CancellationToken, TData, Task<TResult>> action
) where TResult : class;
}

public class ConcurrencyManager : IConcurrencyManager
{
private readonly int _maximumNumberOfCoresToUse = Math.Max(1, Environment.ProcessorCount - 1);
public async Task<IReadOnlyDictionary<TData, TResult>> RunAsync<TData, TResult>(int numberOfCoresToUse, CancellationToken cancellationToken, IReadOnlyList<TData> data, Func<int, CancellationToken, TData, Task<TResult>> action)
public async Task<IReadOnlyDictionary<TData, ConcurrencyManagerResult<TResult>>> RunAsync<TData, TResult>
(
int numberOfCoresToUse,
CancellationToken cancellationToken,
IReadOnlyList<TData> data,
Func<TData, string> id,
Func<int, CancellationToken, TData, Task<TResult>> action
) where TResult : class
{
if (numberOfCoresToUse <= 0)
numberOfCoresToUse = _maximumNumberOfCoresToUse;
var concurrentData = new ConcurrentQueue<TData>(data);
numberOfCoresToUse = Math.Min(numberOfCoresToUse, concurrentData.Count);

var results = new ConcurrentDictionary<TData, TResult>();
var results = new ConcurrentDictionary<TData, ConcurrencyManagerResult<TResult>>();
//Starts one task per core
var tasks = Enumerable.Range(0, numberOfCoresToUse).Select(async coreIndex =>
{
Expand All @@ -41,7 +79,13 @@ public async Task<IReadOnlyDictionary<TData, TResult>> RunAsync<TData, TResult>(
cancellationToken.ThrowIfCancellationRequested();

//Invoke the action on it and store the result
var result = await action.Invoke(coreIndex, cancellationToken, datum);
var result = await returnWithExceptionHandling(
coreIndex,
cancellationToken,
action,
datum,
id
);
results.TryAdd(datum, result);
}
}).ToList();
Expand All @@ -52,5 +96,31 @@ public async Task<IReadOnlyDictionary<TData, TResult>> RunAsync<TData, TResult>(
var tt = results.Values;
return results;
}

private async Task<ConcurrencyManagerResult<TResult>> returnWithExceptionHandling<TData, TResult>
(
int coreId,
CancellationToken cancellationToken,
Func<int, CancellationToken, TData, Task<TResult>> task,
TData input, Func<TData, string> id
) where TResult : class
{
try
{
return new ConcurrencyManagerResult<TResult>
(
id(input),
result: await task.Invoke(coreId, cancellationToken, input)
);
}
catch (Exception e)
{
return new ConcurrencyManagerResult<TResult>
(
id(input),
errorMessage: e.Message
);
}
}
}
}
62 changes: 30 additions & 32 deletions src/OSPSuite.R/Services/ConcurrentSimulationRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,26 @@ namespace OSPSuite.R.Services
{
public class SettingsForConcurrentRunSimulationBatch
{
public IModelCoreSimulation Simulation { get; set; }
public IModelCoreSimulation Simulation { get; }
public List<SimulationBatchRunValues> SimulationBatchRunValues { get; } = new List<SimulationBatchRunValues>();
public SimulationBatchOptions SimulationBatchOptions { get; set; }
public SimulationBatchOptions SimulationBatchOptions { get; }
private ConcurrentQueue<SimulationBatch> _simulationBatches = new ConcurrentQueue<SimulationBatch>();
internal int MissingBatchesCount { get => Math.Max(0, SimulationBatchRunValues.Count - _simulationBatches.Count); }
internal bool AddNewBatch() {
_simulationBatches.Enqueue(Api.GetSimulationBatchFactory().Create(Simulation, SimulationBatchOptions));
return true;

public SettingsForConcurrentRunSimulationBatch(IModelCoreSimulation simulation, SimulationBatchOptions simulationBatchOptions)
{
Simulation = simulation;
SimulationBatchOptions = simulationBatchOptions;
}

internal SimulationBatch AddNewBatch() {
var batch = Api.GetSimulationBatchFactory().Create(Simulation, SimulationBatchOptions);
_simulationBatches.Enqueue(batch);
return batch;
}

public IEnumerable<SimulationBatch> SimulationBatches { get => _simulationBatches; }

public string AddSimulationBatchRunValues(SimulationBatchRunValues simulationBatchRunValues)
{
var id = Guid.NewGuid().ToString();
Expand All @@ -43,17 +53,6 @@ class SimulationBatchRunOptions
public SimulationBatchOptions SimulationBatchOptions { get; set; }
}

public class ConcurrentSimulationResults
{
public ConcurrentSimulationResults(string id, SimulationResults results)
{
Id = id;
SimulationResults = results;
}
public string Id { get; }
public SimulationResults SimulationResults { get; }
}

public interface IConcurrentSimulationRunner : IDisposable
{
/// <summary>
Expand Down Expand Up @@ -81,13 +80,13 @@ public interface IConcurrentSimulationRunner : IDisposable
/// Runs all preset settings concurrently
/// </summary>
/// <returns></returns>
ConcurrentSimulationResults[] RunConcurrently();
ConcurrencyManagerResult<SimulationResults>[] RunConcurrently();

/// <summary>
/// After initialization phase, run all simulations or simulationBatches Async
/// </summary>
/// <returns></returns>
Task<IEnumerable<ConcurrentSimulationResults>> RunConcurrentlyAsync();
Task<IEnumerable<ConcurrencyManagerResult<SimulationResults>>> RunConcurrentlyAsync();
}

public class ConcurrentSimulationRunner : IConcurrentSimulationRunner
Expand Down Expand Up @@ -140,11 +139,12 @@ private Task initializeBatches()
(
settings => Enumerable.Range(0, settings.MissingBatchesCount).Select(_ => settings)
).ToList(),
data => new Guid().ToString(),
(core, ct, settings) => Task.FromResult(settings.AddNewBatch())
);
}

public async Task<IEnumerable<ConcurrentSimulationResults>> RunConcurrentlyAsync()
public async Task<IEnumerable<ConcurrencyManagerResult<SimulationResults>>> RunConcurrentlyAsync()
{
//Currently we only allow for running simulations or simulation batches, but not both
if (_listOfSettingsForConcurrentRunSimulationBatch.Count > 0 && _simulations.Count > 0)
Expand All @@ -157,6 +157,7 @@ public async Task<IEnumerable<ConcurrentSimulationResults>> RunConcurrentlyAsync
numberOfCores(),
_cancellationTokenSource.Token,
_simulations,
simulation => simulation.Id,
runSimulation
);
return results.Values;
Expand All @@ -176,6 +177,7 @@ public async Task<IEnumerable<ConcurrentSimulationResults>> RunConcurrentlyAsync
SimulationBatchOptions = sb.SimulationBatchOptions,
SimulationBatchRunValues = rv
})).ToList(),
sb => sb.SimulationBatchRunValues.Id,
runSimulationBatch
);

Expand All @@ -185,27 +187,23 @@ public async Task<IEnumerable<ConcurrentSimulationResults>> RunConcurrentlyAsync
return results.Values;
}

return Enumerable.Empty<ConcurrentSimulationResults>();
return Enumerable.Empty<ConcurrencyManagerResult<SimulationResults>>();
}


public ConcurrentSimulationResults[] RunConcurrently() => RunConcurrentlyAsync().Result.ToArray();

private async Task<ConcurrentSimulationResults> runSimulation(int coreIndex, CancellationToken cancellationToken, IModelCoreSimulation simulation)
public ConcurrencyManagerResult<SimulationResults>[] RunConcurrently() => RunConcurrentlyAsync().Result.ToArray();



private async Task<SimulationResults> runSimulation(int coreIndex, CancellationToken cancellationToken, IModelCoreSimulation simulation)
{
//We want a new instance every time that's why we are not injecting SimulationRunner in constructor
return new ConcurrentSimulationResults(
simulation.Id,
await Api.GetSimulationRunner().RunAsync(new SimulationRunArgs { Simulation = simulation, SimulationRunOptions = SimulationRunOptions })
);
return await Api.GetSimulationRunner().RunAsync(new SimulationRunArgs { Simulation = simulation, SimulationRunOptions = SimulationRunOptions });
}

private async Task<ConcurrentSimulationResults> runSimulationBatch(int coreIndex, CancellationToken cancellationToken, SimulationBatchRunOptions simulationBatchWithOptions)
private async Task<SimulationResults> runSimulationBatch(int coreIndex, CancellationToken cancellationToken, SimulationBatchRunOptions simulationBatchWithOptions)
{
return new ConcurrentSimulationResults(
simulationBatchWithOptions.SimulationBatchRunValues.Id,
await simulationBatchWithOptions.SimulationBatch.RunAsync(simulationBatchWithOptions.SimulationBatchRunValues)
);
return await simulationBatchWithOptions.SimulationBatch.RunAsync(simulationBatchWithOptions.SimulationBatchRunValues);
}

#region Disposable properties
Expand Down
38 changes: 20 additions & 18 deletions tests/OSPSuite.R.Tests/Services/ConcurrentSimulationRunnerSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using OSPSuite.BDDHelper.Extensions;
using OSPSuite.Core;
using OSPSuite.Core.Domain;
using OSPSuite.Core.Domain.Data;
using OSPSuite.Core.Domain.Services;
using OSPSuite.Core.Extensions;
using OSPSuite.R.Domain;
using System.Collections.Generic;
Expand All @@ -20,7 +22,7 @@ class CoreUserSettings : ICoreUserSettings
public class When_running_simulations_concurrently : ContextForIntegration<IConcurrentSimulationRunner>
{
private ISimulationPersister _simulationPersister;
private ConcurrentSimulationResults[] _results;
private ConcurrencyManagerResult<SimulationResults>[] _results;

protected override void Context()
{
Expand All @@ -43,15 +45,15 @@ protected override void Because()
public void should_run_the_simulations()
{
Assert.IsNotNull(_results);
Assert.IsTrue(_results.All(r => r.SimulationResults.ElementAt(0).AllValues.SelectMany(v => v.Values).Count() > 0));
Assert.IsTrue(_results.All(r => r.Result.ElementAt(0).AllValues.SelectMany(v => v.Values).Count() > 0));
}
}

public class When_running_a_batch_simulation_run_concurrently : ContextForIntegration<IConcurrentSimulationRunner>
{
private SettingsForConcurrentRunSimulationBatch _simulationWithBatchOptions;
private ISimulationPersister _simulationPersister;
private ConcurrentSimulationResults[] _results;
private ConcurrencyManagerResult<SimulationResults>[] _results;
private IModelCoreSimulation _simulation;
private List<string> _ids = new List<string>();
private List<SimulationBatchRunValues> _simulationBatchRunValues = new List<SimulationBatchRunValues>();
Expand All @@ -62,23 +64,23 @@ public override void GlobalContext()
_simulationPersister = Api.GetSimulationPersister();
_simulation = _simulationPersister.LoadSimulation(HelperForSpecs.DataFile("S1.pkml"));

_simulationWithBatchOptions = new SettingsForConcurrentRunSimulationBatch()
{
Simulation = _simulation,
SimulationBatchOptions = new SimulationBatchOptions
{
_simulationWithBatchOptions = new SettingsForConcurrentRunSimulationBatch
(
_simulation,
new SimulationBatchOptions
{
VariableMolecules = new[]
{
new[] {"Organism", "Kidney", "Intracellular", "Caffeine"}.ToPathString()
},
{
new[] {"Organism", "Kidney", "Intracellular", "Caffeine"}.ToPathString()
},

VariableParameters = new[]
{
new[] {"Organism", "Liver", "Volume"}.ToPathString(),
new[] {"Organism", "Hematocrit"}.ToPathString(),
}
{
new[] {"Organism", "Liver", "Volume"}.ToPathString(),
new[] {"Organism", "Hematocrit"}.ToPathString(),
}
}
};
);

sut = Api.GetConcurrentSimulationRunner();
sut.AddSimulationBatchOption(_simulationWithBatchOptions);
Expand Down Expand Up @@ -115,8 +117,8 @@ public void should_be_able_to_simulate_the_simulation_for_multiple_runes()
{
var result = Api.GetSimulationBatchFactory().Create(_simulation, _simulationWithBatchOptions.SimulationBatchOptions).Run(_simulationBatchRunValues.FirstOrDefault(v => v.Id == id));
var concurrentResult = _results.FirstOrDefault(r => r.Id == id);
result.Time.Values.ShouldBeEqualTo(concurrentResult.SimulationResults.Time.Values);
result.ResultsFor(0).ValuesAsArray().Select(qv => qv.Values).ShouldBeEqualTo(concurrentResult.SimulationResults.ResultsFor(0).ValuesAsArray().Select(qv => qv.Values));
result.Time.Values.ShouldBeEqualTo(concurrentResult.Result.Time.Values);
result.ResultsFor(0).ValuesAsArray().Select(qv => qv.Values).ShouldBeEqualTo(concurrentResult.Result.ResultsFor(0).ValuesAsArray().Select(qv => qv.Values));
}
}
}
Expand Down