Skip to content

Commit 6c5fa63

Browse files
authored
allow deactivating recipes (#1848)
* adds deactivate button to the dashboard to improve the performance of the initial load * adds allActive method to RecipeManager so that only active recipes can be fetched from the possible storage backend
1 parent 7507cc4 commit 6c5fa63

File tree

23 files changed

+286
-199
lines changed

23 files changed

+286
-199
lines changed

core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/AkkaBaker.scala

+6-10
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,7 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
8585

8686
config.bakerActorProvider.initialize(system)
8787

88-
private val recipeManager: RecipeManager =
89-
config.recipeManager
88+
private val recipeManager: RecipeManager = config.recipeManager
9089

9190
private val processIndexActor: ActorRef =
9291
config.bakerActorProvider.createProcessIndexActor(config.interactions, recipeManager, system.settings.config)
@@ -106,7 +105,7 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
106105
val updated = recipeRecord.updated
107106
if (!recipeRecord.validate || config.bakerValidationSettings.allowAddingRecipeWithoutRequiringInstances) {
108107
logger.debug(s"Recipe implementation errors are ignored for ${recipe.name}:${recipe.recipeId}")
109-
addToManager(recipe, updated)
108+
recipeManager.put(recipeRecord.copy(updated = updated))
110109
} else {
111110
logger.debug(s"Recipe ${recipe.name}:${recipe.recipeId} is validated for compatibility with interactions")
112111
getImplementationErrors(recipe).flatMap { implementationErrors =>
@@ -115,16 +114,13 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
115114
} else if (recipe.validationErrors.nonEmpty) {
116115
Future.failed(RecipeValidationException(s"Recipe ${recipe.name}:${recipe.recipeId} has validation errors: ${recipe.validationErrors.mkString(", ")}"))
117116
} else {
118-
addToManager(recipe, updated)
117+
recipeManager.put(recipeRecord.copy(updated = updated))
119118
}
120119
}
121120
}
122121
}
123122
} yield result
124123

