Skip to content

Commit

Permalink
Merge branch 'master' into jiadong-add-file-service
Browse files Browse the repository at this point in the history
  • Loading branch information
aglinxinyuan authored Mar 5, 2025
2 parents cbf81b2 + 2049734 commit 97916dc
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 10 deletions.
3 changes: 2 additions & 1 deletion core/amber/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ libraryDependencies += "org.jooq" % "jooq" % "3.14.16"
// https://mvnrepository.com/artifact/org.jgrapht/jgrapht-core
libraryDependencies += "org.jgrapht" % "jgrapht-core" % "1.4.0"

libraryDependencies += "io.altoo" %% "akka-kryo-serialization" % "2.5.0"
// https://mvnrepository.com/artifact/io.altoo/akka-kryo-serialization
libraryDependencies += "io.altoo" %% "akka-kryo-serialization" % "2.5.2"

// https://mvnrepository.com/artifact/com.twitter/util-core
libraryDependencies += "com.twitter" %% "util-core" % "22.12.0"
Expand Down
2 changes: 2 additions & 0 deletions core/amber/src/main/resources/cluster.conf
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,5 @@ akka {
}
}
}

akka-kryo-serialization.kryo-initializer = "edu.uci.ics.amber.engine.common.AmberKryoInitializer"
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package edu.uci.ics.amber.engine.common

import akka.actor.ExtendedActorSystem
import com.esotericsoftware.kryo.serializers.ClosureSerializer
import com.esotericsoftware.kryo.serializers.ClosureSerializer.Closure
import io.altoo.akka.serialization.kryo.DefaultKryoInitializer
import io.altoo.akka.serialization.kryo.serializer.scala.ScalaKryo

import java.lang.invoke.SerializedLambda

class AmberKryoInitializer extends DefaultKryoInitializer {
override def preInit(kryo: ScalaKryo, system: ExtendedActorSystem): Unit = {
kryo.register(classOf[SerializedLambda])
kryo.register(classOf[Closure], new ClosureSerializer())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ import akka.actor.ActorSystem
import akka.serialization.SerializationExtension
import akka.testkit.{ImplicitSender, TestKit}
import edu.uci.ics.amber.core.tuple.{AttributeType, Schema, TupleLike}
import edu.uci.ics.amber.core.virtualidentity.{
ActorVirtualIdentity,
ChannelIdentity,
OperatorIdentity,
PhysicalOpIdentity
}
import edu.uci.ics.amber.core.workflow.{PhysicalLink, PortIdentity}
import edu.uci.ics.amber.engine.architecture.logreplay.{ReplayLogManager, ReplayLogRecord}
import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{
AddPartitioningRequest,
Expand All @@ -18,6 +25,7 @@ import edu.uci.ics.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc
METHOD_START_WORKER
}
import edu.uci.ics.amber.engine.architecture.sendsemantics.partitionings.OneToOnePartitioning
import edu.uci.ics.amber.engine.common.AmberRuntime
import edu.uci.ics.amber.engine.common.ambermessage.{
DataFrame,
WorkflowFIFOMessage,
Expand All @@ -26,24 +34,20 @@ import edu.uci.ics.amber.engine.common.ambermessage.{
import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient.ControlInvocation
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
import edu.uci.ics.amber.engine.common.virtualidentity.util.{CONTROLLER, SELF}
import edu.uci.ics.amber.core.virtualidentity.{
ActorVirtualIdentity,
ChannelIdentity,
OperatorIdentity,
PhysicalOpIdentity
}
import edu.uci.ics.amber.core.workflow.{PhysicalLink, PortIdentity}
import edu.uci.ics.amber.engine.common.AmberRuntime
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.TimeLimitedTests
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime

import java.net.URI

class LoggingSpec
extends TestKit(ActorSystem("LoggingSpec", AmberRuntime.akkaConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll {
with BeforeAndAfterAll
with TimeLimitedTests {

override def beforeAll(): Unit = {
AmberRuntime.serde = SerializationExtension(system)
Expand Down Expand Up @@ -110,6 +114,7 @@ class LoggingSpec
)

"determinant logger" should "log processing steps in local storage" in {
Thread.sleep(1000) // wait for serializer to be registered
val logStorage = SequentialRecordStorage.getStorage[ReplayLogRecord](
Some(new URI("ram:///recovery-logs/tmp"))
)
Expand All @@ -129,4 +134,5 @@ class LoggingSpec
assert(logRecords.length == 15)
}

override def timeLimit: Span = 30.seconds
}

0 comments on commit 97916dc

Please sign in to comment.