Skip to content
This repository has been archived by the owner on Nov 11, 2019. It is now read-only.

Commit

Permalink
pulselistener: Restart failed tasks with restart capabilities (#1988)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bastien Abadie authored Apr 3, 2019
1 parent d9cedf5 commit b32b4e1
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 4 deletions.
24 changes: 24 additions & 0 deletions src/pulselistener/pulselistener/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
'''
TASK_MD = '* [{0}](https://tools.taskcluster.net/task-inspector/#{0})'
TASKCLUSTER_NAMESPACE = 'project.releng.services.tasks.{task_id}'


class Monitoring(object):
Expand All @@ -36,6 +37,7 @@ def __init__(self, period):
# TC services
self.notify = None
self.queue = None
self.index = None

# Setup monitoring queue
self.tasks = asyncio.Queue()
Expand All @@ -46,6 +48,7 @@ def connect_taskcluster(self, client_id=None, access_token=None):
'''
self.notify = get_service('notify', client_id, access_token)
self.queue = get_service('queue', client_id, access_token)
self.index = get_service('index', client_id, access_token)

async def add_task(self, group_id, hook_id, task_id):
'''
Expand Down Expand Up @@ -103,6 +106,11 @@ async def check_task(self):
if task_status == 'exception':
await self.retry_task(group_id, hook_id, task_id)

# Lookup the failed details
if task_status == 'failed' and self.is_restartable(task_id):
logger.info('Failed task is restartable', task_id=task_id)
await self.retry_task(group_id, hook_id, task_id)

# Add to report
if hook_id not in self.stats:
self.stats[hook_id] = {'failed': [], 'completed': [], 'exception': []}
Expand All @@ -112,6 +120,22 @@ async def check_task(self):
# Push back into queue so it get checked later on
await self.tasks.put((group_id, hook_id, task_id))

def is_restartable(self, task_id):
'''
A task is restartable if its indexed state using task id
has a monitoring_restart field set to True
'''
# Load the indexed data
task_path = TASKCLUSTER_NAMESPACE.format(task_id=task_id)
try:
index = self.index.findTask(task_path)
except Exception as e:
logger.info('Task not found in index', task=task_id, error=str(e))
return False

# Restart when monitoring_restart is set
return index['data'].get('monitoring_restart') is True

async def retry_task(self, group_id, hook_id, task_id):
'''
Retry a Taskcluster task by:
Expand Down
24 changes: 24 additions & 0 deletions src/pulselistener/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ def task(self, task_id):
'requires': 'all-completed',
'retries': retry,
'scopes': [],
'routes': [
'index.{}.latest'.format(task_id),
],
'taskGroupId': 'group-{}'.format(task_id),
'workerType': 'niceWorker'
}
Expand Down Expand Up @@ -104,6 +107,27 @@ def triggerHook(self, group_id, hook_id, payload):
return Mock()


@pytest.fixture
def IndexMock():
class Mock():
def __init__(self):
pass

def findTask(self, path):
assert path.startswith('project.releng.services.tasks.')
failed = 'failed' in path
return {
'taskId': path[30:],
'data': {
'state': failed and 'error' or 'done',
'error_code': failed and 'somethingBad' or None,
'monitoring_restart': (failed and 'restart' in path)
}
}

return Mock()


@pytest.fixture
@contextmanager
def PhabricatorMock():
Expand Down
26 changes: 26 additions & 0 deletions src/pulselistener/tests/test_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,29 @@ async def test_monitoring_retry_exceptions(QueueMock, NotifyMock):
assert monitoring.stats['Hook2']['exception'] == ['Task-exception-retry:0']
assert len(monitoring.queue.created_tasks) == 1
assert monitoring.tasks.qsize() == 1


@pytest.mark.asyncio
async def test_monitoring_restartable(QueueMock, IndexMock):
monitoring = Monitoring(1)

monitoring.index = IndexMock
monitoring.queue = QueueMock

# Check a failed task is restartable
# when the monitoring flag is set
assert monitoring.is_restartable('Task-failed-restart')

# Check a failed task is not restartable
# when the monitoring flag is not set
assert not monitoring.is_restartable('Task-failed-nope')

# Check a completed task is not restartable
assert not monitoring.is_restartable('Task-completed-restart')
assert not monitoring.is_restartable('Task-completed-nope')

