Concurrent simulation runner #946

Merged: 26 commits, Apr 19, 2021

@abdelr
Member

abdelr commented Apr 8, 2021

No description provided.

@abdelr abdelr requested a review from msevestre April 8, 2021 08:15
@abdelr abdelr self-assigned this Apr 8, 2021
@abdelr abdelr linked an issue Apr 8, 2021 that may be closed by this pull request
@Yuri05
Member

Yuri05 commented Apr 8, 2021

@abdelr @PavelBal Can you give an R code example of how this "concurrent" simulation run will be used in R?

@PavelBal
Member

PavelBal commented Apr 8, 2021

@Yuri05 here is just an example of how it could look (this code is for the older version and the signatures of the methods are different, but the idea from @examples should be clear)

#' @title  Runs a set of simulations (only individual simulations) and returns a list of \code{SimulationResults}
#'
#' @param simulations A list of \code{Simulation} objects to simulate.
#' @param simulationRunOptions Optional instance of a \code{SimulationRunOptions} used during the simulation run
#'
#' @return A list of \code{SimulationResults} objects in the order corresponding to the order of simulations.
#'
#' @examples
#' simPath <- system.file("extdata", "simple.pkml", package = "ospsuite")
#' sim <- loadSimulation(simPath)
#' sim2 <- loadSimulation(simPath)
#' sim3 <- loadSimulation(simPath)
#' results <- runSimulationsConcurrently(list(sim, sim2, sim3))
#' @export
runSimulationsConcurrently <- function(simulations, simulationRunOptions = NULL) {
  validateIsOfType(simulations, Simulation)
  validateIsOfType(simulationRunOptions, SimulationRunOptions, nullAllowed = TRUE)
  simulationRunner <- getNetTask("SimulationRunner")

  simulations <- c(simulations)

  #Create SimulationRunnerConcurrentOptions and add all simulations
  runConcurrentOptions <- SimulationRunnerConcurrentOptions$new()
  for (i in seq_along(simulations)){
    runConcurrentOptions$addSimulation(simulations[[i]])
  }

  results <- rClr::clrCall(simulationRunner, "RunConcurrently", runConcurrentOptions$ref)

  simResultsList <- vector("list", length(simulations))
  for (i in seq_along(simulations)){
    simResultsList[[i]] <- SimulationResults$new(results[[i]], simulations[[i]])
  }
  return(simResultsList)
}

@Yuri05
Member

Yuri05 commented Apr 8, 2021

And should all simulations be executed only once or multiple times with different parameter sets?
Because in the latter case the current implementation would be very inefficient.

}

public IReadOnlyList<string> VariableParameterPaths { get; private set; }

public IReadOnlyList<string> VariableMoleculePaths { get; private set; }

public object Clone()
{
var xml = _simModelExporter.ExportSimModelXml(_modelCoreSimulation, new SimulationRunOptions().SimModelExportMode);

Member

wow.. what's going on here?😳
What are you trying to achieve? It feels like you are not working at the right abstraction level here

Member

@msevestre msevestre left a comment

I think this is looking very good. A few questions

while (data.TryDequeue(out var datum))
{
//Invoke the action on it and store the result
var result = await action.Invoke(coreIndex, cancellationToken, datum);

Member

the cancellationToken needs to be checked here I believe. It is the responsibility of the caller to throw if the action has been cancelled

Member Author

You are right... adding the fix now
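
To illustrate the point about checking the token, here is a minimal sketch, not the code from this PR. `WorkerLoopSketch` and `DrainAsync` are hypothetical names, and the signature is simplified (no core index). It only assumes the standard `CancellationToken.ThrowIfCancellationRequested()` call, placed so that no new item is started after a cancel.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the worker loop under review.
public static class WorkerLoopSketch
{
    public static async Task<List<TResult>> DrainAsync<TData, TResult>(
        IEnumerable<TData> data,
        CancellationToken cancellationToken,
        Func<CancellationToken, TData, Task<TResult>> action)
    {
        var queue = new ConcurrentQueue<TData>(data);
        var results = new List<TResult>();

        while (queue.TryDequeue(out var datum))
        {
            // Throws OperationCanceledException if cancellation was requested,
            // so the loop never starts a new item after a cancel.
            cancellationToken.ThrowIfCancellationRequested();

            // The token is also handed to the action so it can observe the cancel itself.
            results.Add(await action(cancellationToken, datum));
        }

        return results;
    }
}
```

With a token that is already cancelled, awaiting `DrainAsync` surfaces an `OperationCanceledException` instead of running any item.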

await Task.WhenAll(tasks);
//all tasks are completed. Can return results

return results.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);

Member

why not return results? Just curious

Member Author

So we return a Task you could await. Again, from R you cannot await so the ConcurrentSimulationRunner does not use this feature but from other parts of the code you could have async/await

Member

This is not what I mean. results is already a dictionary. Why are you returning another dictionary based on it? The async/await is created for you automatically because you are using await/async in your code.

Member Author

To the best of my knowledge, ConcurrentDictionary and Dictionary are different classes. They do not share a common interface or anything like that. We need a ConcurrentDictionary to avoid concurrent access issues, but beyond the ConcurrencyManager class we do not need it anymore, so we should turn it into a Dictionary. Similarly, we use a ConcurrentQueue. I have just updated the signature so it accepts an IEnumerable instead of a ConcurrentQueue, since this data structure is only needed for a robust implementation and should not affect the signature of the method.

Member

> To the best of my knowledge, ConcurrentDictionary and Dictionary are different classes. They do not share a common interface or anything like that.

oh yeah they share IDictionary or IReadOnlyDictionary.

IReadOnlyDictionary is what we should return in that case anyways so that the caller does not mutate (at least not without explicit casting) the results
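
As a small sketch of the reviewer's point (not the PR's code): `ConcurrentDictionary<TKey, TValue>` implements `IReadOnlyDictionary<TKey, TValue>`, so it can be returned through that interface without copying into a new `Dictionary`. The class and method names below are hypothetical.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical example: collect results concurrently, expose them read-only.
public static class ReadOnlyResultsSketch
{
    public static IReadOnlyDictionary<string, int> Collect(IEnumerable<string> keys)
    {
        var results = new ConcurrentDictionary<string, int>();
        foreach (var key in keys)
            results.TryAdd(key, key.Length);

        // Implicit reference conversion: no ToDictionary(...) copy is required.
        // A caller can still cast back to the concrete type, but not by accident.
        return results;
    }
}
```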


public SimulationResults[] RunConcurrently()
{
var results = _concurrentManager.RunAsync<IModelCoreSimulation, SimulationResults>(

Member

Shouldn't we create the instance of ConcurrencyManager here? Why is it injected?

Member Author

Why create a new object every time? Also, with injection you decouple the implementations. Whoever is using the ConcurrentSimulationRunner could decide which implementation of the ConcurrencyManager to use by just injecting it. Why should we create the object ourselves in the code and break the inversion of control?

Member

I did not say NEW. I am saying we should ensure that we create a new instance here so that there is no risk of memory leaks. Typically that means using a factory method to retrieve a new instance in a Dispose construct. That way, if something goes wrong, we clean up after ourselves.

So we do not create the object ourselves, we do not break inversion of control. We just ensure that we deal with memory leaks intrinsically by not holding references to objects

Member Author

I agree. Sorry, I did not understand before. I will add the changes for this.
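
One way to read the suggestion above, as a sketch only: inject a factory rather than an instance, and create a fresh manager inside a `using` block for each run. All names here (`IWorkManager`, `WorkManager`, `RunnerWithFactory`) are hypothetical and are not the types from this PR. `LiveInstances` exists only to make the disposal visible.

```csharp
using System;

// Hypothetical disposable worker, standing in for a concurrency manager.
public interface IWorkManager : IDisposable
{
    int Run(int input);
}

public sealed class WorkManager : IWorkManager
{
    public static int LiveInstances;

    public WorkManager() { LiveInstances++; }
    public int Run(int input) => input * 2;
    public void Dispose() { LiveInstances--; }
}

public class RunnerWithFactory
{
    private readonly Func<IWorkManager> _managerFactory;

    // The factory is injected, so inversion of control is preserved.
    public RunnerWithFactory(Func<IWorkManager> managerFactory)
    {
        _managerFactory = managerFactory;
    }

    public int Run(int input)
    {
        // A new instance per run, disposed even if Run throws.
        using (var manager = _managerFactory())
        {
            return manager.Run(input);
        }
    }
}
```

The runner holds no reference to a manager between runs, which is the property the reviewer is asking for.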

{
var results = _concurrentManager.RunAsync<IModelCoreSimulation, SimulationResults>(
NumberOfCores,
new System.Threading.CancellationToken(false),

Member

what does the false do?

Member Author

The ConcurrentSimulationRunner never cancels the run since it will be used from R. I am not sure if canceling the call should be a feature. I am not sure either how to do it from R, if it is possible at all. If this is something we should include, please tell me and I will include it. Still, the ConcurrencyManager does support canceling since it could be used from somewhere else.

Member

My question is: What does the false do? :)

Member Author

Initializes the state of the token as not cancelled.
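
For reference, a short sketch of what the constructor argument does. `TokenStateSketch` is a hypothetical name. The properties used are the standard `CancellationToken.IsCancellationRequested` and `CanBeCanceled`.

```csharp
using System.Threading;

public static class TokenStateSketch
{
    // Returns (IsCancellationRequested, CanBeCanceled) for a token built with the given flag.
    public static (bool Requested, bool CanBeCanceled) Describe(bool canceled)
    {
        var token = new CancellationToken(canceled);
        return (token.IsCancellationRequested, token.CanBeCanceled);
    }

    // CancellationToken.None is the conventional way to say "this call is never cancelled".
    public static (bool Requested, bool CanBeCanceled) DescribeNone()
    {
        var token = CancellationToken.None;
        return (token.IsCancellationRequested, token.CanBeCanceled);
    }
}
```

A token built with `false` is not cancelled and can never become cancelled, which is the same observable state as `CancellationToken.None`. A token built with `true` starts out already cancelled.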

if (_simulationBatches.Count > 0)
return null;

return Enumerable.Empty<SimulationResults>().ToArray();

Member

Array.Empty :)
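
A quick sketch of the two spellings side by side. `string` is used as a placeholder element type and `EmptyArraySketch` is a hypothetical name. `Array.Empty<T>()` is the standard library call the reviewer refers to.

```csharp
using System;
using System.Linq;

public static class EmptyArraySketch
{
    // The form in the diff: builds an empty sequence, then converts it to an array.
    public static string[] ViaLinq() => Enumerable.Empty<string>().ToArray();

    // The suggested form: states the intent directly, with no LINQ round trip.
    public static string[] ViaArrayEmpty() => Array.Empty<string>();
}
```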


private async Task<SimulationResults> runSimulation(int coreIndex, CancellationToken cancellationToken, IModelCoreSimulation simulation)
{
return await Api.GetSimulationRunner().RunAsync(new SimulationRunArgs() { Simulation = simulation, SimulationRunOptions = SimulationRunOptions });

Member

No need to async/await. Just return the Task
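
A minimal sketch of the difference, with `ComputeAsync` as a hypothetical stand-in for the runner's `RunAsync` call:

```csharp
using System.Threading.Tasks;

public static class TaskPassThroughSketch
{
    // Hypothetical async operation standing in for the real simulation run.
    private static Task<int> ComputeAsync(int x) => Task.FromResult(x * 2);

    // Form in the diff: the compiler generates a state machine just to forward the result.
    public static async Task<int> Awaited(int x)
    {
        return await ComputeAsync(x);
    }

    // Suggested form: hand the inner Task straight back to the caller.
    public static Task<int> PassedThrough(int x) => ComputeAsync(x);
}
```

Both forms give the caller the same result. Returning the task directly is only safe when nothing has to run after the awaited call, for example no `using` block or `try`/`catch` around it, which appears to be the case in the one-line method under review.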

//Currently we only allow for running simulations or simulation batches, but not both
if (_simulationBatches.Count > 0 && _simulations.Count > 0)
//Temporal Exception. We should allow for mixing both use cases but we need to discuss first
throw new Exception("You already have Simulation and SimulationBatch objects and should not mix, please invoke Clear to start adding objects from a fresh start");

Member

Never throw Exception when coming from us. Use OSPSuiteException (unless you intend to see the whole call stack etc.). Here this is clearly more for the user to see

Also error in Error files?
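
A sketch of the guard with a dedicated exception type. `UserFacingException` is a hypothetical stand-in for `OSPSuiteException`, whose constructor is not shown in this thread. `MixGuardSketch` and its members are also illustrative only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a domain exception such as OSPSuiteException.
public class UserFacingException : Exception
{
    public UserFacingException(string message) : base(message) { }
}

public class MixGuardSketch
{
    private readonly List<string> _simulations = new List<string>();
    private readonly List<string> _simulationBatches = new List<string>();

    public void AddSimulation(string simulation)
    {
        _simulations.Add(simulation);
        ensureNotMixed();
    }

    public void AddSimulationBatch(string batch)
    {
        _simulationBatches.Add(batch);
        ensureNotMixed();
    }

    public void Clear()
    {
        _simulations.Clear();
        _simulationBatches.Clear();
    }

    private void ensureNotMixed()
    {
        // A domain exception signals "message for the user", not "unexpected crash".
        if (_simulations.Count > 0 && _simulationBatches.Count > 0)
            throw new UserFacingException("Simulations and simulation batches cannot be mixed. Call Clear before adding new objects.");
    }
}
```

Callers can then catch the domain type specifically and show its message, while any other `Exception` keeps its full stack trace for diagnosis.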

SimulationResults[] RunConcurrently();
}

public class ConcurrentSimulationRunner : IConcurrentSimulationRunner

Member

I would make this guy implement IDisposable and ensure that Dispose calls Clear if not done already

This is the snippet we use


      private bool _disposed;

      public void Dispose()
      {
         if (_disposed) return;

         Cleanup();
         GC.SuppressFinalize(this);
         _disposed = true;
      }

      ~ConcurrentSimulationRunner ()
      {
         Cleanup();
      }

      protected virtual void Cleanup()
      {
      }
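
Filling in the empty `Cleanup` from the snippet above, as a sketch under stated assumptions: `RunnerSketch` is a hypothetical class that holds a plain list of strings in place of the runner's simulations, and `Clear` is assumed to do nothing more than empty that list.

```csharp
using System;
using System.Collections.Generic;

public class RunnerSketch : IDisposable
{
    private readonly List<string> _simulations = new List<string>();
    private bool _disposed;

    public int Count => _simulations.Count;

    public void AddSimulation(string simulation) => _simulations.Add(simulation);

    public void Clear() => _simulations.Clear();

    public void Dispose()
    {
        if (_disposed) return;

        Cleanup();
        GC.SuppressFinalize(this);
        _disposed = true;
    }

    // Safety net if Dispose is never called, mirroring the reviewer's snippet.
    ~RunnerSketch()
    {
        Cleanup();
    }

    protected virtual void Cleanup()
    {
        // Drop references so the held objects can be collected.
        Clear();
    }
}
```

Used in a `using` block, the runner is emptied automatically on exit, and a second `Dispose` call is a no-op.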

var results = _concurrentManager.RunAsync(
NumberOfCores,
new CancellationToken(false),
new List<IModelCoreSimulation>(_simulations),

Member

Why do you pass a new List?

@msevestre msevestre merged commit 7bbe115 into develop Apr 19, 2021
@msevestre msevestre deleted the Concurrent_Simulation_Runner branch April 19, 2021 16:04
Successfully merging this pull request may close these issues.

It should be possible to run simulations concurrently from R
4 participants