Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ETCM-841] Consume remaining data when extracting Hello in order #1019

Merged
merged 3 commits into from
Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class RLPxConnectionHandler(

case Failure(ex) =>
log.debug(
s"[Stopping Connection] Init AuthHandshaker message handling failed for peer {} due to {}",
"[Stopping Connection] Init AuthHandshaker message handling failed for peer {} due to {}",
peerId,
ex.getMessage
)
Expand Down Expand Up @@ -132,7 +132,7 @@ class RLPxConnectionHandler(

case Failure(ex) =>
log.debug(
s"[Stopping Connection] Response AuthHandshaker message handling failed for peer {} due to {}",
"[Stopping Connection] Response AuthHandshaker message handling failed for peer {} due to {}",
peerId,
ex.getMessage
)
Expand All @@ -154,24 +154,25 @@ class RLPxConnectionHandler(
}

def handleTimeout: Receive = { case AuthHandshakeTimeout =>
log.debug(s"[Stopping Connection] Auth handshake timeout for peer {}", peerId)
log.debug("[Stopping Connection] Auth handshake timeout for peer {}", peerId)
context.parent ! ConnectionFailed
context stop self
}

def processHandshakeResult(result: AuthHandshakeResult, remainingData: ByteString): Unit =
result match {
case AuthHandshakeSuccess(secrets, remotePubKey) =>
log.debug(s"Auth handshake succeeded for peer {}", peerId)
log.debug("Auth handshake succeeded for peer {}", peerId)
context.parent ! ConnectionEstablished(remotePubKey)
if (remainingData.nonEmpty)
context.self ! Received(remainingData)
// following the specification at https://github.com/ethereum/devp2p/blob/master/rlpx.md#initial-handshake
// point 6 indicates that the next messages needs to be initial 'Hello'
context become awaitInitialHello(extractor(secrets))
// Unfortunately it is hard to figure out the proper order for messages to be handled in.
// FrameCodec assumes that bytes will arrive in the expected order
// To alleviate potential lapses in order each chunk of data needs to be passed to FrameCodec immediately
extractHello(extractor(secrets), remainingData)

case AuthHandshakeError =>
log.debug(s"[Stopping Connection] Auth handshake failed for peer {}", peerId)
log.debug("[Stopping Connection] Auth handshake failed for peer {}", peerId)
context.parent ! ConnectionFailed
context stop self
}
Expand All @@ -181,45 +182,37 @@ class RLPxConnectionHandler(
cancellableAckTimeout: Option[CancellableAckTimeout] = None,
seqNumber: Int = 0
): Receive =
handleWriteFailed orElse handleConnectionClosed orElse handleSendHello(
extractor,
cancellableAckTimeout,
seqNumber
) orElse handleReceiveHello(extractor, cancellableAckTimeout, seqNumber)

private def handleSendHello(
extractor: HelloCodec,
cancellableAckTimeout: Option[CancellableAckTimeout] = None,
seqNumber: Int = 0
): Receive = {
// TODO when cancellableAckTimeout is Some
case SendMessage(h: HelloEnc) =>
val out = extractor.writeHello(h)
connection ! Write(out, Ack)
val timeout =
system.scheduler.scheduleOnce(rlpxConfiguration.waitForTcpAckTimeout, self, AckTimeout(seqNumber))
context become awaitInitialHello(
extractor,
Some(CancellableAckTimeout(seqNumber, timeout)),
increaseSeqNumber(seqNumber)
)
case Ack if cancellableAckTimeout.nonEmpty =>
//Cancel pending message timeout
cancellableAckTimeout.foreach(_.cancellable.cancel())
context become awaitInitialHello(extractor, None, seqNumber)

case AckTimeout(ackSeqNumber) if cancellableAckTimeout.exists(_.seqNumber == ackSeqNumber) =>
cancellableAckTimeout.foreach(_.cancellable.cancel())
log.error(s"[Stopping Connection] Sending 'Hello' to {} failed", peerId)
context stop self
handleWriteFailed orElse handleConnectionClosed orElse {
// TODO when cancellableAckTimeout is Some
case SendMessage(h: HelloEnc) =>
val out = extractor.writeHello(h)
connection ! Write(out, Ack)
val timeout =
system.scheduler.scheduleOnce(rlpxConfiguration.waitForTcpAckTimeout, self, AckTimeout(seqNumber))
context become awaitInitialHello(
extractor,
Some(CancellableAckTimeout(seqNumber, timeout)),
increaseSeqNumber(seqNumber)
)
case Ack if cancellableAckTimeout.nonEmpty =>
//Cancel pending message timeout
cancellableAckTimeout.foreach(_.cancellable.cancel())
context become awaitInitialHello(extractor, None, seqNumber)

}
case AckTimeout(ackSeqNumber) if cancellableAckTimeout.exists(_.seqNumber == ackSeqNumber) =>
cancellableAckTimeout.foreach(_.cancellable.cancel())
log.error("[Stopping Connection] Sending 'Hello' to {} failed", peerId)
context stop self
case Received(data) =>
extractHello(extractor, data, cancellableAckTimeout, seqNumber)
}

private def handleReceiveHello(
private def extractHello(
extractor: HelloCodec,
data: ByteString,
cancellableAckTimeout: Option[CancellableAckTimeout] = None,
seqNumber: Int = 0
dzajkowski marked this conversation as resolved.
Show resolved Hide resolved
): Receive = { case Received(data) =>
): Unit = {
extractor.readHello(data) match {
case Some((hello, restFrames)) =>
val messageCodecOpt = for {
Expand All @@ -236,12 +229,12 @@ class RLPxConnectionHandler(
seqNumber = seqNumber
)
case None =>
log.debug(s"[Stopping Connection] Unable to negotiate protocol with {}", peerId)
log.debug("[Stopping Connection] Unable to negotiate protocol with {}", peerId)
context.parent ! ConnectionFailed
context stop self
}
case None =>
log.debug(s"[Stopping Connection] Did not find 'Hello' in message from {}", peerId)
log.debug("[Stopping Connection] Did not find 'Hello' in message from {}", peerId)
context become awaitInitialHello(extractor, cancellableAckTimeout, seqNumber)
}
}
Expand All @@ -262,7 +255,7 @@ class RLPxConnectionHandler(
context.parent ! MessageReceived(message)

case Failure(ex) =>
log.info(s"Cannot decode message from {}, because of {}", peerId, ex.getMessage)
log.info("Cannot decode message from {}, because of {}", peerId, ex.getMessage)
// break connection in case of failed decoding, to avoid attack which would send us garbage
context stop self
}
Expand Down Expand Up @@ -310,7 +303,7 @@ class RLPxConnectionHandler(

case AckTimeout(ackSeqNumber) if cancellableAckTimeout.exists(_.seqNumber == ackSeqNumber) =>
cancellableAckTimeout.foreach(_.cancellable.cancel())
log.debug(s"[Stopping Connection] Write to {} failed", peerId)
log.debug("[Stopping Connection] Write to {} failed", peerId)
context stop self
}
}
Expand All @@ -332,7 +325,7 @@ class RLPxConnectionHandler(
): Unit = {
val out = messageCodec.encodeMessage(messageToSend)
connection ! Write(out, Ack)
log.debug(s"Sent message: {} to {}", messageToSend.underlyingMsg.toShortString, peerId)
log.debug("Sent message: {} to {}", messageToSend.underlyingMsg.toShortString, peerId)

val timeout = system.scheduler.scheduleOnce(rlpxConfiguration.waitForTcpAckTimeout, self, AckTimeout(seqNumber))
context become handshaked(
Expand All @@ -356,7 +349,7 @@ class RLPxConnectionHandler(

def handleWriteFailed: Receive = { case CommandFailed(cmd: Write) =>
log.debug(
s"[Stopping Connection] Write to peer {} failed, trying to send {}",
"[Stopping Connection] Write to peer {} failed, trying to send {}",
peerId,
Hex.toHexString(cmd.data.toArray[Byte])
)
Expand All @@ -365,10 +358,10 @@ class RLPxConnectionHandler(

def handleConnectionClosed: Receive = { case msg: ConnectionClosed =>
if (msg.isPeerClosed) {
log.debug(s"[Stopping Connection] Connection with {} closed by peer", peerId)
log.debug("[Stopping Connection] Connection with {} closed by peer", peerId)
}
if (msg.isErrorClosed) {
log.debug(s"[Stopping Connection] Connection with {} closed because of error {}", peerId, msg.getErrorCause)
log.debug("[Stopping Connection] Connection with {} closed because of error {}", peerId, msg.getErrorCause)
}

context stop self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ class RLPxConnectionHandlerSpec
.expects(data)
.returning((response, AuthHandshakeSuccess(mock[Secrets], ByteString())))
(mockHelloExtractor.readHello _)
.expects(hello)
.expects(ByteString.empty)
.returning(Some(Hello(5, "", Capabilities.Eth63Capability::Nil, 30303, ByteString("abc")), Seq.empty))
(mockMessageCodec.readMessages _)
.expects(ByteString.empty)
.expects(hello)
.returning(Nil) //For processing of messages after handshaking finishes

rlpxConnection ! Tcp.Received(data)
Expand Down