Skip to content

Commit

Permalink
feat: implement worker server
Browse files Browse the repository at this point in the history
  • Loading branch information
hataehyeok committed Nov 23, 2023
1 parent d49d266 commit ff302f9
Showing 1 changed file with 75 additions and 13 deletions.
88 changes: 75 additions & 13 deletions rpc/src/main/scala/WorkerServer.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package kr.ac.postech.paranode.rpc

import com.google.protobuf.ByteString
import io.grpc.Server
import io.grpc.ServerBuilder
import kr.ac.postech.paranode.core._

import java.util.logging.Logger
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.reflect.io.Path

import worker._

Expand Down Expand Up @@ -61,8 +65,20 @@ class WorkerServer(executionContext: ExecutionContext) { self =>
val promise = Promise[SampleReply]

Future {
// TODO: Logic
promise.success(new SampleReply())
try {
val sortedBlock = Block.fromPath(Path("data/block"), 10, 90).sort()
val sampledKeys = sortedBlock
.sample()
.map(key => ByteString.copyFrom(key.underlying))
.toList
val reply = SampleReply(sampledKeys)

promise.success(reply)
} catch {
case e: Exception =>
println(e)
promise.failure(e)
}
}(executionContext)

promise.future
Expand All @@ -74,30 +90,76 @@ class WorkerServer(executionContext: ExecutionContext) { self =>
val promise = Promise[PartitionReply]

Future {
// TODO: Logic
promise.success(new PartitionReply())
try {
val block = Block.fromPath(Path("data/block"), 10, 90)
request.workers
.map(workerMetadata => {
val keyRange = KeyRange(
Key.fromString(workerMetadata.keyRange.get.from.toStringUtf8),
Key.fromString(workerMetadata.keyRange.get.to.toStringUtf8)
)
val partition = block.partition(keyRange)
val partitionPath = Path(
s"data/partition/${workerMetadata.node.get.host}:${workerMetadata.node.get.port}"
)
partition._2.writeTo(partitionPath)
})

promise.success(new PartitionReply())
} catch {
case e: Exception =>
println(e)
promise.failure(e)
}
}(executionContext)

promise.future
}

override def exchange(request: ExchangeRequest): Future[ExchangeReply] = {
val promise = Promise[ExchangeReply]

Future {
// TODO: Logic
promise.success(new ExchangeReply())
}(executionContext)
val futures = request.workers.map(workerMetadata =>
Future {
val host = workerMetadata.node.get.host
val port = workerMetadata.node.get.port
val partitionPath = Path(s"data/partition/${host}:${port}")

try {
if (partitionPath.exists) {
val partition = Block.fromPath(partitionPath, 10, 90)
val exchangeClient = ExchangeClient.apply(host, port)
val reply = exchangeClient.saveRecords(partition.records)
Some(reply)
} else {
None
}
} finally {
if (partitionPath.exists) {
partitionPath.delete()
}
}
}(executionContext)
)

promise.future
Future.sequence(futures).map(_ => new ExchangeReply())
}

override def merge(request: MergeRequest): Future[MergeReply] = {
val promise = Promise[MergeReply]

Future {
// TODO: Logic
promise.success(new MergeReply())
try {
val host = Path("data/host")
val port = Path("data/port")
val blockPath = Path(s"data/partition/${host}:${port}")
val mergedBlock = Block.fromPath(blockPath, 10, 90).sort()
mergedBlock.writeTo(blockPath)

promise.success(new MergeReply())
} catch {
case e: Exception =>
println(e)
promise.failure(e)
}
}(executionContext)

promise.future
Expand Down

0 comments on commit ff302f9

Please sign in to comment.