Skip to content

Commit

Permalink
Move IQ handling to IO pool where it could block (#1082)
Browse files Browse the repository at this point in the history
* Fix potential failures to leave muc before joining.

Client not currently joined torture180077@conference.prtorture.jitsi.net
	at org.jivesoftware.smackx.muc.MultiUserChat.leave(MultiUserChat.java:769)
	at org.jitsi.jicofo.xmpp.muc.ChatRoomImpl.joinAs(ChatRoomImpl.kt:219)
	at org.jitsi.jicofo.xmpp.muc.ChatRoomImpl.join(ChatRoomImpl.kt:191)

* fix: Handle incoming jingle IQs in an IO thread.

* Run grantOwnership in an IO thread.
  • Loading branch information
bgrozev authored Apr 21, 2023
1 parent f0d2311 commit fb9add1
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.jitsi.jicofo.xmpp.jingle
import org.jitsi.jicofo.util.WeakValueMap
import org.jitsi.jicofo.xmpp.AbstractIqHandler
import org.jitsi.jicofo.xmpp.IqProcessingResult
import org.jitsi.jicofo.xmpp.IqProcessingResult.AcceptedWithResponse
import org.jitsi.jicofo.xmpp.IqProcessingResult.RejectedWithError
import org.jitsi.jicofo.xmpp.IqRequest
import org.jitsi.utils.logging2.createLogger
Expand Down Expand Up @@ -53,13 +52,7 @@ class JingleIqRequestHandler(
)
}

val error = session.processIq(request.iq)
return if (error == null) {
AcceptedWithResponse(IQ.createResultIQ(request.iq))
} else {
logger.info("Returning error: request=${request.iq.toXML()}, error=${error.toXML()} ")
RejectedWithError(IQ.createErrorResponse(request.iq, error))
}
return session.processIq(request.iq)
}

fun registerSession(session: JingleSession) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
*/
package org.jitsi.jicofo.xmpp.jingle

import org.jitsi.jicofo.TaskPools
import org.jitsi.jicofo.conference.source.ConferenceSourceMap
import org.jitsi.jicofo.xmpp.IqProcessingResult
import org.jitsi.jicofo.xmpp.createSessionInitiate
import org.jitsi.jicofo.xmpp.createTransportReplace
import org.jitsi.jicofo.xmpp.sendIqAndGetResponse
import org.jitsi.jicofo.xmpp.tryToSendStanza
import org.jitsi.utils.MediaType
import org.jitsi.utils.OrderedJsonObject
import org.jitsi.utils.logging2.createLogger
import org.jitsi.utils.queue.PacketQueue
import org.jitsi.xmpp.extensions.jingle.ContentPacketExtension
import org.jitsi.xmpp.extensions.jingle.GroupPacketExtension
import org.jitsi.xmpp.extensions.jingle.JingleAction
Expand Down Expand Up @@ -64,42 +67,85 @@ class JingleSession(
addContext("sid", sid)
}

private val incomingIqQueue = PacketQueue<JingleIQ>(
Integer.MAX_VALUE,
true,
"jingle-iq-queue-$sid",
{
doProcessIq(it)
return@PacketQueue true
},
TaskPools.ioPool
)

private val localJid: Jid = connection.user