125-
private def addToManager(compiledRecipe: CompiledRecipe, timeCreated: Long): Future[String] =
126-
recipeManager.put(RecipeRecord.of(compiledRecipe, updated = timeCreated))
127-
128124
private def getImplementationErrors(compiledRecipe: CompiledRecipe): Future[Set[String]] = {
129125
compiledRecipe.interactionTransitions.toList
130126
.traverse(x => config.interactions.incompatibilities(x).map((_, x.originalInteractionName)))
@@ -141,7 +137,7 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
141137
* @return
142138
*/
143139
override def getRecipe(recipeId: String): Future[RecipeInformation] = {
144-
// here we ask the RecipeManager actor to return us the recipe for the given id
140+
// here we ask the RecipeManager to return us the recipe for the given id
145141
recipeManager.get(recipeId).flatMap {
146142
case Some(r: RecipeRecord) =>
147143
getImplementationErrors(r.recipe).map(errors => RecipeInformation(r.recipe, r.updated, errors, r.validate, r.recipe.sensoryEvents ))
@@ -155,12 +151,12 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
155151
getRecipe(recipeId).map(r => RecipeVisualizer.visualizeRecipe(r.compiledRecipe, style))
156152

157153
/**
158-
* Returns all recipes added to this baker instance.
154+
* Returns all 'active' recipes added to this baker instance.
159155
*
160156
* @return All recipes in the form of map of recipeId -> CompiledRecipe
161157
*/
162158
override def getAllRecipes: Future[Map[String, RecipeInformation]] = {
163-
recipeManager.all
159+
recipeManager.allActive
164160
.flatMap(
165161
_.toList
166162
.traverse(ri => getImplementationErrors(ri.recipe)

core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndex.scala

+26-27
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
147147
private val restartRandomFactor: Double = config.getDouble("baker.process-instance.restart-randomFactor")
148148

149149
private val index: mutable.Map[String, ActorMetadata] = mutable.Map[String, ActorMetadata]()
150-
private val recipeCache: mutable.Map[String, (CompiledRecipe, Long)] = mutable.Map[String, (CompiledRecipe, Long)]()
151150

152151
//TODO chose if to use the CassandraBakerCleanup or the ActorBasedBakerCleanup
153152
private val cleanup: BakerCleanup = {
@@ -168,41 +167,37 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
168167
}
169168

170169
// TODO this is a synchronous ask on an actor which createProcessActor is considered bad practice, alternative?
171-
private def getRecipeIdFromManager(recipeId: String): Option[(CompiledRecipe, Long)] = {
172-
log.debug("Adding recipe to recipe cache: {}", recipeId)
173-
val futureResult: Future[Option[RecipeRecord]] = recipeManager.get(recipeId)
174-
val result = Await.result(futureResult, updateCacheTimeout)
175-
recipeCache ++= result.map(r => r.recipeId -> (r.recipe, r.updated))
176-
log.debug("Added recipe to recipe cache: {}, current size: {}", recipeId, recipeCache.size)
177-
result.map(r => r.recipe -> r.updated)
170+
private def getRecipeRecord(recipeId: String, reactivate: Boolean): Option[RecipeRecord] = {
171+
val eventualRecord = recipeManager.get(recipeId).flatMap {
172+
case Some(recipeRecord) if recipeRecord.isActive => Future.successful(Some(recipeRecord))
173+
case Some(recipeRecord) if !reactivate => Future.successful(Some(recipeRecord)) // inactive recipe, but reactivation is not needed
174+
case Some(recipeRecord) if reactivate => // inactive recipe, reactivate it
175+
log.info(s"Inactive recipe $recipeId being reactivated.")
176+
val activeRecipe = recipeRecord.copy(isActive = true)
177+
recipeManager.put(activeRecipe)
178+
Future.successful(Some(activeRecipe))
179+
case None => Future.successful(None)
180+
}
181+
Await.result(eventualRecord, updateCacheTimeout)
178182
}
179183

180184
def getRecipeIdFromActor(actorRef: ActorRef) : String = actorRef.path.name
181185

182-
def getRecipeWithTimeStamp(recipeId: String): Option[(CompiledRecipe, Long)] =
183-
recipeCache.get(recipeId) match {
184-
case None =>
185-
getRecipeIdFromManager(recipeId)
186-
case other => other
187-
}
188-
189-
def getCompiledRecipe(recipeId: String): Option[CompiledRecipe] =
190-
getRecipeWithTimeStamp(recipeId).fold[Option[CompiledRecipe]] {
186+
private def getCompiledRecipe(recipeId: String, reactivate: Boolean = true): Option[CompiledRecipe] = {
187+
getRecipeRecord(recipeId, reactivate).fold[Option[CompiledRecipe]] {
191188
log.warning(s"No recipe found for $recipeId")
192189
None
193190
} {
194-
case (recipe, _) => Some(recipe)
191+
case r:RecipeRecord => Some(r.recipe)
195192
case _ => None
196193
}
197-
198-
def getOrCreateProcessActor(recipeInstanceId: String): Option[ActorRef] =
199-
context.child(recipeInstanceId).orElse(createProcessActor(recipeInstanceId))
194+
}
200195

201196
def getProcessActor(recipeInstanceId: String): Option[ActorRef] =
202197
context.child(recipeInstanceId)
203198

204-
def createProcessActor(recipeInstanceId: String): Option[ActorRef] =
205-
getCompiledRecipe(index(recipeInstanceId).recipeId).map(createProcessActor(recipeInstanceId, _))
199+
private def createProcessActor(recipeInstanceId: String, reactivateRecipe: Boolean = true): Option[ActorRef] =
200+
getCompiledRecipe(index(recipeInstanceId).recipeId, reactivateRecipe).map(createProcessActor(recipeInstanceId, _))
206201

207202
// creates a ProcessInstanceActor, does not do any validation
208203
def createProcessActor(recipeInstanceId: String, compiledRecipe: CompiledRecipe): ActorRef = {
@@ -238,7 +233,7 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
238233

239234
def shouldDelete(meta: ActorMetadata): Boolean = {
240235
if(meta.processStatus != Deleted)
241-
getCompiledRecipe(meta.recipeId) match {
236+
getCompiledRecipe(meta.recipeId, reactivate = false) match {
242237
case Some(recipe) =>
243238
recipe.retentionPeriod.exists { p => meta.createdDateTime + p.toMillis < System.currentTimeMillis() }
244239
case None =>
@@ -257,7 +252,7 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
257252
actorRef ! Stop(delete = true)
258253
case None =>
259254
log.debug(s"Deleting ${meta.recipeInstanceId} via cleanup tool")
260-
getCompiledRecipe(meta.recipeId) match {
255+
getCompiledRecipe(meta.recipeId, reactivate = false) match {
261256
case Some(compiledRecipe) =>
262257
val persistenceId = ProcessInstance.recipeInstanceId2PersistenceId(compiledRecipe.name, meta.recipeInstanceId)
263258
log.debug(s"Deleting with persistenceId: ${persistenceId}")
@@ -284,6 +279,10 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
284279
}
285280
}
286281

282+
// This util function is used only for delete process functionality, therefore passing reactivateRecipe=false to avoid reactivating the recipe
283+
private def getOrCreateProcessActor(recipeInstanceId: String): Option[ActorRef] =
284+
context.child(recipeInstanceId).orElse(createProcessActor(recipeInstanceId, reactivateRecipe = false))
285+
287286
private def forgetProcesses(): Unit = {
288287
rememberProcessDuration.map {
289288
duration: Duration =>
@@ -520,8 +519,8 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
520519
index.get(recipeInstanceId) match {
521520
case Some(processMetadata) if processMetadata.isDeleted => sender() ! ProcessDeleted(recipeInstanceId)
522521
case Some(processMetadata) =>
523-
getRecipeWithTimeStamp(processMetadata.recipeId) match {
524-
case Some((compiledRecipe, timestamp)) => sender() ! RecipeFound(compiledRecipe, timestamp)
522+
getRecipeRecord(processMetadata.recipeId, reactivate = true) match {
523+
case Some(RecipeRecord(_, _, updated, recipe, _, _)) => sender() ! RecipeFound(recipe, updated)
525524
case None => sender() ! NoSuchProcess(recipeInstanceId)
526525
}
527526
case None => sender() ! NoSuchProcess(recipeInstanceId)

core/akka-runtime/src/main/scala/com/ing/baker/runtime/recipe_manager/ActorBasedRecipeManager.scala

+32-7
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,52 @@ import com.ing.baker.runtime.common.RecipeRecord
1313
import com.typesafe.config.Config
1414
import net.ceedubs.ficus.Ficus._
1515

16+
import scala.collection.concurrent.TrieMap
1617
import scala.concurrent.{ExecutionContext, Future}
1718

1819
private class ActorBasedRecipeManager(actor: ActorRef, timeouts: Timeouts)(implicit val ex: ExecutionContext) extends RecipeManager {
1920

21+
/**
22+
* A cache to store the recipes that have been retrieved from the actor system.
23+
* Needed to avoid querying the actor too often.
24+
*/
25+
private val cache: TrieMap[String, RecipeRecord] = TrieMap.empty
26+
2027
override def put(recipeRecord: RecipeRecord): Future[String] = {
2128
implicit val timeout: Timeout = timeouts.defaultAddRecipeTimeout
22-
(actor ? AddRecipe(recipeRecord.recipe)).mapTo[AddRecipeResponse].map(_.recipeId)
29+
(actor ? AddRecipe(recipeRecord.recipe)).mapTo[AddRecipeResponse].map { response =>
30+
cache += ((response.recipeId, recipeRecord))
31+
response.recipeId
32+
}
2333
}
2434

2535
override def get(recipeId: String): Future[Option[RecipeRecord]] = {
26-
implicit val timeout: Timeout = timeouts.defaultInquireTimeout
27-
(actor ? GetRecipe(recipeId)).map {
28-
case RecipeFound(compiledRecipe, timestamp) => Some(RecipeRecord.of(compiledRecipe, updated = timestamp))
29-
case NoRecipeFound(_) => None
36+
cache.get(recipeId) match {
37+
case Some(recipeRecord) => Future.successful(Some(recipeRecord))
38+
case None =>
39+
implicit val timeout: Timeout = timeouts.defaultInquireTimeout
40+
(actor ? GetRecipe(recipeId)).map {
41+
case RecipeFound(compiledRecipe, timestamp) =>
42+
val recipeRecord = RecipeRecord.of(compiledRecipe, updated = timestamp)
43+
cache += ((recipeId, recipeRecord))
44+
Some(recipeRecord)
45+
case NoRecipeFound(_) => None
46+
}
3047
}
3148
}
3249

3350
override def all: Future[Seq[RecipeRecord]] = {
34-
implicit val timeout: Timeout = timeouts.defaultInquireTimeout
35-
(actor ? GetAllRecipes).mapTo[AllRecipes].map(_.recipes.map { r => RecipeRecord.of(r.compiledRecipe, updated = r.timestamp) })
51+
implicit val timeout: Timeout = timeouts.defaultInquireTimeout
52+
(actor ? GetAllRecipes).mapTo[AllRecipes].map { response =>
53+
val recipes = response.recipes.map { r => RecipeRecord.of(r.compiledRecipe, updated = r.timestamp) }
54+
recipes.foreach(recipe => cache += ((recipe.recipeId, recipe)))
55+
recipes
56+
}
3657
}
58+
59+
// active/inactive recipes are not supported yet in the actor based recipe manager,
60+
// therefore we are returning all recipes here
61+
override def allActive: Future[Seq[RecipeRecord]] = all
3762
}
3863

3964
object ActorBasedRecipeManager {

core/akka-runtime/src/main/scala/com/ing/baker/runtime/recipe_manager/DefaultRecipeManager.scala

+5-2
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,8 @@ class DefaultRecipeManager(implicit val ex: ExecutionContext) extends RecipeMana
99

1010
val state:TrieMap[String, RecipeRecord] = TrieMap.empty
1111

12-
1312
override def put(recipeRecord: RecipeRecord): Future[String] = {
14-
state. += ((recipeRecord.recipeId,recipeRecord))
13+
state += ((recipeRecord.recipeId,recipeRecord))
1514
Future.successful(recipeRecord.recipeId)
1615
}
1716

@@ -22,6 +21,10 @@ class DefaultRecipeManager(implicit val ex: ExecutionContext) extends RecipeMana
2221
override def all: Future[Seq[RecipeRecord]] = {
2322
Future.successful(state.values.toSeq)
2423
}
24+
25+
override def allActive: Future[Seq[RecipeRecord]] = {
26+
Future.successful(state.values.filter(_.isActive).toSeq)
27+
}
2528
}
2629

2730
object DefaultRecipeManager {

core/akka-runtime/src/main/scala/com/ing/baker/runtime/recipe_manager/PollingAware.scala

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import com.typesafe.scalalogging.LazyLogging
55

66
import scala.concurrent.{ExecutionContext, Future}
77

8+
/**
9+
* This trait is used to avoid replacing recipes with elder or same creation time.
10+
*/
811
trait PollingAware extends RecipeManager with LazyLogging {
912

1013
implicit def ex: ExecutionContext

core/akka-runtime/src/main/scala/com/ing/baker/runtime/recipe_manager/RecipeManager.scala

+2
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,6 @@ trait RecipeManager {
1010
def get(recipeId: String): Future[Option[RecipeRecord]]
1111

1212
def all: Future[Seq[RecipeRecord]]
13+
14+
def allActive: Future[Seq[RecipeRecord]]
1315
}

core/akka-runtime/src/test/java/com/ing/baker/BakerTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void shouldSetupJBakerWithDefaultActorFramework() throws BakerException,
6161

6262
String recipeInstanceId = UUID.randomUUID().toString();
6363
Baker jBaker = AkkaBaker.java(config, actorSystem, implementationsList);
64-
java.util.Map<String, Value> ingredients = jBaker.addRecipe(RecipeRecord.of(compiledRecipe, System.currentTimeMillis(), false))
64+
java.util.Map<String, Value> ingredients = jBaker.addRecipe(RecipeRecord.of(compiledRecipe, System.currentTimeMillis(), false, true))
6565
.thenCompose(recipeId -> {
6666
assertEquals(compiledRecipe.getValidationErrors().size(), 0);
6767
return jBaker.bake(recipeId, recipeInstanceId);
@@ -84,7 +84,7 @@ public void shouldSetupJBakerWithGivenActorFramework() throws BakerException, Ex
8484
assertEquals(compiledRecipe.getValidationErrors().size(), 0);
8585

8686
Baker jBaker = AkkaBaker.java(config, actorSystem, implementationsList);
87-
String recipeId = jBaker.addRecipe(RecipeRecord.of(compiledRecipe, System.currentTimeMillis(), false)).get();
87+
String recipeId = jBaker.addRecipe(RecipeRecord.of(compiledRecipe, System.currentTimeMillis(), false, true)).get();
8888

8989
String requestId = UUID.randomUUID().toString();
9090
jBaker.bake(recipeId, requestId).get();
@@ -104,7 +104,7 @@ public void shouldFailWhenMissingImplementations() throws BakerException, Execut
104104
CompiledRecipe compiledRecipe = RecipeCompiler.compileRecipe(JavaCompiledRecipeTest.setupComplexRecipe());
105105
Baker jBaker = AkkaBaker.java(config, actorSystem);
106106

107-
jBaker.addRecipe(RecipeRecord.of(compiledRecipe, System.currentTimeMillis(), true)).get();
107+
jBaker.addRecipe(RecipeRecord.of(compiledRecipe, System.currentTimeMillis(), true, true)).get();
108108
}
109109

110110
@Test
@@ -119,7 +119,7 @@ public void shouldExecuteCompleteFlow() throws BakerException, ExecutionExceptio
119119

120120
// Setup recipe
121121
CompiledRecipe compiledRecipe = RecipeCompiler.compileRecipe(JavaCompiledRecipeTest.setupComplexRecipe());
122-
String recipeId = jBaker.addRecipe(RecipeRecord.of(compiledRecipe, System.currentTimeMillis(), false)).get();
122+
String recipeId = jBaker.addRecipe(RecipeRecord.of(compiledRecipe, System.currentTimeMillis(), false, true)).get();
123123
EventInstance eventOne = EventInstance.from(new JavaCompiledRecipeTest.EventOne());
124124
assertEquals(eventOne.getName(), "EventOne");
125125
assertTrue(eventOne.getProvidedIngredients().isEmpty());

core/akka-runtime/src/test/java/com/ing/baker/Webshop.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public void testWebshop() throws ExecutionException, InterruptedException {
155155
ActorSystem system = ActorSystem.create("webshop");
156156
Baker baker = AkkaBaker.java(config, system, ImmutableList.of(shipGoodsMock, sendInvoiceMock, manufactureGoodsMock, validateOrderMock));
157157

158-
String recipeId = baker.addRecipe(RecipeRecord.of(recipe, System.currentTimeMillis(), false)).get();
158+
String recipeId = baker.addRecipe(RecipeRecord.of(recipe, System.currentTimeMillis(), false, true)).get();
159159

160160
String recipeInstanceId = "56a70f82-a24d-497f-b3ac-57366adbb39c"; //UUID.randomUUID().toString();
161161

0 commit comments

Comments
 (0)