Skip to content

Commit

Permalink
Fixing review comments: Merging both setCallSite methods
Browse files Browse the repository at this point in the history
  • Loading branch information
mubarak committed Aug 20, 2014
1 parent c26d933 commit 33a7295
Showing 1 changed file with 5 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,13 @@ abstract class DStream[T: ClassTag] (
val creationSite = Utils.getCallSite

/* Store the RDD creation callSite in threadlocal */
private def setRDDCreationCallSite() = {
ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_SHORT, creationSite.shortForm)
ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_LONG, creationSite.longForm)
}

/* Store the supplied callSite in threadlocal */
private def setRDDCallSite(callSite: CallSite) = {
private def setRDDCreationCallSite(callSite: CallSite = creationSite) = {
ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_SHORT, callSite.shortForm)
ssc.sparkContext.setLocalProperty(Utils.CALL_SITE_LONG, callSite.longForm)
}

/* Return the current callSite */
private[streaming] def getCallSite(): CallSite = {
private[streaming] def getRDDCreationCallSite(): CallSite = {
CallSite(ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_SHORT),
ssc.sparkContext.getLocalProperty(Utils.CALL_SITE_LONG))
}
Expand Down Expand Up @@ -309,8 +303,8 @@ abstract class DStream[T: ClassTag] (
// (based on sliding time of this DStream), then generate the RDD
case None => {
if (isTimeValid(time)) {
val prevCallSite = getCallSite
setRDDCreationCallSite
val prevCallSite = getRDDCreationCallSite
setRDDCreationCallSite()
val rddOption = compute(time) match {
case Some(newRDD) =>
if (storageLevel != StorageLevel.NONE) {
Expand All @@ -329,7 +323,7 @@ abstract class DStream[T: ClassTag] (
case None =>
return None
}
setRDDCallSite(prevCallSite)
setRDDCreationCallSite(prevCallSite)
return rddOption
} else {
return None
Expand Down

0 comments on commit 33a7295

Please sign in to comment.