Skip to content
This repository has been archived by the owner on Jul 19, 2021. It is now read-only.

Commit

Permalink
Merge pull request #260 from joakim-hove/local-free-error
Browse files Browse the repository at this point in the history
Local free error
  • Loading branch information
joakim-hove authored Mar 8, 2018
2 parents 0c2be77 + d3690a8 commit 3f214bb
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 53 deletions.
2 changes: 1 addition & 1 deletion libjob_queue/include/ert/job_queue/job_queue_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ typedef struct job_queue_manager_struct job_queue_manager_type;
job_queue_manager_type * job_queue_manager_alloc( job_queue_type * job_queue );
void job_queue_manager_free( job_queue_manager_type * manager );
void job_queue_manager_start_queue( job_queue_manager_type * manager , int num_total_run , bool verbose);
void job_queue_manager_stop_queue(const job_queue_manager_type * manager);
void job_queue_manager_stop_queue(job_queue_manager_type * manager);
bool job_queue_manager_try_wait( job_queue_manager_type * manager , int timeout_seconds);
void job_queue_manager_wait( job_queue_manager_type * manager);
int job_queue_manager_get_num_running( const job_queue_manager_type * manager);
Expand Down
5 changes: 1 addition & 4 deletions libjob_queue/src/job_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,6 @@ void job_queue_node_free_data(job_queue_node_type * node) {
util_safe_free( node->status_file );
util_safe_free( node->run_cmd );
util_free_stringlist( node->argv , node->argc );

if (node->job_data != NULL)
util_abort("%s: internal error - driver spesific job data has not been freed - will leak.\n",__func__);
}


Expand Down Expand Up @@ -593,7 +590,7 @@ bool job_queue_node_kill( job_queue_node_type * node , job_queue_status_type * s
void job_queue_node_free_driver_data( job_queue_node_type * node , queue_driver_type * driver) {
pthread_mutex_lock( &node->data_mutex );
{
if (node->job_data != NULL)
if (node->job_data)
queue_driver_free_job( driver , node->job_data );
node->job_data = NULL;
}
Expand Down
8 changes: 7 additions & 1 deletion libjob_queue/src/job_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,11 @@ static bool submit_new_jobs(job_queue_type * queue) {
}
}

return new_jobs;
}


static void run_handlers(job_queue_type * queue) {
/*
Checking for complete / exited / overtime jobs
*/
Expand All @@ -874,7 +879,7 @@ static bool submit_new_jobs(job_queue_type * queue) {
break;
}
}
return new_jobs;

}


