From 20fd54f5f11fcba24be5aa66e9698e0a46e7b20c Mon Sep 17 00:00:00 2001 From: Davide Principi Date: Fri, 22 Dec 2023 17:29:03 +0100 Subject: [PATCH 1/2] Implement task progress HTTP polling If the web socket is closed task progress messages are lost. If the "completed" or "aborted" message is among lost messages a progress notifications remains freezed. Add a setTimeout/clearTimeout invocation for each WS message received. After 10 seconds, if no further message comes send an HTTP GET request with getTaskStatus() to verify the task status from API server. If the HTTP request fails, try again after 10 seconds. If it succeedes generate a synthesized progress message that completes the task progress at 100%. --- core/ui/src/mixins/notification.js | 37 +++++++++++++++++++++++++++++- core/ui/src/store/index.js | 7 ++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/core/ui/src/mixins/notification.js b/core/ui/src/mixins/notification.js index aa1e9dfbf..6dbb64a29 100644 --- a/core/ui/src/mixins/notification.js +++ b/core/ui/src/mixins/notification.js @@ -11,7 +11,7 @@ export default { name: "NotificationService", mixins: [UtilService, TaskService], computed: { - ...mapState(["notifications"]), + ...mapState(["notifications", "taskPollingTimers"]), ...mapGetters(["getNotificationById", "getTaskById"]), }, methods: { @@ -22,6 +22,7 @@ export default { "setNotificationDrawerShownInStore", "setNotificationReadInStore", "deleteNotificationInStore", + "setPollingTimerForTaskInStore", ]), createNotification(notification) { // fill missing attributes @@ -376,6 +377,40 @@ export default { }, }; + if (this.taskPollingTimers[taskId]) { + clearTimeout(this.taskPollingTimers[taskId]); + this.setPollingTimerForTaskInStore({ + taskId: taskId, + timeoutId: undefined, + }); + } + + if (taskStatus == "running") { + let timeoutId = setTimeout(async () => { + const [err, statusResponse] = await to( + this.getTaskStatus(taskPath) + ); + if (err) { + this.handleProgressTaskMessage(taskPath, taskId, payload); + return; + } + let myPayload = payload; + if (statusResponse?.data?.data) { + let exitCode = statusResponse.data.data["exit_code"]; + myPayload = { + progress: 100, + status: exitCode == 0 ? "completed" : "aborted", + }; + } + this.handleProgressTaskMessage(taskPath, taskId, myPayload); + }, 10000); + + this.setPollingTimerForTaskInStore({ + taskId: taskId, + timeoutId: timeoutId, + }); + } + if (taskResult) { notification.task.result = taskResult; diff --git a/core/ui/src/store/index.js b/core/ui/src/store/index.js index cb03b2300..9a216cc9d 100644 --- a/core/ui/src/store/index.js +++ b/core/ui/src/store/index.js @@ -7,6 +7,7 @@ Vue.use(Vuex); export default new Vuex.Store({ state: { notifications: [], + taskPollingTimers: {}, isNotificationDrawerShown: false, isMobileSideMenuShown: false, isAppDrawerShown: false, @@ -92,6 +93,9 @@ export default new Vuex.Store({ }, }, mutations: { + setPollingTimerForTask(state, obj) { + state.taskPollingTimers[obj.taskId] = obj.timeoutId; + }, createNotification(state, notification) { state.notifications.unshift(notification); }, @@ -193,6 +197,9 @@ export default new Vuex.Store({ }, }, actions: { + setPollingTimerForTaskInStore(context, obj) { + context.commit("setPollingTimerForTask", obj); + }, createNotificationInStore(context, notification) { context.commit("createNotification", notification); }, From bc84d97c3e815e5ba942cb52a1e489756ecaff03 Mon Sep 17 00:00:00 2001 From: Davide Principi Date: Wed, 27 Dec 2023 10:44:30 +0100 Subject: [PATCH 2/2] fixup! Implement task progress HTTP polling --- core/ui/src/mixins/notification.js | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/core/ui/src/mixins/notification.js b/core/ui/src/mixins/notification.js index 6dbb64a29..c313d60e8 100644 --- a/core/ui/src/mixins/notification.js +++ b/core/ui/src/mixins/notification.js @@ -377,6 +377,8 @@ export default { }, }; + // POLLING: clear the task timer because a message has been + // received: if (this.taskPollingTimers[taskId]) { clearTimeout(this.taskPollingTimers[taskId]); this.setPollingTimerForTaskInStore({ @@ -385,26 +387,42 @@ export default { }); } + // POLLING: set the task timer for running tasks if (taskStatus == "running") { + // The poll period is two times the agent task heartbeat period: + // in normal conditions it never triggers its handler function. + let taskProgressPollPeriod = 8100; let timeoutId = setTimeout(async () => { + // POLLING: if the task heartbeat is lost and the poll period + // is elapsed, we send an HTTP request to check the task + // status. Typical scenario: the api-server was restarted and + // some websocket messages were lost. const [err, statusResponse] = await to( this.getTaskStatus(taskPath) ); if (err) { + // If the task status is not found (404), run the message + // handler again to activate the timer one more time: this.handleProgressTaskMessage(taskPath, taskId, payload); return; } - let myPayload = payload; + let myPayload = payload; // initialized with a fallback payload if (statusResponse?.data?.data) { + // The task status is consistent: use it to synthesize a + // message payload that reflects the completed task status let exitCode = statusResponse.data.data["exit_code"]; myPayload = { progress: 100, status: exitCode == 0 ? "completed" : "aborted", }; } + // Run again the message handler with our synthesized payload this.handleProgressTaskMessage(taskPath, taskId, myPayload); - }, 10000); + }, taskProgressPollPeriod); + // POLLING: store the task timer to retrieve it later. In normal + // conditions a new message is received and the task timer is + // cleared before its period is elapsed. this.setPollingTimerForTaskInStore({ taskId: taskId, timeoutId: timeoutId,