[jvm-packages] error using a custom evaluation function (setCustomEval) on spark #3595
What's your cluster setup? And can you post your custom evaluation function?
I have seven r4.2xlarge instances on AWS EMR, and my spark-submit is:

    spark-submit --driver-memory 15G --executor-memory 10G \
      --conf spark.executor.cores=4 --conf spark.task.cpus=2 \
      --conf spark.executor.extraJavaOptions="-XX:ThreadStackSize=41920" \
      --class Main Main.jar

I have tested this example custom evaluation and it is not working: https://github.com/dmlc/xgboost/blob/master/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/util/CustomEval.scala On my local machine it works.
Are you using the Akka-based Rabit tracker? Can you try the Python one?
With the Python tracker the execution freezes. My code:

    var paramsXGB: Map[String, Any] = Map[String, Any]("tracker_conf" -> TrackerConf(600 * 3000L, "python"))
    var xgb = new XGBoostClassifier(paramsXGB)
      .setFeaturesCol("features")
      .setNumRound(200)
      .setNumWorkers(20)
      .setObjective("binary:logistic")
      .setSilent(0)
      .setMaxDepth(4.0)
      .setMinChildWeight(5.0)
      .setGamma(0.0)
      .setEta(0.2)
      .setSubsample(0.9)
      .setColsampleBytree(inparams.colSampleByTree)
      .setAlpha(0.0)
      .setScalePosWeight(120)
      .setNthread(2)
      .setSeed(12345)
      .setMissing(0)
      .setNumEarlyStoppingRounds(10)
      .setTrainTestRatio(0.9)

The problem happens when I use setNumEarlyStoppingRounds as the stopping criterion.
How many executors did you try to claim? You are requesting 20 workers * 2 cores, but I don't see a num-executors parameter in your spark-submit.
@hcho3, @CodingCat, thanks for the help. I have now set num-executors to 20, and the execution continues to freeze.
Can you read this part: https://xgboost.readthedocs.io/en/latest/jvm/xgboost4j_spark_tutorial.html#gang-scheduling and set a timeout threshold for resource claiming? I think you somehow didn't get enough resources for training. With this setting, your application should fail if it cannot get enough cores within the threshold time (in ms).
Thank you, it's working now. However, the code that motivated me to test the custom evaluation function example has the same freeze problem. Below is the code that computes the maximum KS statistic (http://www.physics.csbsju.edu/stats/KS-test.html). Any suggestions?

    import org.apache.commons.logging.LogFactory
    import ml.dmlc.xgboost4j.java.XGBoostError
    import ml.dmlc.xgboost4j.scala.{DMatrix, EvalTrait}

    class KSEvaluation extends EvalTrait {
      val logger = LogFactory.getLog(classOf[KSEvaluation])
      private[neurotech] var evalMetric: String = "ks"

      override def getMetric: String = evalMetric

      override def eval(predicts: Array[Array[Float]], dmat: DMatrix): Float = {
        var labels: Array[Float] = null
        try {
          labels = dmat.getLabel
        } catch {
          case ex: XGBoostError =>
            logger.error(ex)
            return -1f
        }
        require(predicts.length == labels.length, s"predicts length ${predicts.length} has to be" +
          s" equal with label length ${labels.length}")
        val bins = 256
        val nrow: Int = predicts.length
        val p0 = getCol(1, predicts)
        val norm = discretize(p0, bins)
        // Per-bin counts of negatives ("bons") and positives ("maus")
        val maus = Array.fill[Int](bins)(0)
        val bons = Array.fill[Int](bins)(0)
        for (i <- 0 until nrow) {
          val l = labels(i).toInt
          bons(norm(i)) += 1 - l
          maus(norm(i)) += l
        }
        val bonsf = bons.map(_ / (bons.sum * 1.0f))
        val mausf = maus.map(_ / (maus.sum * 1.0f))
        // Cumulative class frequencies; maxKS is the largest gap between them
        val freqAcBom = Array.fill[Float](bins)(0)
        val freqAcMau = Array.fill[Float](bins)(0)
        var maxKS: Float = 0.0f
        for (i <- 0 until bins) {
          if (i == 0) {
            freqAcBom(i) = bonsf(i)
            freqAcMau(i) = mausf(i)
          } else {
            freqAcBom(i) = freqAcBom(i - 1) + bonsf(i)
            freqAcMau(i) = freqAcMau(i - 1) + mausf(i)
          }
          val d = math.abs(freqAcBom(i) - freqAcMau(i))
          if (d > maxKS) {
            maxKS = d
          }
        }
        maxKS
      }

      // Map each value into one of `bins` equal-width buckets over [min, max].
      // Note: degenerate when max == min (division by zero yields NaN indices).
      def discretize(values: Array[Float], bins: Int = 256): Array[Int] = {
        val min = values.min
        val minmax = values.max - min
        values.map(v => ((v - min) / minmax * (bins - 1)).toInt)
      }

      def getCol(n: Int, a: Array[Array[Float]]) = a.map(_(n - 1))
    }
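For reference, the maximum-KS computation above can be sketched in plain Python (an illustrative, language-agnostic restatement, not part of the original issue; the binning and column conventions mirror the Scala version):

```python
def discretize(values, bins=256):
    """Map each value to an equal-width bin index in [0, bins - 1]."""
    lo, hi = min(values), max(values)
    span = hi - lo  # degenerate when all values are equal, as in the Scala code
    return [int((v - lo) / span * (bins - 1)) for v in values]

def max_ks(scores, labels, bins=256):
    """Maximum Kolmogorov-Smirnov distance between the score
    distributions of the positive and negative classes."""
    neg = [0] * bins  # "bons" in the Scala code
    pos = [0] * bins  # "maus" in the Scala code
    for b, l in zip(discretize(scores, bins), labels):
        if l == 1:
            pos[b] += 1
        else:
            neg[b] += 1
    n_neg, n_pos = sum(neg), sum(pos)
    cum_neg = cum_pos = 0.0
    best = 0.0
    for i in range(bins):
        cum_neg += neg[i] / n_neg
        cum_pos += pos[i] / n_pos
        best = max(best, abs(cum_neg - cum_pos))
    return best
```

Perfectly separated classes give a statistic of 1.0; identical score distributions give 0.0.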
When I use the KSEvaluation function, this happens (I set a timeout threshold for resource claiming and I am using the Python tracker):
18/08/21 18:21:55 INFO RabitTracker$TrackerProcessLogger: 2018-08-21 18:21:55,868 DEBUG Recieve recover signal from 12

This is suspicious. Can you check whether any of your executors died? Or did you turn on something like dynamic allocation?
@CodingCat, I set spark.dynamicAllocation.enabled to false. Below is a log and an image of the active tasks. I have noticed that two executors (6 and 9) satisfy the stopping criterion and complete the task. Two executors (4 and 15) are not active (but they did not stop because of the stopping criterion). Could that be the problem? Another difference is that executors 6 and 9 appear with RDD blocks.
Can you share more logs from those active executors? Somehow your XGBoost workers sent a recover signal to the tracker, but on the driver side I didn't see any log indicating a task failure. And if you turn off early stopping, is the problem still there?
Yes, if I turn off early stopping it works. I think the problem happens when a worker stops according to the stopping criterion: the other workers have not stopped and want to continue. When I use a default metric this doesn't happen; all workers stop at the same time.
@CodingCat, to better understand what is happening, I have modified the following lines in XGBoost.java:
When I use a metric such as AUC or any other metric built into XGBoost, the behavior is what the first log shows: the evaluation is done using the whole validation set, and each worker's response is the same. When I use a custom metric, each worker's response is different, because it evaluates only its own validation mini-batch. Therefore, if the stopping criterion is satisfied for one worker, only that worker stops while the others try to continue. How do I get each worker to evaluate the whole validation set? Is there any way to make all workers stop when one stops, or to let the workers continue until each one's stopping criterion is satisfied?

log1
log2
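The divergence described above can be simulated outside Spark (an illustrative sketch, not XGBoost code; the worker metric sequences are made up): each hypothetical worker applies early stopping to its own local metric, so the workers halt at different rounds, whereas a metric synced across workers yields one shared decision:

```python
def early_stop_round(metrics, patience):
    """Round at which early stopping fires: the first round where the
    metric has not improved for `patience` consecutive rounds
    (returns len(metrics) if it never fires)."""
    best, since = float("-inf"), 0
    for r, m in enumerate(metrics):
        if m > best:
            best, since = m, 0
        else:
            since += 1
            if since >= patience:
                return r
    return len(metrics)

# Hypothetical per-round KS values each worker sees on its own mini-batch
worker_a = [0.50, 0.55, 0.56, 0.56, 0.56, 0.56]
worker_b = [0.48, 0.52, 0.57, 0.60, 0.61, 0.62]

# Each worker deciding locally: they stop at different rounds
local_a = early_stop_round(worker_a, patience=2)
local_b = early_stop_round(worker_b, patience=2)

# A metric synced across workers (here: the mean) gives one shared decision
synced = [(a + b) / 2 for a, b in zip(worker_a, worker_b)]
shared = early_stop_round(synced, patience=2)
```

Here worker A stops early while worker B never does, which mirrors the hang: the remaining workers wait on a collective operation that the stopped worker will never join.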
It looks like the customized metric is not synced properly in XGBoost. I need to find time to look into this. Is turning off early stopping an option for you?
@CodingCat, thank you for your help. Unfortunately, I need early stopping turned on to optimize the parameters according to certain metrics and to avoid overfitting.
@wsobra, as it will take time for me to debug and fix this, can you use a param grid in MLlib to search for the best numRound configuration in the meantime?
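The suggested workaround amounts to a grid search over numRound. A minimal pure-Python sketch of the idea (a stand-in for MLlib's ParamGridBuilder/CrossValidator, not Spark code; `train_eval` is a hypothetical function that trains a model and returns its validation metric):

```python
def grid_search_num_rounds(candidates, train_eval):
    """Pick the numRound value whose model scores best on held-out data.

    `train_eval(num_rounds)` is assumed to train with that many boosting
    rounds and return a validation metric (higher is better), standing in
    for an XGBoostClassifier + evaluator pipeline.
    """
    results = {n: train_eval(n) for n in candidates}
    best = max(results, key=results.get)
    return best, results

# Toy stand-in: the metric improves, then degrades from overfitting
def fake_train_eval(num_rounds):
    return 0.9 - abs(num_rounds - 150) / 1000.0

best_n, scores = grid_search_num_rounds([50, 100, 150, 200, 300], fake_train_eval)
```

This trades the per-round early-stopping check for a one-shot search over round counts, sidestepping the unsynced custom metric entirely.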
@CodingCat Any updates on custom evaluation on XGBoost4J-Spark? |
Next release.. |
I am trying to use a custom evaluation function (setCustomEval) on spark. However, it is giving the following error:
[ERROR] [08/16/2018 13:34:42.313] [RabitTracker-akka.actor.default-dispatcher-11] [LocalActorRefProvider(akka://RabitTracker)] guardian failed, shutting down system
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at ml.dmlc.xgboost4j.scala.rabit.handler.WorkerDependencyResolver$$anonfun$receive$2.applyOrElse(RabitTrackerHandler.scala:298)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at ml.dmlc.xgboost4j.scala.rabit.handler.WorkerDependencyResolver.aroundReceive(RabitTrackerHandler.scala:264)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)