Skip to content

Commit

Permalink
Implement task progress HTTP polling (#543)
Browse files Browse the repository at this point in the history
If the web socket is closed, task progress messages are lost. If the
"completed" or "aborted" message is among lost messages a progress
notifications remains freezed.

Add a setTimeout/clearTimeout invocation for each WS message received.
After 8 seconds, if no further message comes, send an HTTP GET request
with getTaskStatus() to verify the task status from API server.

If the HTTP request fails, try again after 8 seconds. If it succeeds
generate a synthesized progress message that completes the task
progress at 100%.
  • Loading branch information
DavidePrincipi authored Dec 27, 2023
1 parent 6497256 commit fe17db5
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
55 changes: 54 additions & 1 deletion core/ui/src/mixins/notification.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export default {
name: "NotificationService",
mixins: [UtilService, TaskService],
computed: {
...mapState(["notifications"]),
...mapState(["notifications", "taskPollingTimers"]),
...mapGetters(["getNotificationById", "getTaskById"]),
},
methods: {
Expand All @@ -22,6 +22,7 @@ export default {
"setNotificationDrawerShownInStore",
"setNotificationReadInStore",
"deleteNotificationInStore",
"setPollingTimerForTaskInStore",
]),
createNotification(notification) {
// fill missing attributes
Expand Down Expand Up @@ -376,6 +377,58 @@ export default {
},
};

// POLLING: clear the task timer because a message has been
// received:
if (this.taskPollingTimers[taskId]) {
clearTimeout(this.taskPollingTimers[taskId]);
this.setPollingTimerForTaskInStore({
taskId: taskId,
timeoutId: undefined,
});
}

// POLLING: set the task timer for running tasks
if (taskStatus == "running") {
// The poll period is two times the agent task heartbeat period:
// in normal conditions it never triggers its handler function.
let taskProgressPollPeriod = 8100;
let timeoutId = setTimeout(async () => {
// POLLING: if the task heartbeat is lost and the poll period
// is elapsed, we send an HTTP request to check the task
// status. Typical scenario: the api-server was restarted and
// some websocket messages were lost.
const [err, statusResponse] = await to(
this.getTaskStatus(taskPath)
);
if (err) {
// If the task status is not found (404), run the message
// handler again to activate the timer one more time:
this.handleProgressTaskMessage(taskPath, taskId, payload);
return;
}
let myPayload = payload; // initialized with a fallback payload
if (statusResponse?.data?.data) {
// The task status is consistent: use it to synthesize a
// message payload that reflects the completed task status
let exitCode = statusResponse.data.data["exit_code"];
myPayload = {
progress: 100,
status: exitCode == 0 ? "completed" : "aborted",
};
}
// Run again the message handler with our synthesized payload
this.handleProgressTaskMessage(taskPath, taskId, myPayload);
}, taskProgressPollPeriod);

// POLLING: store the task timer to retrieve it later. In normal
// conditions a new message is received and the task timer is
// cleared before its period is elapsed.
this.setPollingTimerForTaskInStore({
taskId: taskId,
timeoutId: timeoutId,
});
}

if (taskResult) {
notification.task.result = taskResult;

Expand Down
7 changes: 7 additions & 0 deletions core/ui/src/store/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Vue.use(Vuex);
export default new Vuex.Store({
state: {
notifications: [],
taskPollingTimers: {},
isNotificationDrawerShown: false,
isMobileSideMenuShown: false,
isAppDrawerShown: false,
Expand Down Expand Up @@ -92,6 +93,9 @@ export default new Vuex.Store({
},
},
mutations: {
setPollingTimerForTask(state, obj) {
state.taskPollingTimers[obj.taskId] = obj.timeoutId;
},
createNotification(state, notification) {
state.notifications.unshift(notification);
},
Expand Down Expand Up @@ -193,6 +197,9 @@ export default new Vuex.Store({
},
},
actions: {
setPollingTimerForTaskInStore(context, obj) {
context.commit("setPollingTimerForTask", obj);
},
createNotificationInStore(context, notification) {
context.commit("createNotification", notification);
},
Expand Down

0 comments on commit fe17db5

Please sign in to comment.