diff --git a/src/pulselistener/pulselistener/monitoring.py b/src/pulselistener/pulselistener/monitoring.py index 68b63e8c98..6e63eb78b4 100644 --- a/src/pulselistener/pulselistener/monitoring.py +++ b/src/pulselistener/pulselistener/monitoring.py @@ -19,6 +19,7 @@ ''' TASK_MD = '* [{0}](https://tools.taskcluster.net/task-inspector/#{0})' +TASKCLUSTER_NAMESPACE = 'project.releng.services.tasks.{task_id}' class Monitoring(object): @@ -36,6 +37,7 @@ def __init__(self, period): # TC services self.notify = None self.queue = None + self.index = None # Setup monitoring queue self.tasks = asyncio.Queue() @@ -46,6 +48,7 @@ def connect_taskcluster(self, client_id=None, access_token=None): ''' self.notify = get_service('notify', client_id, access_token) self.queue = get_service('queue', client_id, access_token) + self.index = get_service('index', client_id, access_token) async def add_task(self, group_id, hook_id, task_id): ''' @@ -103,6 +106,11 @@ async def check_task(self): if task_status == 'exception': await self.retry_task(group_id, hook_id, task_id) + # Lookup the failed details + if task_status == 'failed' and self.is_restartable(task_id): + logger.info('Failed task is restartable', task_id=task_id) + await self.retry_task(group_id, hook_id, task_id) + # Add to report if hook_id not in self.stats: self.stats[hook_id] = {'failed': [], 'completed': [], 'exception': []} @@ -112,6 +120,22 @@ async def check_task(self): # Push back into queue so it get checked later on await self.tasks.put((group_id, hook_id, task_id)) + def is_restartable(self, task_id): + ''' + A task is restartable if its indexed state using task id + has a monitoring_restart field set to True + ''' + # Load the indexed data + task_path = TASKCLUSTER_NAMESPACE.format(task_id=task_id) + try: + index = self.index.findTask(task_path) + except Exception as e: + logger.info('Task not found in index', task=task_id, error=str(e)) + return False + + # Restart when monitoring_restart is set + return index['data'].get('monitoring_restart') is True + async def retry_task(self, group_id, hook_id, task_id): ''' Retry a Taskcluster task by: diff --git a/src/pulselistener/tests/conftest.py b/src/pulselistener/tests/conftest.py index 8b53a98eaf..6edca03c43 100644 --- a/src/pulselistener/tests/conftest.py +++ b/src/pulselistener/tests/conftest.py @@ -61,6 +61,9 @@ def task(self, task_id): 'requires': 'all-completed', 'retries': retry, 'scopes': [], + 'routes': [ + 'index.{}.latest'.format(task_id), + ], 'taskGroupId': 'group-{}'.format(task_id), 'workerType': 'niceWorker' } @@ -104,6 +107,27 @@ def triggerHook(self, group_id, hook_id, payload): return Mock() +@pytest.fixture +def IndexMock(): + class Mock(): + def __init__(self): + pass + + def findTask(self, path): + assert path.startswith('project.releng.services.tasks.') + failed = 'failed' in path + return { + 'taskId': path[30:], + 'data': { + 'state': failed and 'error' or 'done', + 'error_code': failed and 'somethingBad' or None, + 'monitoring_restart': (failed and 'restart' in path) + } + } + + return Mock() + + @pytest.fixture @contextmanager def PhabricatorMock(): diff --git a/src/pulselistener/tests/test_monitoring.py b/src/pulselistener/tests/test_monitoring.py index 842df2eb17..3bf6400d4a 100644 --- a/src/pulselistener/tests/test_monitoring.py +++ b/src/pulselistener/tests/test_monitoring.py @@ -245,3 +245,29 @@ async def test_monitoring_retry_exceptions(QueueMock, NotifyMock): assert monitoring.stats['Hook2']['exception'] == ['Task-exception-retry:0'] assert len(monitoring.queue.created_tasks) == 1 assert monitoring.tasks.qsize() == 1 + + +@pytest.mark.asyncio +async def test_monitoring_restartable(QueueMock, IndexMock): + monitoring = Monitoring(1) + + monitoring.index = IndexMock + monitoring.queue = QueueMock + + # Check a failed task is restartable + # when the monitoring flag is set + assert monitoring.is_restartable('Task-failed-restart') + + # Check a failed task is not restartable + # when the monitoring flag is not set + assert not monitoring.is_restartable('Task-failed-nope') + + # Check a completed task is not restartable + assert not monitoring.is_restartable('Task-completed-restart') + assert not monitoring.is_restartable('Task-completed-nope') + + # Check a failed task is restarted by the monitoring flow + assert len(monitoring.queue.created_tasks) == 0 + await monitoring.add_task('Group', 'Hook-staticanalysis/bot', 'Task-failed-restart') + await monitoring.check_task() + assert len(monitoring.queue.created_tasks) == 1 diff --git a/src/staticanalysis/bot/static_analysis_bot/workflows/base.py b/src/staticanalysis/bot/static_analysis_bot/workflows/base.py index 9203be72d4..fddc75fc5f 100644 --- a/src/staticanalysis/bot/static_analysis_bot/workflows/base.py +++ b/src/staticanalysis/bot/static_analysis_bot/workflows/base.py @@ -141,6 +141,10 @@ def index(self, revision, **kwargs): payload['try_task_id'] = settings.try_task_id payload['try_group_id'] = settings.try_group_id + # Add restartable flag for monitoring + payload['monitoring_restart'] = payload['state'] == 'error' and \ + payload.get('error_code') in ('watchdog', 'mercurial') + # Add a sub namespace with the task id to be able to list # tasks from the parent namespace namespaces = revision.namespaces + [ @@ -148,9 +152,15 @@ def index(self, revision, **kwargs): for namespace in revision.namespaces ] + # Build complete namespaces list, with monitoring update + full_namespaces = [ + TASKCLUSTER_NAMESPACE.format(channel=settings.app_channel, name=name) + for name in namespaces + ] + full_namespaces.append('project.releng.services.tasks.{}'.format(settings.taskcluster.task_id)) + # Index for all required namespaces - for name in namespaces: - namespace = TASKCLUSTER_NAMESPACE.format(channel=settings.app_channel, name=name) + for namespace in full_namespaces: self.index_service.insertTask( namespace, { diff --git a/src/staticanalysis/bot/tests/test_workflow.py b/src/staticanalysis/bot/tests/test_workflow.py index a2c4093eac..968625ec32 100644 --- a/src/staticanalysis/bot/tests/test_workflow.py +++ b/src/staticanalysis/bot/tests/test_workflow.py @@ -17,10 +17,10 @@ def test_taskcluster_index(mock_try_config, mock_try_workflow): mock_try_workflow.index_service = mock.Mock() rev = Revision() rev.namespaces = ['mock.1234'] - rev.as_dict = lambda: {'id': '1234', 'someData': 'mock'} + rev.as_dict = lambda: {'id': '1234', 'someData': 'mock', 'state': 'done', } mock_try_workflow.index(rev, test='dummy') - assert mock_try_workflow.index_service.insertTask.call_count == 2 + assert mock_try_workflow.index_service.insertTask.call_count == 3 calls = mock_try_workflow.index_service.insertTask.call_args_list # First call with namespace @@ -46,3 +46,54 @@ def test_taskcluster_index(mock_try_config, mock_try_workflow): assert args['data']['try_group_id'] == 'remoteTryGroup' assert args['data']['someData'] == 'mock' assert 'indexed' in args['data'] + + # Third call for monitoring + namespace, args = calls[2][0] + assert namespace == 'project.releng.services.tasks.12345deadbeef' + assert args['taskId'] == '12345deadbeef' + assert args['data']['test'] == 'dummy' + assert args['data']['id'] == '1234' + assert args['data']['source'] == 'try' + assert args['data']['try_task_id'] == 'remoteTryTask' + assert args['data']['try_group_id'] == 'remoteTryGroup' + assert args['data']['monitoring_restart'] is False + + +def test_monitoring_restart(mock_try_config, mock_try_workflow): + ''' + Test the Taskcluster indexing API and restart capabilities + ''' + from static_analysis_bot.config import TaskCluster + from static_analysis_bot.revisions import Revision + mock_try_config.taskcluster = TaskCluster('/tmp/dummy', 'someTaskId', 0, False) + mock_try_workflow.index_service = mock.Mock() + rev = Revision() + rev.as_dict = dict + rev.namespaces = [] + + # Unsupported error code + mock_try_workflow.index(rev, test='dummy', error_code='nope', state='error') + assert mock_try_workflow.index_service.insertTask.call_count == 1 + calls = mock_try_workflow.index_service.insertTask.call_args_list + namespace, args = calls[0][0] + assert namespace == 'project.releng.services.tasks.someTaskId' + assert args['taskId'] == 'someTaskId' + assert args['data']['monitoring_restart'] is False + + # watchdog should be restated + mock_try_workflow.index(rev, test='dummy', error_code='watchdog', state='error') + assert mock_try_workflow.index_service.insertTask.call_count == 2 + calls = mock_try_workflow.index_service.insertTask.call_args_list + namespace, args = calls[1][0] + assert namespace == 'project.releng.services.tasks.someTaskId' + assert args['taskId'] == 'someTaskId' + assert args['data']['monitoring_restart'] is True + + # Invalid state + mock_try_workflow.index(rev, test='dummy', state='running') + assert mock_try_workflow.index_service.insertTask.call_count == 3 + calls = mock_try_workflow.index_service.insertTask.call_args_list + namespace, args = calls[2][0] + assert namespace == 'project.releng.services.tasks.someTaskId' + assert args['taskId'] == 'someTaskId' + assert args['data']['monitoring_restart'] is False