Skip to content

Commit

Permalink
Isolate exceptions in event/status listeners (#314)
Browse files Browse the repository at this point in the history
An exception in a user's listener shouldn't crash the whole event engine
  • Loading branch information
wkal-pubnub authored Nov 27, 2024
1 parent 4e27a06 commit 986142b
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ import com.pubnub.api.callbacks.SubscribeCallback
import com.pubnub.api.crypto.CryptoModule
import com.pubnub.api.enums.PNStatusCategory
import com.pubnub.api.models.consumer.PNBoundedPage
import com.pubnub.api.models.consumer.PNPublishResult
import com.pubnub.api.models.consumer.PNStatus
import com.pubnub.api.models.consumer.pubsub.PNMessageResult
import com.pubnub.api.v2.PNConfigurationOverride
import com.pubnub.api.v2.callbacks.EventListener
import com.pubnub.api.v2.callbacks.Result
import com.pubnub.api.v2.callbacks.StatusListener
import com.pubnub.api.v2.callbacks.getOrThrow
import com.pubnub.api.v2.entities.Channel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.pubnub.api.v2.callbacks.StatusListener
import com.pubnub.api.v2.subscriptions.Subscription
import com.pubnub.internal.subscribe.eventengine.effect.MessagesConsumer
import com.pubnub.internal.subscribe.eventengine.effect.StatusConsumer
import org.slf4j.LoggerFactory
import java.util.concurrent.CopyOnWriteArrayList

class ListenerManager(val pubnub: PubNub) : MessagesConsumer, StatusConsumer, EventEmitter, StatusEmitter {
Expand All @@ -26,6 +27,8 @@ class ListenerManager(val pubnub: PubNub) : MessagesConsumer, StatusConsumer, Ev
private val statusListeners get() = listeners.filterIsInstance<StatusListener>()
private val eventListeners get() = listeners.filterIsInstance<EventListener>()

private val log = LoggerFactory.getLogger(this.javaClass.simpleName)

/**
* Add a listener.
*
Expand Down Expand Up @@ -65,49 +68,59 @@ class ListenerManager(val pubnub: PubNub) : MessagesConsumer, StatusConsumer, Ev
}

override fun announce(status: PNStatus) {
statusListeners.forEach { it.status(pubnub, status) }
statusListeners.safeForEach { it.status(pubnub, status) }
}

override fun announce(message: PNMessageResult) {
eventListeners.forEach { it.message(pubnub, message) }
eventListeners.safeForEach { it.message(pubnub, message) }
val envelope = AnnouncementEnvelope(message)
subscriptionCallbacks.forEach { it.message(pubnub, envelope) }
setCallbacks.forEach { it.message(pubnub, envelope) }
subscriptionCallbacks.safeForEach { it.message(pubnub, envelope) }
setCallbacks.safeForEach { it.message(pubnub, envelope) }
}

override fun announce(presence: PNPresenceEventResult) {
eventListeners.forEach { it.presence(pubnub, presence) }
eventListeners.safeForEach { it.presence(pubnub, presence) }
val envelope = AnnouncementEnvelope(presence)
subscriptionCallbacks.forEach { it.presence(pubnub, envelope) }
setCallbacks.forEach { it.presence(pubnub, envelope) }
subscriptionCallbacks.safeForEach { it.presence(pubnub, envelope) }
setCallbacks.safeForEach { it.presence(pubnub, envelope) }
}

override fun announce(signal: PNSignalResult) {
eventListeners.forEach { it.signal(pubnub, signal) }
eventListeners.safeForEach { it.signal(pubnub, signal) }
val envelope = AnnouncementEnvelope(signal)
subscriptionCallbacks.forEach { it.signal(pubnub, envelope) }
setCallbacks.forEach { it.signal(pubnub, envelope) }
subscriptionCallbacks.safeForEach { it.signal(pubnub, envelope) }
setCallbacks.safeForEach { it.signal(pubnub, envelope) }
}

override fun announce(messageAction: PNMessageActionResult) {
eventListeners.forEach { it.messageAction(pubnub, messageAction) }
eventListeners.safeForEach { it.messageAction(pubnub, messageAction) }
val envelope = AnnouncementEnvelope(messageAction)
subscriptionCallbacks.forEach { it.messageAction(pubnub, envelope) }
setCallbacks.forEach { it.messageAction(pubnub, envelope) }
subscriptionCallbacks.safeForEach { it.messageAction(pubnub, envelope) }
setCallbacks.safeForEach { it.messageAction(pubnub, envelope) }
}

override fun announce(pnObjectEventResult: PNObjectEventResult) {
eventListeners.forEach { it.objects(pubnub, pnObjectEventResult) }
eventListeners.safeForEach { it.objects(pubnub, pnObjectEventResult) }
val envelope = AnnouncementEnvelope(pnObjectEventResult)
subscriptionCallbacks.forEach { it.objects(pubnub, envelope) }
setCallbacks.forEach { it.objects(pubnub, envelope) }
subscriptionCallbacks.safeForEach { it.objects(pubnub, envelope) }
setCallbacks.safeForEach { it.objects(pubnub, envelope) }
}

override fun announce(pnFileEventResult: PNFileEventResult) {
eventListeners.forEach { it.file(pubnub, pnFileEventResult) }
eventListeners.safeForEach { it.file(pubnub, pnFileEventResult) }
val envelope = AnnouncementEnvelope(pnFileEventResult)
subscriptionCallbacks.forEach { it.file(pubnub, envelope) }
setCallbacks.forEach { it.file(pubnub, envelope) }
subscriptionCallbacks.safeForEach { it.file(pubnub, envelope) }
setCallbacks.safeForEach { it.file(pubnub, envelope) }
}

private inline fun <T> Iterable<T>.safeForEach(action: (T) -> Unit) {
for (element in this) {
try {
action(element)
} catch (e: Throwable) {
log.warn("Uncaught exception in listener.", e)
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ internal class EmitMessagesEffect(
override fun runEffect() {
log.trace("Running EmitMessagesEffect")
for (message in messages) {
when (message) {
is PNMessageResult -> messagesConsumer.announce(message)
is PNPresenceEventResult -> messagesConsumer.announce(message)
is PNSignalResult -> messagesConsumer.announce(message)
is PNMessageActionResult -> messagesConsumer.announce(message)
is PNObjectEventResult -> messagesConsumer.announce(message)
is PNFileEventResult -> messagesConsumer.announce(message)
try {
when (message) {
is PNMessageResult -> messagesConsumer.announce(message)
is PNPresenceEventResult -> messagesConsumer.announce(message)
is PNSignalResult -> messagesConsumer.announce(message)
is PNMessageActionResult -> messagesConsumer.announce(message)
is PNObjectEventResult -> messagesConsumer.announce(message)
is PNFileEventResult -> messagesConsumer.announce(message)
}
} catch (_: Throwable) {
// ignore
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.pubnub.internal.managers

import com.google.gson.JsonPrimitive
import com.pubnub.api.PubNub
import com.pubnub.api.legacy.BaseTest
import com.pubnub.api.models.consumer.pubsub.BasePubSubResult
import com.pubnub.api.models.consumer.pubsub.PNMessageResult
import com.pubnub.api.v2.callbacks.EventListener
import org.junit.Test
import org.junit.jupiter.api.Assertions.assertEquals

class ListenerManagerTest : BaseTest() {
@Test
fun `exception in listener is isolated from other listeners`() {
// given
val listenerManager = ListenerManager(pubnub)
val exception = Exception("Crash!")
var received = mutableListOf<Any>()

listenerManager.addListener(object : EventListener {
override fun message(pubnub: PubNub, result: PNMessageResult) {
received += exception
throw exception
}
})
listenerManager.addListener(object : EventListener {
override fun message(pubnub: PubNub, result: PNMessageResult) {
received += true
}
})

// when
listenerManager.announce(PNMessageResult(BasePubSubResult("a", null, null, null, null), JsonPrimitive("a")))

// then
assertEquals(listOf(exception, true), received)
}
}

0 comments on commit 986142b

Please sign in to comment.