-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEchoServiceImpl.kt
90 lines (81 loc) · 2.87 KB
/
EchoServiceImpl.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package io.grpc.kt
import io.grpc.kt.core.LogUtils
import io.grpc.stub.StreamObserver
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import org.slf4j.LoggerFactory
class EchoServiceImpl(private val lag: Long = 0) : EchoGrpcKt.EchoImplBase() {
private val logger = LoggerFactory.getLogger(this.javaClass)
override suspend fun unary(req: EchoProto.EchoReq): EchoProto.EchoResp {
logger.debug("unary req=${LogUtils.objectString(req)} id=${req.id}")
if (lag > 0) {
delay(lag)
}
val resp = EchoProto.EchoResp.newBuilder()
.setId(req.id)
.setValue(req.value)
.build()
logger.debug("unary resp=${LogUtils.objectString(resp)} id=${resp.id}")
return resp
}
override suspend fun serverStreaming(req: EchoProto.EchoCountReq): ReceiveChannel<EchoProto.EchoResp> {
logger.debug("serverStreaming req=${LogUtils.objectString(req)} count=${req.count}")
if (lag > 0) {
delay(lag)
}
return GlobalScope.produce {
for (i in 0 until req.count) {
val msg = EchoProto.EchoResp.newBuilder()
.setId(i)
.setValue(i.toString())
.build()
logger.debug("serverStreaming resp.msg=${LogUtils.objectString(msg)} id=${msg.id}")
send(msg)
}
}
}
override suspend fun clientStreaming(req: ReceiveChannel<EchoProto.EchoReq>): EchoProto.EchoCountResp {
var count = 0
for (msg in req) {
if (lag > 0) {
delay(lag)
}
logger.debug("clientStreaming req.msg=${LogUtils.objectString(msg)} id=${msg.id}")
require(msg.id == count)
count++
}
val resp = EchoProto.EchoCountResp.newBuilder()
.setCount(count)
.build()
logger.debug("clientStreaming resp=${LogUtils.objectString(resp)} count=${resp.count}")
return resp
}
override suspend fun bidiStreaming(req: ReceiveChannel<EchoProto.EchoReq>): ReceiveChannel<EchoProto.EchoResp> {
return GlobalScope.produce {
var count = 0
for (reqMsg in req) {
if (lag > 0) {
delay(lag)
}
require(reqMsg.id == count)
count++
logger.debug("bidiStreaming req.msg=${LogUtils.objectString(reqMsg)} id=${reqMsg.id}")
val respMsg = EchoProto.EchoResp.newBuilder()
.setId(reqMsg.id)
.setValue(reqMsg.value)
.build()
logger.debug("bidiStreaming resp.msg=${LogUtils.objectString(respMsg)} id=${respMsg.id}")
send(respMsg)
}
}
}
}
class EchoServiceJavaImpl(private val lag: Long = 0) : EchoGrpc.EchoImplBase() {
override fun unary(req: EchoProto.EchoReq, responseObserver: StreamObserver<EchoProto.EchoResp>) {
val resp = EchoProto.EchoResp.newBuilder().setId(req.id).setValue(req.value).build()
responseObserver.onNext(resp)
responseObserver.onCompleted()
}
}