Skip to content

Commit

Permalink
build: improve shutdown behavior in errors
Browse files Browse the repository at this point in the history
  • Loading branch information
wjwwood committed Apr 12, 2014
1 parent 6671195 commit a5519ef
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 170 deletions.
347 changes: 177 additions & 170 deletions catkin_tools/verbs/catkin_build/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,178 +385,185 @@ def build_isolated_workspace(
executors[x] = e
e.start()

# Variables for tracking running jobs and built/building packages
start = time.time()
total_packages = len(packages_to_be_built)
package_count = 0
running_jobs = {}
log_dir = os.path.join(context.build_space, 'build_logs')
color = True
if not force_color and not is_tty(sys.stdout):
try: # Finally close out now running executors
# Variables for tracking running jobs and built/building packages
start = time.time()
total_packages = len(packages_to_be_built)
package_count = 0
running_jobs = {}
log_dir = os.path.join(context.build_space, 'build_logs')
color = True
out = OutputController(log_dir, quiet, interleave_output, color, max_package_name_length, prefix_output=(jobs > 1))
if no_status:
disable_wide_log()

# Prime the job_queue
ready_packages = []
if start_with is None:
ready_packages = get_ready_packages(packages_to_be_built, running_jobs, completed_packages)
while start_with is not None:
ready_packages.extend(get_ready_packages(packages_to_be_built, running_jobs, completed_packages))
while ready_packages:
pth, pkg = ready_packages.pop(0)
if pkg.name != start_with:
completed_packages.append(pkg.name)
package_count += 1
wide_log("[build] Skipping package '{0}'".format(pkg.name))
else:
ready_packages.insert(0, (pth, pkg))
start_with = None
break
running_jobs = queue_ready_packages(ready_packages, running_jobs, job_queue, context, force_cmake)
assert running_jobs

error_state = False
errors = []

# Shutdown helper: unless the flag passed in is already truthy, drain
# job_queue and then enqueue one None per job slot (range(jobs)).
# NOTE(review): `error_state` is this function's own parameter, so the
# assignment below only rebinds the local name; the variable of the same
# name in the enclosing scope is not modified by calling this helper.
# Confirm whether the caller is expected to update its own flag.
def set_error_state(error_state):
if error_state:
# Caller reports that shutdown was already requested; nothing to do
return
# Set the error state to prevent new jobs
error_state = True
# Empty the job queue
while not job_queue.empty():
job_queue.get()
# Kill the executors by sending a None to the job queue for each of them
for x in range(jobs):
job_queue.put(None)

# While any executors are running, process executor events
while executors:
try:
# Try to get an event from the communications queue
if not force_color and not is_tty(sys.stdout):
color = True

This comment has been minimized.

Copy link
@dirk-thomas

dirk-thomas Jun 26, 2015

Contributor

This looks like a bug to me. Should it assign False?

This comment has been minimized.

Copy link
@wjwwood

wjwwood Jun 26, 2015

Author Member

Yeah, fixed in 9568fa8

out = OutputController(log_dir, quiet, interleave_output,
color, max_package_name_length, prefix_output=(jobs > 1))
if no_status:
disable_wide_log()

# Prime the job_queue
ready_packages = []
if start_with is None:
ready_packages = get_ready_packages(packages_to_be_built, running_jobs, completed_packages)
while start_with is not None:
ready_packages.extend(get_ready_packages(packages_to_be_built, running_jobs, completed_packages))
while ready_packages:
pth, pkg = ready_packages.pop(0)
if pkg.name != start_with:
completed_packages.append(pkg.name)
package_count += 1
wide_log("[build] Skipping package '{0}'".format(pkg.name))
else:
ready_packages.insert(0, (pth, pkg))
start_with = None
break
running_jobs = queue_ready_packages(ready_packages, running_jobs, job_queue, context, force_cmake)
assert running_jobs

error_state = False
errors = []

# Shutdown helper: unless the flag passed in is already truthy, drain
# job_queue and then enqueue one None per job slot (range(jobs)).
# NOTE(review): `error_state` is this function's own parameter, so the
# assignment below only rebinds the local name; the variable of the same
# name in the enclosing scope is not modified by calling this helper.
# Confirm whether the caller is expected to update its own flag.
def set_error_state(error_state):
if error_state:
# Caller reports that shutdown was already requested; nothing to do
return
# Set the error state to prevent new jobs
error_state = True
# Empty the job queue
while not job_queue.empty():
job_queue.get()
# Kill the executors by sending a None to the job queue for each of them
for x in range(jobs):
job_queue.put(None)

# While any executors are running, process executor events
while executors:
try:
event = comm_queue.get(True, 0.1)
except Empty:
# timeout occurred, create null event to pass through checks
event = ExecutorEvent(None, None, None, None)

if event.event_type == 'job_started':
package_count += 1
running_jobs[event.package]['package_number'] = package_count
running_jobs[event.package]['start_time'] = time.time()
out.job_started(event.package)

if event.event_type == 'command_started':
out.command_started(event.package, event.data['cmd'], event.data['location'])

if event.event_type == 'command_log':
out.command_log(event.package, event.data['message'])

if event.event_type == 'command_failed':
out.command_failed(event.package, event.data['cmd'], event.data['location'], event.data['retcode'])
# Add to list of errors
errors.append(event)
# Remove the command from the running jobs
del running_jobs[event.package]
# If it hasn't already been done, stop the executors
set_error_state(error_state)
# Try to get an event from the communications queue
try:
event = comm_queue.get(True, 0.1)
except Empty:
# timeout occurred, create null event to pass through checks
event = ExecutorEvent(None, None, None, None)

if event.event_type == 'job_started':
package_count += 1
running_jobs[event.package]['package_number'] = package_count
running_jobs[event.package]['start_time'] = time.time()
out.job_started(event.package)

if event.event_type == 'command_started':
out.command_started(event.package, event.data['cmd'], event.data['location'])

if event.event_type == 'command_log':
out.command_log(event.package, event.data['message'])

if event.event_type == 'command_failed':
out.command_failed(event.package, event.data['cmd'], event.data['location'], event.data['retcode'])
# Add to list of errors
errors.append(event)
# Remove the command from the running jobs
del running_jobs[event.package]
# If it hasn't already been done, stop the executors
set_error_state(error_state)

if event.event_type == 'command_finished':
out.command_finished(event.package, event.data['cmd'],
event.data['location'], event.data['retcode'])

if event.event_type == 'job_finished':
completed_packages.append(event.package)
run_time = format_time_delta(time.time() - running_jobs[event.package]['start_time'])
out.job_finished(event.package, run_time)
del running_jobs[event.package]
# If shutting down, do not add new packages
if error_state:
continue
# Calculate new packages
if not no_status:
wide_log('[build] Calculating new jobs...', end='\r')
sys.stdout.flush()
ready_packages = get_ready_packages(packages_to_be_built, running_jobs, completed_packages)
running_jobs = queue_ready_packages(ready_packages, running_jobs, job_queue, context, force_cmake)
# Make sure there are jobs to be/being processed, otherwise kill the executors
if not running_jobs:
# Kill the executors by sending a None to the job queue for each of them
for x in range(jobs):
job_queue.put(None)

# If an executor exit event, join it and remove it from the executors list
if event.event_type == 'exit':
# If an executor has an exception, set the error state
if event.data['reason'] == 'exception':
set_error_state(error_state)
errors.append(event)
# Join and remove it
executors[event.executor_id].join()
del executors[event.executor_id]

if event.event_type == 'command_finished':
out.command_finished(event.package, event.data['cmd'], event.data['location'], event.data['retcode'])

if event.event_type == 'job_finished':
completed_packages.append(event.package)
run_time = format_time_delta(time.time() - running_jobs[event.package]['start_time'])
out.job_finished(event.package, run_time)
del running_jobs[event.package]
# If shutting down, do not add new packages
if error_state:
continue
# Calculate new packages
if not no_status:
wide_log('[build] Calculating new jobs...', end='\r')
# Update the status bar on the screen
executing_jobs = []
for name, value in running_jobs.items():
number, job, start_time = value['package_number'], value['job'], value['start_time']
if number is None or start_time is None:
continue
executing_jobs.append({
'number': number,
'name': name,
'run_time': format_time_delta_short(time.time() - start_time)
})
msg = clr("[build - {run_time}] ").format(run_time=format_time_delta_short(time.time() - start))
# If errors post those
if errors:
for error in errors:
msg += clr("[!{package}] ").format(package=error.package)
# Print them in order of started number
for job_msg_args in sorted(executing_jobs, key=lambda args: args['number']):
msg += clr("[{name} - {run_time}] ").format(**job_msg_args)
msg_rhs = clr("[{0}/{1} Active | {2}/{3} Completed]").format(
len(executing_jobs),
len(executors),
len(packages) if no_deps else len(completed_packages),
total_packages
)
# Update title bar
sys.stdout.write("\x1b]2;[build] {0}/{1}\x07".format(
len(packages) if no_deps else len(completed_packages),
total_packages
))
# Update status bar
wide_log(msg, rhs=msg_rhs, end='\r')
sys.stdout.flush()
ready_packages = get_ready_packages(packages_to_be_built, running_jobs, completed_packages)
running_jobs = queue_ready_packages(ready_packages, running_jobs, job_queue, context, force_cmake)
# Make sure there are jobs to be/being processed, otherwise kill the executors
if not running_jobs:
# Kill the executors by sending a None to the job queue for each of them
for x in range(jobs):
job_queue.put(None)

# If an executor exit event, join it and remove it from the executors list
if event.event_type == 'exit':
# If an executor has an exception, set the error state
if event.data['reason'] == 'exception':
set_error_state(error_state)
errors.append(event)
# Join and remove it
executors[event.executor_id].join()
del executors[event.executor_id]

if not no_status:
# Update the status bar on the screen
executing_jobs = []
for name, value in running_jobs.items():
number, job, start_time = value['package_number'], value['job'], value['start_time']
if number is None or start_time is None:
continue
executing_jobs.append({
'number': number,
'name': name,
'run_time': format_time_delta_short(time.time() - start_time)
})
msg = clr("[build - {run_time}] ").format(run_time=format_time_delta_short(time.time() - start))
# If errors post those
if errors:
for error in errors:
msg += clr("[!{package}] ").format(package=error.package)
# Print them in order of started number
for job_msg_args in sorted(executing_jobs, key=lambda args: args['number']):
msg += clr("[{name} - {run_time}] ").format(**job_msg_args)
msg_rhs = clr("[{0}/{1} Active | {2}/{3} Completed]").format(
len(executing_jobs),
len(executors),
len(packages) if no_deps else len(completed_packages),
total_packages
)
# Update title bar
sys.stdout.write("\x1b]2;[build] {0}/{1}\x07".format(
len(packages) if no_deps else len(completed_packages),
total_packages
))
# Update status bar
wide_log(msg, rhs=msg_rhs, end='\r')
sys.stdout.flush()
except KeyboardInterrupt:
wide_log("[build] User interrupted, stopping.")
set_error_state(error_state)
# All executors have shutdown
sys.stdout.write("\x1b]2;\x07")
if not errors:
if not context.merge_devel:
if not context.install:
_create_unmerged_devel_setup(context)
else:
_create_unmerged_devel_setup_for_install(context)
wide_log("[build] Finished.")
return 0
else:
wide_log(clr("[build] There were @!@{rf}errors@|:"))
for error in errors:
if error.event_type == 'exit':
wide_log("""Executor '{exec_id}' had an unhandle exception while processing package '{package}':
{data[exc]}
""".format(exec_id=error.executor_id + 1, **error.__dict__))
else:
wide_log(clr("""
@{rf}Failed@| to build package '@{cf}{package}@|' because the following command:
@!@{kf}# Command run in directory: @|{location}
{cmd.cmd_str}
@{rf}Exited@| with return code: @!{retcode}@|""").format(package=error.package, **error.data))
sys.exit(1)
except KeyboardInterrupt:
wide_log("[build] User interrupted, stopping.")
set_error_state(error_state)
# All executors have shutdown
sys.stdout.write("\x1b]2;\x07")
if not errors:
if not context.merge_devel:
if not context.install:
_create_unmerged_devel_setup(context)
else:
_create_unmerged_devel_setup_for_install(context)
wide_log("[build] Finished.")
return 0
else:
wide_log(clr("[build] There were @!@{rf}errors@|:"))
for error in errors:
if error.event_type == 'exit':
wide_log("""Executor '{exec_id}' had an unhandle exception while processing package '{package}':
{data[exc]}
""".format(exec_id=error.executor_id + 1, **error.__dict__))
else:
wide_log(clr("""
@{rf}Failed@| to build package '@{cf}{package}@|' because the following command:
@!@{kf}# Command run in directory: @|{location}
{cmd.cmd_str}
@{rf}Exited@| with return code: @!{retcode}@|""").format(package=error.package, **error.data))
sys.exit(1)
finally:
# Ensure executors go down
for x in range(jobs):
job_queue.put(None)
3 changes: 3 additions & 0 deletions tests/system/verbs/catkin_build/test_whitespace_in_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ def test_catkin_build_with_whitespace_in_paths():
ret = main(cmd)
except SystemExit as exc:
ret = exc.code
if ret != 0:
import traceback
traceback.print_exc()
assert ret == 0, cmd

0 comments on commit a5519ef

Please sign in to comment.