fun processIq(iq: JingleIQ): StanzaError? {
fun processIq(iq: JingleIQ): IqProcessingResult {
val action = iq.action
?: return StanzaError.getBuilder(StanzaError.Condition.bad_request)
.setConditionText("Missing 'action'").build()
?: return IqProcessingResult.RejectedWithError(
IQ.createErrorResponse(
iq,
StanzaError.getBuilder(StanzaError.Condition.bad_request)
.setConditionText("Missing 'action'").build()
)
)
JingleStats.stanzaReceived(action)

if (state == State.ENDED) {
return StanzaError.getBuilder(StanzaError.Condition.gone).setConditionText("session ended").build()
return IqProcessingResult.RejectedWithError(
IQ.createErrorResponse(
iq,
StanzaError.getBuilder(StanzaError.Condition.gone).setConditionText("session ended").build()
)
)
}

return when (action) {
incomingIqQueue.add(iq)
return IqProcessingResult.AcceptedWithNoResponse()
}

private fun doProcessIq(iq: JingleIQ) {
val error = when (iq.action) {
JingleAction.SESSION_ACCEPT -> {
// The session needs to be marked as active early to allow code executing as part of onSessionAccept
// to proceed (e.g. to signal source updates).
state = State.ACTIVE
val error = requestHandler.onSessionAccept(this, iq.contentList)
if (error != null) state = State.ENDED
return error
error
}

JingleAction.SESSION_INFO -> requestHandler.onSessionInfo(this, iq)
JingleAction.SESSION_TERMINATE -> requestHandler.onSessionTerminate(this, iq).also {
state = State.ENDED
}

JingleAction.TRANSPORT_ACCEPT -> requestHandler.onTransportAccept(this, iq.contentList)
JingleAction.TRANSPORT_INFO -> requestHandler.onTransportInfo(this, iq.contentList)
JingleAction.TRANSPORT_REJECT -> { requestHandler.onTransportReject(this, iq); null }
JingleAction.TRANSPORT_REJECT -> {
requestHandler.onTransportReject(this, iq); null
}

JingleAction.ADDSOURCE, JingleAction.SOURCEADD -> requestHandler.onAddSource(this, iq.contentList)
JingleAction.REMOVESOURCE, JingleAction.SOURCEREMOVE -> requestHandler.onRemoveSource(this, iq.contentList)
JingleAction.REMOVESOURCE, JingleAction.SOURCEREMOVE -> requestHandler.onRemoveSource(
this,
iq.contentList
)

else -> {
logger.warn("unsupported action $action")
logger.warn("unsupported action ${iq.action}")
StanzaError.getBuilder(StanzaError.Condition.feature_not_implemented)
.setConditionText("Unsupported 'action'").build()
}
}

val response = if (error == null) {
IQ.createResultIQ(iq)
} else {
logger.info("Returning error: request=${iq.toXML()}, error=${error.toXML()} ")
IQ.createErrorResponse(iq, error)
}
connection.tryToSendStanza(response)
}

fun terminate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,12 @@ class ChatRoomImpl(
@Throws(SmackException::class, XMPPException::class, InterruptedException::class)
private fun joinAs(nickname: Resourcepart, meetingIdToSet: String?) {
myOccupantJid = JidCreate.entityFullFrom(roomJid, nickname)
if (muc.isJoined) {
muc.leave()
synchronized(muc) {
if (muc.isJoined) {
muc.leave()
}
}

muc.addPresenceInterceptor(presenceInterceptor)
muc.createOrJoin(nickname)
val config = muc.configurationForm
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/
package org.jitsi.jicofo.xmpp.muc

import org.jitsi.jicofo.TaskPools
import org.jitsi.jicofo.auth.AuthenticationAuthority
import org.jitsi.jicofo.auth.AuthenticationListener
import org.jitsi.utils.OrderedJsonObject
import org.jitsi.utils.logging2.createLogger
import org.jitsi.utils.queue.PacketQueue

/**
* Manages to XMPP roles of occupants in a chat room, i.e. grants ownership to certain users.
Expand All @@ -38,6 +40,17 @@ sealed class ChatRoomRoleManager(
open fun stop() {}

open val debugState: OrderedJsonObject = OrderedJsonObject()

protected val queue = PacketQueue<Runnable>(
Integer.MAX_VALUE,
false,
"chat-room-role-manager-queue-${chatRoom.roomJid}",
{
it.run()
return@PacketQueue true
},
TaskPools.ioPool
)
}

/**
Expand Down Expand Up @@ -69,25 +82,27 @@ class AutoOwnerRoleManager(chatRoom: ChatRoom) : ChatRoomRoleManager(chatRoom) {
}

private fun electNewOwner() {
if (owner != null) {
return
}

// Skip if this is a breakout room.
if (chatRoom.isBreakoutRoom) {
return
}

owner = chatRoom.members.find { !it.isRobot && it.role.hasOwnerRights() }
if (owner != null) {
return
}

val newOwner = chatRoom.members.find { !it.isRobot && it.role != MemberRole.VISITOR }
if (newOwner != null) {
logger.info("Electing new owner: $newOwner")
chatRoom.grantOwnership(newOwner)
owner = newOwner
queue.add {
if (owner != null) {
return@add
}

// Skip if this is a breakout room.
if (chatRoom.isBreakoutRoom) {
return@add
}

owner = chatRoom.members.find { !it.isRobot && it.role.hasOwnerRights() }
if (owner != null) {
return@add
}

val newOwner = chatRoom.members.find { !it.isRobot && it.role != MemberRole.VISITOR }
if (newOwner != null) {
logger.info("Electing new owner: $newOwner")
chatRoom.grantOwnership(newOwner)
owner = newOwner
}
}
}

Expand All @@ -106,7 +121,12 @@ class AuthenticationRoleManager(
) : ChatRoomRoleManager(chatRoom) {

private val authenticationListener = AuthenticationListener { userJid, _, _ ->
chatRoom.members.find { it.jid == userJid }?.let { chatRoom.grantOwnership(it) }
chatRoom.members.find { it.jid == userJid }?.let {
queue.add {
logger.info("Granting ownership to $it.")
chatRoom.grantOwnership(it)
}
}
}

init {
Expand All @@ -127,15 +147,17 @@ class AuthenticationRoleManager(
return
}

grantOwnerToAuthenticatedUsers()
queue.add { grantOwnerToAuthenticatedUsers() }
}

/**
* Handles cases where moderators(already authenticated users) reload and join again.
*/
override fun memberJoined(member: ChatRoomMember) {
if (member.role != MemberRole.OWNER && authenticationAuthority.getSessionForJid(member.jid) != null) {
chatRoom.grantOwnership(member)
queue.add {
chatRoom.grantOwnership(member)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import io.kotest.core.test.TestCase
import io.kotest.core.test.TestResult
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import io.kotest.matchers.types.shouldBeInstanceOf
import io.mockk.every
import io.mockk.mockk
import org.jitsi.config.withNewConfig
Expand All @@ -49,7 +48,6 @@ import org.jitsi.xmpp.extensions.jingle.JingleAction
import org.jitsi.xmpp.extensions.jingle.JingleIQ
import org.jitsi.xmpp.extensions.jingle.RtpDescriptionPacketExtension
import org.jivesoftware.smack.packet.IQ
import org.jivesoftware.smack.packet.StanzaError
import org.jxmpp.jid.Jid
import org.jxmpp.jid.impl.JidCreate
import shouldBeValidJson
Expand Down Expand Up @@ -320,27 +318,24 @@ class ConferenceTest : ShouldSpec() {

context("Adding sources already signaled") {
val sources = remoteParticipant3.sources
jingleSession3.processIq(remoteParticipant3.createSourceAdd(sources)).let {
it shouldNotBe null
it.shouldBeInstanceOf<StanzaError>()
}
jingleSession3.processIq(remoteParticipant3.createSourceAdd(sources))

xmppConnection.requests.last().type shouldBe IQ.Type.error
}
context("Adding sources used by another participant") {
val sources = remoteParticipants[1].sources
jingleSession3.processIq(remoteParticipant3.createSourceAdd(sources)).let {
it shouldNotBe null
it.shouldBeInstanceOf<StanzaError>()
}
jingleSession3.processIq(remoteParticipant3.createSourceAdd(sources))

xmppConnection.requests.last().type shouldBe IQ.Type.error
}
context("Adding invalid sources") {
jingleSession3.processIq(remoteParticipant3.createSourceAdd(remoteParticipant3.sources)).let {
it shouldNotBe null
it.shouldBeInstanceOf<StanzaError>()
}
jingleSession3.processIq(remoteParticipant3.createSourceAdd(remoteParticipant3.sources))

xmppConnection.requests.last().type shouldBe IQ.Type.error
}
context("A participant leaving") {
val newSource2 = EndpointSourceSet(remoteParticipant3.nextSource(MediaType.AUDIO))
participant3.jingleSession!!.processIq(remoteParticipant3.createSourceAdd(newSource2))
jingleSession3.processIq(remoteParticipant3.createSourceAdd(newSource2))
remoteParticipants.forEach {
val lastJingleMessageSent = it.requests.last()
lastJingleMessageSent.action shouldBe JingleAction.SOURCEADD
Expand Down Expand Up @@ -370,9 +365,11 @@ class ConferenceTest : ShouldSpec() {

// Execute tasks in place (in the current thread, blocking)
val inPlaceExecutor: ExecutorService = mockk {
every { submit(any()) } answers {
every { submit(any<Runnable>()) } answers {
firstArg<Runnable>().run()
CompletableFuture<Unit>()
CompletableFuture<Unit>().apply {
complete(Unit)
}
}
every { execute(any()) } answers {
firstArg<Runnable>().run()
Expand All @@ -396,6 +393,8 @@ class ColibriAndJingleXmppConnection : MockXmppConnection() {
var ssrcs = 1L
val colibri2Server = TestColibri2Server()
val remoteParticipants = mutableMapOf<Jid, RemoteParticipant>()
// IQs sent by jicofo
val requests = mutableListOf<IQ>()

override fun handleIq(iq: IQ): IQ? = when (iq) {
is ConferenceModifyIQ -> colibri2Server.handleConferenceModifyIq(iq)
Expand All @@ -404,6 +403,8 @@ class ColibriAndJingleXmppConnection : MockXmppConnection() {
println("Not handling ${iq.toXML()}")
null
}
}.also {
requests.add(iq)
}

private fun nextSource(mediaType: MediaType) = Source(ssrcs++, mediaType)
Expand Down

0 comments on commit fb9add1

Please sign in to comment.