# Check a failed task is restarted by the monitoring flow
assert len(monitoring.queue.created_tasks) == 0
await monitoring.add_task('Group', 'Hook-staticanalysis/bot', 'Task-failed-restart')
await monitoring.check_task()
assert len(monitoring.queue.created_tasks) == 1
14 changes: 12 additions & 2 deletions src/staticanalysis/bot/static_analysis_bot/workflows/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,26 @@ def index(self, revision, **kwargs):
payload['try_task_id'] = settings.try_task_id
payload['try_group_id'] = settings.try_group_id

# Add restartable flag for monitoring
payload['monitoring_restart'] = payload['state'] == 'error' and \
payload.get('error_code') in ('watchdog', 'mercurial')

# Add a sub namespace with the task id to be able to list
# tasks from the parent namespace
namespaces = revision.namespaces + [
'{}.{}'.format(namespace, settings.taskcluster.task_id)
for namespace in revision.namespaces
]

# Build complete namespaces list, with monitoring update
full_namespaces = [
TASKCLUSTER_NAMESPACE.format(channel=settings.app_channel, name=name)
for name in namespaces
]
full_namespaces.append('project.releng.services.tasks.{}'.format(settings.taskcluster.task_id))

# Index for all required namespaces
for name in namespaces:
namespace = TASKCLUSTER_NAMESPACE.format(channel=settings.app_channel, name=name)
for namespace in full_namespaces:
self.index_service.insertTask(
namespace,
{
Expand Down
55 changes: 53 additions & 2 deletions src/staticanalysis/bot/tests/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ def test_taskcluster_index(mock_try_config, mock_try_workflow):
mock_try_workflow.index_service = mock.Mock()
rev = Revision()
rev.namespaces = ['mock.1234']
rev.as_dict = lambda: {'id': '1234', 'someData': 'mock'}
rev.as_dict = lambda: {'id': '1234', 'someData': 'mock', 'state': 'done', }
mock_try_workflow.index(rev, test='dummy')

assert mock_try_workflow.index_service.insertTask.call_count == 2
assert mock_try_workflow.index_service.insertTask.call_count == 3
calls = mock_try_workflow.index_service.insertTask.call_args_list

# First call with namespace
Expand All @@ -46,3 +46,54 @@ def test_taskcluster_index(mock_try_config, mock_try_workflow):
assert args['data']['try_group_id'] == 'remoteTryGroup'
assert args['data']['someData'] == 'mock'
assert 'indexed' in args['data']

# Third call for monitoring
namespace, args = calls[2][0]
assert namespace == 'project.releng.services.tasks.12345deadbeef'
assert args['taskId'] == '12345deadbeef'
assert args['data']['test'] == 'dummy'
assert args['data']['id'] == '1234'
assert args['data']['source'] == 'try'
assert args['data']['try_task_id'] == 'remoteTryTask'
assert args['data']['try_group_id'] == 'remoteTryGroup'
assert args['data']['monitoring_restart'] is False


def test_monitoring_restart(mock_try_config, mock_try_workflow):
'''
Test the Taskcluster indexing API and restart capabilities
'''
from static_analysis_bot.config import TaskCluster
from static_analysis_bot.revisions import Revision
mock_try_config.taskcluster = TaskCluster('/tmp/dummy', 'someTaskId', 0, False)
mock_try_workflow.index_service = mock.Mock()
rev = Revision()
rev.as_dict = dict
rev.namespaces = []

# Unsupported error code
mock_try_workflow.index(rev, test='dummy', error_code='nope', state='error')
assert mock_try_workflow.index_service.insertTask.call_count == 1
calls = mock_try_workflow.index_service.insertTask.call_args_list
namespace, args = calls[0][0]
assert namespace == 'project.releng.services.tasks.someTaskId'
assert args['taskId'] == 'someTaskId'
assert args['data']['monitoring_restart'] is False

# watchdog should be restated
mock_try_workflow.index(rev, test='dummy', error_code='watchdog', state='error')
assert mock_try_workflow.index_service.insertTask.call_count == 2
calls = mock_try_workflow.index_service.insertTask.call_args_list
namespace, args = calls[1][0]
assert namespace == 'project.releng.services.tasks.someTaskId'
assert args['taskId'] == 'someTaskId'
assert args['data']['monitoring_restart'] is True

# Invalid state
mock_try_workflow.index(rev, test='dummy', state='running')
assert mock_try_workflow.index_service.insertTask.call_count == 3
calls = mock_try_workflow.index_service.insertTask.call_args_list
namespace, args = calls[2][0]
assert namespace == 'project.releng.services.tasks.someTaskId'
assert args['taskId'] == 'someTaskId'
assert args['data']['monitoring_restart'] is False

0 comments on commit b32b4e1

Please sign in to comment.