Skip to content

Commit

Permalink
fix: Don't treat Jibri busy response as transient error (#659)
Browse files Browse the repository at this point in the history
* ref: change StartExceptions to be individual class types

* don't treat a busy response as a transient error
  • Loading branch information
bbaldino authored Jan 7, 2021
1 parent c7c2614 commit 137c67e
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ protected IQ handleStartRequest(JibriIq iq)
catch (JibriSession.StartException exc)
{
ErrorIQ errorIq;
String reason = exc.getReason();
String reason = exc.getMessage();

if (StartException.ALL_BUSY.equals(reason))
if (exc instanceof StartException.AllBusy)
{
logger.info("Failed to start a Jibri session, " +
"all Jibris were busy");
Expand All @@ -189,7 +189,7 @@ protected IQ handleStartRequest(JibriIq iq)
XMPPError.Condition.resource_constraint,
"all Jibris are busy");
}
else if (StartException.NOT_AVAILABLE.equals(reason))
else if (exc instanceof StartException.NotAvailable)
{
logger.info("Failed to start a Jibri session, " +
"no Jibris available");
Expand Down
92 changes: 68 additions & 24 deletions src/main/java/org/jitsi/jicofo/recording/jibri/JibriSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ private void startInternal()
logger.error("Unable to find an available Jibri, can't start");

if (jibriDetector.isAnyInstanceConnected()) {
throw new StartException(StartException.ALL_BUSY);
throw new StartException.AllBusy();
}

throw new StartException(StartException.NOT_AVAILABLE);
throw new StartException.NotAvailable();
}

try
Expand All @@ -351,14 +351,17 @@ private void startInternal()
catch (Exception e)
{
logger.error("Failed to send start Jibri IQ: " + e, e);
jibriDetector.memberHadTransientError(jibriJid);
if (!(e instanceof StartException.OneBusy))
{
jibriDetector.memberHadTransientError(jibriJid);
}
if (!maxRetriesExceeded())
{
retryRequestWithAnotherJibri();
}
else
{
throw new StartException(StartException.INTERNAL_SERVER_ERROR);
throw new StartException.InternalServerError();
}
}
}
Expand Down Expand Up @@ -558,20 +561,22 @@ private void sendJibriStartIq(final Jid jibriJid)
"Unexpected response to start request: "
+ (reply != null ? reply.toXML() : "null"));

throw new StartException(StartException.UNEXPECTED_RESPONSE);
throw new StartException.UnexpectedResponse();
}

JibriIq jibriIq = (JibriIq) reply;

// According to the "protocol" only PENDING status is allowed in
// response to the start request.
if (!Status.PENDING.equals(jibriIq.getStatus()))
if (isBusyResponse(jibriIq))
{
logger.info("Jibri " + jibriIq.getFrom() + " was busy");
throw new StartException.OneBusy();
}
if (!isPendingResponse(jibriIq))
{
logger.error(
"Unexpected status received in response to the start IQ: "
+ jibriIq.toXML());

throw new StartException(StartException.UNEXPECTED_RESPONSE);
throw new StartException.UnexpectedResponse();
}

processJibriIqFromJibri(jibriIq);
Expand Down Expand Up @@ -830,25 +835,47 @@ void onSessionStateChanged(
JibriIq.FailureReason failureReason);
}

static public class StartException extends Exception
static public abstract class StartException extends Exception
{
final static String ALL_BUSY = "All Jibri instances are busy";
final static String INTERNAL_SERVER_ERROR = "Internal server error";
final static String NOT_AVAILABLE = "No Jibris available";
final static String UNEXPECTED_RESPONSE = "Unexpected response";

private final String reason;

StartException(String reason)
public StartException(String message)
{
super(reason);

this.reason = reason;
super(message);
}

String getReason()
static public class AllBusy extends StartException
{
public AllBusy()
{
super("All jibri instances are busy");
}
}
static public class InternalServerError extends StartException
{
return reason;
public InternalServerError()
{
super("Internal server error");
}
}
static public class NotAvailable extends StartException
{
public NotAvailable()
{
super("No Jibris available");
}
}
static public class UnexpectedResponse extends StartException
{
public UnexpectedResponse()
{
super("Unexpected response");
}
}
static public class OneBusy extends StartException
{
public OneBusy()
{
super("This Jibri instance was busy");
}
}
}

Expand Down Expand Up @@ -886,4 +913,21 @@ public void instanceOffline(Jid jid)
}
}

/**
* Returns true if the given IQ represens a busy response from Jibri
* @param iq
* @return
*/
private boolean isBusyResponse(JibriIq iq)
{
return Status.OFF.equals(iq.getStatus()) &&
iq.isFailure() &&
FailureReason.BUSY.equals(iq.getFailureReason());
}

private boolean isPendingResponse(JibriIq iq)
{
return Status.PENDING.equals(iq.getStatus());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,19 @@ protected IQ handleStartRequest(JibriIq iq)
}
catch (StartException exc)
{
String reason = exc.getReason();
String reason = exc.getMessage();
logger.info(
"Failed to start a Jibri session: " + reason, exc);
sipSessions.remove(sipAddress);
ErrorIQ errorIq;
if (StartException.ALL_BUSY.equals(reason))
if (exc instanceof StartException.AllBusy)
{
errorIq = ErrorResponse.create(
iq,
XMPPError.Condition.resource_constraint,
"all Jibris are busy");
}
else if(StartException.NOT_AVAILABLE.equals(reason))
else if(exc instanceof StartException.NotAvailable)
{
errorIq = ErrorResponse.create(
iq,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.kotest.matchers.shouldNotBe
import io.mockk.every
import io.mockk.mockk
import io.mockk.mockkStatic
import io.mockk.slot
import io.mockk.spyk
import io.mockk.verify
import org.jitsi.jicofo.FocusBundleActivator
Expand Down Expand Up @@ -54,7 +55,7 @@ class JibriSessionTest : ShouldSpec({
JidCreate.bareFrom("jibri3@bar.com")
)
val detector: JibriDetector = mockk {
every { selectJibri() } returnsMany(jibriList)
every { selectJibri() } returnsMany (jibriList)
every { isAnyInstanceConnected } returns true
every { memberHadTransientError(any()) } answers {
// Simulate the real JibriDetector logic and put the Jibri at the back of the list
Expand Down Expand Up @@ -118,9 +119,38 @@ class JibriSessionTest : ShouldSpec({
shouldThrow<JibriSession.StartException> {
jibriSession.start()
}
verify(exactly = maxNumRetries + 1) { xmppConnection.sendPacketAndGetReply(any())}
verify(exactly = maxNumRetries + 1) { xmppConnection.sendPacketAndGetReply(any()) }
}
}
}
}
context("Trying to start a session with a Jibri that is busy") {
val iq = slot<IQ>()
// First return busy, then pending
every { xmppConnection.sendPacketAndGetReply(capture(iq)) } answers {
JibriIq().apply {
type = IQ.Type.result
from = iq.captured.to
to = iq.captured.from
shouldRetry = true
status = JibriIq.Status.OFF
failureReason = JibriIq.FailureReason.BUSY
}
} andThen {
JibriIq().apply {
type = IQ.Type.result
from = iq.captured.to
to = iq.captured.from
shouldRetry = true
status = JibriIq.Status.PENDING
}
}
jibriSession.start()
should("not count as a transient error") {
verify(exactly = 0) { detector.memberHadTransientError(any()) }
}
should("retry with another jibri") {
verify(exactly = 2) { xmppConnection.sendPacketAndGetReply(any()) }
}
}
})

0 comments on commit 137c67e

Please sign in to comment.