Skip to content

Commit

Permalink
Fixed Twitter stream stopping bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Jul 24, 2014
1 parent 46e224a commit 011b525
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class TwitterReceiver(
storageLevel: StorageLevel
) extends Receiver[Status](storageLevel) with Logging {

private var twitterStream: TwitterStream = _
@volatile private var twitterStream: TwitterStream = _
@volatile private var stopped = false

def onStart() {
try {
Expand All @@ -78,7 +79,9 @@ class TwitterReceiver(
def onScrubGeo(l: Long, l1: Long) {}
def onStallWarning(stallWarning: StallWarning) {}
def onException(e: Exception) {
restart("Error receiving tweets", e)
if (!stopped) {
restart("Error receiving tweets", e)
}
}
})

Expand All @@ -91,12 +94,14 @@ class TwitterReceiver(
}
setTwitterStream(newTwitterStream)
logInfo("Twitter receiver started")
stopped = false
} catch {
case e: Exception => restart("Error starting Twitter stream", e)
}
}

def onStop() {
stopped = true
setTwitterStream(null)
logInfo("Twitter receiver stopped")
}
Expand Down

0 comments on commit 011b525

Please sign in to comment.