Expand Down Expand Up @@ -946,6 +951,7 @@ static void job_queue_loop(job_queue_type * queue, int num_total_run, bool verbo

if (!complete) {
new_jobs = submit_new_jobs(queue);
run_handlers(queue);
} else
/* print an updated status to stdout before exiting. */
if (verbose)
Expand Down
4 changes: 3 additions & 1 deletion libjob_queue/src/job_queue_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,11 @@ job_status_type job_queue_manager_iget_job_status(const job_queue_manager_type *
}


void job_queue_manager_stop_queue(const job_queue_manager_type * manager) {
void job_queue_manager_stop_queue(job_queue_manager_type * manager) {
job_queue_start_user_exit(manager->job_queue);

while(job_queue_is_running(manager->job_queue))
usleep(100000);

job_queue_manager_wait(manager);
}
85 changes: 39 additions & 46 deletions libjob_queue/src/local_driver.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
Copyright (C) 2011 Statoil ASA, Norway.
The file 'local_driver.c' is part of ERT - Ensemble based Reservoir Tool.
ERT is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
ERT is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License at <http://www.gnu.org/licenses/gpl.html>
for more details.
ERT is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
ERT is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License at <http://www.gnu.org/licenses/gpl.html>
for more details.
*/

#include <sys/wait.h>
Expand All @@ -36,7 +36,6 @@ typedef struct local_job_struct local_job_type;
struct local_job_struct {
UTIL_TYPE_ID_DECLARATION;
bool active;
bool free_when_finished;
job_status_type status;
pthread_t run_thread;
pid_t child_process;
Expand Down Expand Up @@ -64,22 +63,14 @@ static local_job_type * local_job_alloc() {
job = util_malloc(sizeof * job );
UTIL_TYPE_ID_INIT( job , LOCAL_JOB_TYPE_ID );
job->active = false;
job->free_when_finished = false;
job->status = JOB_QUEUE_WAITING;
return job;
}

static void local_job_free(local_job_type * job) {
if (job->active)
job->free_when_finished = true;
else
free(job);
}



job_status_type local_driver_get_job_status(void * __driver, void * __job) {
if (__job == NULL)
if (__job == NULL)
/* The job has not been registered at all ... */
return JOB_QUEUE_NOT_ACTIVE;
else {
Expand All @@ -92,21 +83,24 @@ job_status_type local_driver_get_job_status(void * __driver, void * __job) {

void local_driver_free_job( void * __job ) {
local_job_type * job = local_job_safe_cast( __job );
local_job_free(job);
if (!job->active)
free(job);
}


void local_driver_kill_job( void * __driver , void * __job) {
local_job_type * job = local_job_safe_cast( __job );

if (job->active) {
pthread_cancel( job->run_thread );
}

kill( job->child_process , SIGTERM );
}



/*
This function needs to dereference the job pointer after the waitpid() call is
complete, it is therefor essential that no other threads have called free(job)
while the external process is running.
*/

void * submit_job_thread__(void * __arg) {
arg_pack_type *arg_pack = arg_pack_safe_cast(__arg);
const char *executable = arg_pack_iget_const_ptr(arg_pack, 0);
Expand All @@ -126,23 +120,22 @@ void * submit_job_thread__(void * __arg) {
arg_pack_free(arg_pack);
waitpid(job->child_process, &wait_status, 0);

if(job->free_when_finished) // A request to free the job sent while its thread was still running
free(job);
else {
job->active = false;
job->status = WIFEXITED(wait_status) ? JOB_QUEUE_DONE : JOB_QUEUE_IS_KILLED;
}
job->active = false;
job->status = JOB_QUEUE_EXIT;
if (WIFEXITED(wait_status))
if (WEXITSTATUS(wait_status) == 0)
job->status = JOB_QUEUE_DONE;

}
pthread_exit(NULL);
return NULL;
}



void * local_driver_submit_job(void * __driver ,
const char * submit_cmd ,
void * local_driver_submit_job(void * __driver ,
const char * submit_cmd ,
int num_cpu , /* Ignored */
const char * run_path ,
const char * run_path ,
const char * job_name ,
int argc ,
const char ** argv ) {
Expand All @@ -155,14 +148,14 @@ void * local_driver_submit_job(void * __driver ,
arg_pack_append_int( arg_pack , argc );
arg_pack_append_ptr( arg_pack , util_alloc_stringlist_copy( argv , argc )); /* Due to conflict with threads and python GC we take a local copy. */
arg_pack_append_ptr( arg_pack , job );

pthread_mutex_lock( &driver->submit_lock );
job->active = true;
job->status = JOB_QUEUE_RUNNING;
if (pthread_create( &job->run_thread , &driver->thread_attr , submit_job_thread__ , arg_pack) != 0)

if (pthread_create( &job->run_thread , &driver->thread_attr , submit_job_thread__ , arg_pack) != 0)
util_abort("%s: failed to create run thread - aborting \n",__func__);

pthread_mutex_unlock( &driver->submit_lock );
return job;
}
Expand All @@ -189,21 +182,21 @@ void * local_driver_alloc() {
pthread_mutex_init( &local_driver->submit_lock , NULL );
pthread_attr_init( &local_driver->thread_attr );
pthread_attr_setdetachstate( &local_driver->thread_attr , PTHREAD_CREATE_DETACHED );

return local_driver;
}


bool local_driver_set_option( void * __driver , const char * option_key , const void * value){
bool local_driver_set_option( void * __driver , const char * option_key , const void * value){
return false;
}

void local_driver_init_option_list(stringlist_type * option_list) {
//No options specific for local driver; do nothing
}

#undef LOCAL_DRIVER_ID
#undef LOCAL_JOB_ID
#undef LOCAL_DRIVER_ID
#undef LOCAL_JOB_ID

/*****************************************************************/

0 comments on commit 3f214bb

Please sign in to comment.