diff --git a/core/amber/build.sbt b/core/amber/build.sbt index 2ad77bffba..255483a41b 100644 --- a/core/amber/build.sbt +++ b/core/amber/build.sbt @@ -218,7 +218,8 @@ libraryDependencies += "org.jooq" % "jooq" % "3.14.16" // https://mvnrepository.com/artifact/org.jgrapht/jgrapht-core libraryDependencies += "org.jgrapht" % "jgrapht-core" % "1.4.0" -libraryDependencies += "io.altoo" %% "akka-kryo-serialization" % "2.5.0" +// https://mvnrepository.com/artifact/io.altoo/akka-kryo-serialization +libraryDependencies += "io.altoo" %% "akka-kryo-serialization" % "2.5.2" // https://mvnrepository.com/artifact/com.twitter/util-core libraryDependencies += "com.twitter" %% "util-core" % "22.12.0" diff --git a/core/amber/src/main/resources/cluster.conf b/core/amber/src/main/resources/cluster.conf index 1fbe3f681e..23719d3755 100644 --- a/core/amber/src/main/resources/cluster.conf +++ b/core/amber/src/main/resources/cluster.conf @@ -58,3 +58,5 @@ akka { } } } + +akka-kryo-serialization.kryo-initializer = "edu.uci.ics.amber.engine.common.AmberKryoInitializer" \ No newline at end of file diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberKryoInitializer.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberKryoInitializer.scala new file mode 100644 index 0000000000..f269cdbe92 --- /dev/null +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberKryoInitializer.scala @@ -0,0 +1,16 @@ +package edu.uci.ics.amber.engine.common + +import akka.actor.ExtendedActorSystem +import com.esotericsoftware.kryo.serializers.ClosureSerializer +import com.esotericsoftware.kryo.serializers.ClosureSerializer.Closure +import io.altoo.akka.serialization.kryo.DefaultKryoInitializer +import io.altoo.akka.serialization.kryo.serializer.scala.ScalaKryo + +import java.lang.invoke.SerializedLambda + +class AmberKryoInitializer extends DefaultKryoInitializer { + override def preInit(kryo: ScalaKryo, system: ExtendedActorSystem): Unit = { + kryo.register(classOf[SerializedLambda]) + kryo.register(classOf[Closure], new ClosureSerializer()) + } +} diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/LoggingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/LoggingSpec.scala index 6178209766..211e58736f 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/LoggingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/LoggingSpec.scala @@ -4,6 +4,13 @@ import akka.actor.ActorSystem import akka.serialization.SerializationExtension import akka.testkit.{ImplicitSender, TestKit} import edu.uci.ics.amber.core.tuple.{AttributeType, Schema, TupleLike} +import edu.uci.ics.amber.core.virtualidentity.{ + ActorVirtualIdentity, + ChannelIdentity, + OperatorIdentity, + PhysicalOpIdentity +} +import edu.uci.ics.amber.core.workflow.{PhysicalLink, PortIdentity} import edu.uci.ics.amber.engine.architecture.logreplay.{ReplayLogManager, ReplayLogRecord} import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{ AddPartitioningRequest, @@ -18,6 +25,7 @@ import edu.uci.ics.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc METHOD_START_WORKER } import edu.uci.ics.amber.engine.architecture.sendsemantics.partitionings.OneToOnePartitioning +import edu.uci.ics.amber.engine.common.AmberRuntime import edu.uci.ics.amber.engine.common.ambermessage.{ DataFrame, WorkflowFIFOMessage, @@ -26,16 +34,11 @@ import edu.uci.ics.amber.engine.common.ambermessage.{ import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient.ControlInvocation import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage import edu.uci.ics.amber.engine.common.virtualidentity.util.{CONTROLLER, SELF} -import edu.uci.ics.amber.core.virtualidentity.{ - ActorVirtualIdentity, - ChannelIdentity, - OperatorIdentity, - PhysicalOpIdentity -} -import edu.uci.ics.amber.core.workflow.{PhysicalLink, PortIdentity} -import edu.uci.ics.amber.engine.common.AmberRuntime import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.TimeLimitedTests import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.time.Span +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import java.net.URI @@ -43,7 +46,8 @@ class LoggingSpec extends TestKit(ActorSystem("LoggingSpec", AmberRuntime.akkaConfig)) with ImplicitSender with AnyFlatSpecLike - with BeforeAndAfterAll { + with BeforeAndAfterAll + with TimeLimitedTests { override def beforeAll(): Unit = { AmberRuntime.serde = SerializationExtension(system) @@ -110,6 +114,7 @@ class LoggingSpec ) "determinant logger" should "log processing steps in local storage" in { + Thread.sleep(1000) // wait for serializer to be registered val logStorage = SequentialRecordStorage.getStorage[ReplayLogRecord]( Some(new URI("ram:///recovery-logs/tmp")) ) @@ -129,4 +134,5 @@ class LoggingSpec assert(logRecords.length == 15) } + override def timeLimit: Span = 30.seconds }