Skip to content

Commit

Permalink
fix: write using buffered output
Browse files Browse the repository at this point in the history
  • Loading branch information
betarixm committed Nov 27, 2023
1 parent 13484e9 commit e71cd12
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 2 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/Block.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Block(val records: LazyList[Record]) extends AnyVal {

def writeTo(path: Path): File = {
val file = File(path)
val writer = file.bufferedWriter()
val writer = file.bufferedOutput()

try {
toChars.foreach(writer.write(_))
Expand All @@ -65,7 +65,7 @@ class Block(val records: LazyList[Record]) extends AnyVal {
.zipWithIndex
.map({ case (records, index) =>
val file = File(directory / s"partition.$index")
val writer = file.bufferedWriter()
val writer = file.bufferedOutput()

try {
records.foreach(_.toChars.foreach(writer.write(_)))
Expand Down
16 changes: 16 additions & 0 deletions rpc/src/main/protobuf/exchange.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";

package kr.ac.postech.paranode.rpc;

import "common.proto";

service Exchange {
rpc Save (SaveRequest) returns (SaveReply) {}
}


message SaveRequest {
bytes block = 1;
}

message SaveReply {}
37 changes: 37 additions & 0 deletions rpc/src/main/scala/ExchangeClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package kr.ac.postech.paranode.rpc

import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import kr.ac.postech.paranode.core.Block
import kr.ac.postech.paranode.rpc.exchange.ExchangeGrpc.ExchangeStub
import kr.ac.postech.paranode.rpc.exchange.SaveReply
import kr.ac.postech.paranode.rpc.exchange.SaveRequest

import java.util.concurrent.TimeUnit
import scala.concurrent.Future

import Implicit._

object ExchangeClient {
def apply(host: String, port: Int): ExchangeClient = {
val channel =
ManagedChannelBuilder.forAddress(host, port).usePlaintext().build
val stub = exchange.ExchangeGrpc.stub(channel)
new ExchangeClient(channel, stub)
}
}

class ExchangeClient private (
private val channel: ManagedChannel,
private val stub: ExchangeStub
) {
def shutdown(): Unit = {
channel.shutdown.awaitTermination(5, TimeUnit.SECONDS)
}

def save(block: Block): Future[SaveReply] = {
val request = SaveRequest(block)

stub.save(request)
}
}
50 changes: 50 additions & 0 deletions rpc/src/main/scala/ExchangeService.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package kr.ac.postech.paranode.rpc

import kr.ac.postech.paranode.core.Block
import org.apache.logging.log4j.scala.Logging

import java.util.UUID
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.reflect.io.Directory

import exchange.{ExchangeGrpc, SaveReply, SaveRequest}
import Implicit._

class ExchangeService(
executionContext: ExecutionContext,
outputDirectory: Directory
) extends ExchangeGrpc.Exchange
with Logging {
override def save(request: SaveRequest): Future[SaveReply] = {
val promise = Promise[SaveReply]

Future {

try {
logger.info(s"[ExchangeServer] Save ($request)")

val block: Block = request.block

val path = outputDirectory / UUID.randomUUID().toString

logger.info(s"[ExchangeServer] Writing block to $path")

block.writeTo(path)

logger.info(s"[ExchangeServer] Block written to $path")

promise.success(new SaveReply())
} catch {
case e: Exception => {
logger.trace(e)
logger.error(s"[ExchangeServer] Error: ${e.getMessage}")
promise.failure(e)
}
}
}(executionContext)

promise.future
}
}

0 comments on commit e71cd12

Please sign in to comment.