diff --git a/libjob_queue/include/ert/job_queue/job_queue_manager.h b/libjob_queue/include/ert/job_queue/job_queue_manager.h index 7fe82892d0..fa0026f517 100644 --- a/libjob_queue/include/ert/job_queue/job_queue_manager.h +++ b/libjob_queue/include/ert/job_queue/job_queue_manager.h @@ -31,7 +31,7 @@ typedef struct job_queue_manager_struct job_queue_manager_type; job_queue_manager_type * job_queue_manager_alloc( job_queue_type * job_queue ); void job_queue_manager_free( job_queue_manager_type * manager ); void job_queue_manager_start_queue( job_queue_manager_type * manager , int num_total_run , bool verbose); - void job_queue_manager_stop_queue(const job_queue_manager_type * manager); + void job_queue_manager_stop_queue(job_queue_manager_type * manager); bool job_queue_manager_try_wait( job_queue_manager_type * manager , int timeout_seconds); void job_queue_manager_wait( job_queue_manager_type * manager); int job_queue_manager_get_num_running( const job_queue_manager_type * manager); diff --git a/libjob_queue/src/job_node.c b/libjob_queue/src/job_node.c index 1121b552b1..456f598bfc 100644 --- a/libjob_queue/src/job_node.c +++ b/libjob_queue/src/job_node.c @@ -193,9 +193,6 @@ void job_queue_node_free_data(job_queue_node_type * node) { util_safe_free( node->status_file ); util_safe_free( node->run_cmd ); util_free_stringlist( node->argv , node->argc ); - - if (node->job_data != NULL) - util_abort("%s: internal error - driver spesific job data has not been freed - will leak.\n",__func__); } @@ -593,7 +590,7 @@ bool job_queue_node_kill( job_queue_node_type * node , job_queue_status_type * s void job_queue_node_free_driver_data( job_queue_node_type * node , queue_driver_type * driver) { pthread_mutex_lock( &node->data_mutex ); { - if (node->job_data != NULL) + if (node->job_data) queue_driver_free_job( driver , node->job_data ); node->job_data = NULL; } diff --git a/libjob_queue/src/job_queue.c b/libjob_queue/src/job_queue.c index 5620c25362..c13ae53b61 100644 --- a/libjob_queue/src/job_queue.c +++ b/libjob_queue/src/job_queue.c @@ -851,6 +851,11 @@ static bool submit_new_jobs(job_queue_type * queue) { } } + return new_jobs; +} + + +static void run_handlers(job_queue_type * queue) { /* Checking for complete / exited / overtime jobs */ @@ -874,7 +879,7 @@ static bool submit_new_jobs(job_queue_type * queue) { break; } } - return new_jobs; + } @@ -946,6 +951,7 @@ static void job_queue_loop(job_queue_type * queue, int num_total_run, bool verbo if (!complete) { new_jobs = submit_new_jobs(queue); + run_handlers(queue); } else /* print an updated status to stdout before exiting. */ if (verbose) diff --git a/libjob_queue/src/job_queue_manager.c b/libjob_queue/src/job_queue_manager.c index 34f9d27854..58519bef16 100644 --- a/libjob_queue/src/job_queue_manager.c +++ b/libjob_queue/src/job_queue_manager.c @@ -184,9 +184,11 @@ job_status_type job_queue_manager_iget_job_status(const job_queue_manager_type * } -void job_queue_manager_stop_queue(const job_queue_manager_type * manager) { +void job_queue_manager_stop_queue(job_queue_manager_type * manager) { job_queue_start_user_exit(manager->job_queue); while(job_queue_is_running(manager->job_queue)) usleep(100000); + + job_queue_manager_wait(manager); } diff --git a/libjob_queue/src/local_driver.c b/libjob_queue/src/local_driver.c index 19d2471c81..cc493585cd 100644 --- a/libjob_queue/src/local_driver.c +++ b/libjob_queue/src/local_driver.c @@ -2,18 +2,18 @@ Copyright (C) 2011 Statoil ASA, Norway. The file 'local_driver.c' is part of ERT - Ensemble based Reservoir Tool. - - ERT is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - ERT is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. - - See the GNU General Public License at - for more details. + + ERT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + ERT is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. + + See the GNU General Public License at + for more details. */ #include @@ -36,7 +36,6 @@ typedef struct local_job_struct local_job_type; struct local_job_struct { UTIL_TYPE_ID_DECLARATION; bool active; - bool free_when_finished; job_status_type status; pthread_t run_thread; pid_t child_process; @@ -64,22 +63,14 @@ static local_job_type * local_job_alloc() { job = util_malloc(sizeof * job ); UTIL_TYPE_ID_INIT( job , LOCAL_JOB_TYPE_ID ); job->active = false; - job->free_when_finished = false; job->status = JOB_QUEUE_WAITING; return job; } -static void local_job_free(local_job_type * job) { - if (job->active) - job->free_when_finished = true; - else - free(job); -} - job_status_type local_driver_get_job_status(void * __driver, void * __job) { - if (__job == NULL) + if (__job == NULL) /* The job has not been registered at all ... */ return JOB_QUEUE_NOT_ACTIVE; else { @@ -92,21 +83,24 @@ job_status_type local_driver_get_job_status(void * __driver, void * __job) { void local_driver_free_job( void * __job ) { local_job_type * job = local_job_safe_cast( __job ); - local_job_free(job); + if (!job->active) + free(job); } void local_driver_kill_job( void * __driver , void * __job) { local_job_type * job = local_job_safe_cast( __job ); - - if (job->active) { - pthread_cancel( job->run_thread ); - } - kill( job->child_process , SIGTERM ); } + +/* + This function needs to dereference the job pointer after the waitpid() call is + complete, it is therefor essential that no other threads have called free(job) + while the external process is running. +*/ + void * submit_job_thread__(void * __arg) { arg_pack_type *arg_pack = arg_pack_safe_cast(__arg); const char *executable = arg_pack_iget_const_ptr(arg_pack, 0); @@ -126,23 +120,22 @@ void * submit_job_thread__(void * __arg) { arg_pack_free(arg_pack); waitpid(job->child_process, &wait_status, 0); - if(job->free_when_finished) // A request to free the job sent while its thread was still running - free(job); - else { - job->active = false; - job->status = WIFEXITED(wait_status) ? JOB_QUEUE_DONE : JOB_QUEUE_IS_KILLED; - } + job->active = false; + job->status = JOB_QUEUE_EXIT; + if (WIFEXITED(wait_status)) + if (WEXITSTATUS(wait_status) == 0) + job->status = JOB_QUEUE_DONE; + } - pthread_exit(NULL); return NULL; } -void * local_driver_submit_job(void * __driver , - const char * submit_cmd , +void * local_driver_submit_job(void * __driver , + const char * submit_cmd , int num_cpu , /* Ignored */ - const char * run_path , + const char * run_path , const char * job_name , int argc , const char ** argv ) { @@ -155,14 +148,14 @@ void * local_driver_submit_job(void * __driver , arg_pack_append_int( arg_pack , argc ); arg_pack_append_ptr( arg_pack , util_alloc_stringlist_copy( argv , argc )); /* Due to conflict with threads and python GC we take a local copy. */ arg_pack_append_ptr( arg_pack , job ); - + pthread_mutex_lock( &driver->submit_lock ); job->active = true; job->status = JOB_QUEUE_RUNNING; - - if (pthread_create( &job->run_thread , &driver->thread_attr , submit_job_thread__ , arg_pack) != 0) + + if (pthread_create( &job->run_thread , &driver->thread_attr , submit_job_thread__ , arg_pack) != 0) util_abort("%s: failed to create run thread - aborting \n",__func__); - + pthread_mutex_unlock( &driver->submit_lock ); return job; } @@ -189,12 +182,12 @@ void * local_driver_alloc() { pthread_mutex_init( &local_driver->submit_lock , NULL ); pthread_attr_init( &local_driver->thread_attr ); pthread_attr_setdetachstate( &local_driver->thread_attr , PTHREAD_CREATE_DETACHED ); - + return local_driver; } -bool local_driver_set_option( void * __driver , const char * option_key , const void * value){ +bool local_driver_set_option( void * __driver , const char * option_key , const void * value){ return false; } @@ -202,8 +195,8 @@ void local_driver_init_option_list(stringlist_type * option_list) { //No options specific for local driver; do nothing } -#undef LOCAL_DRIVER_ID -#undef LOCAL_JOB_ID +#undef LOCAL_DRIVER_ID +#undef LOCAL_JOB_ID /*****************************************************************/