-
Notifications
You must be signed in to change notification settings - Fork 148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Correctly Set query status #2227
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,34 +63,38 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) | |
} | ||
} | ||
|
||
// TODO : Fetch from Result Index and then make call to EMR Serverless. | ||
public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) { | ||
GetJobRunResult getJobRunResult = | ||
emrServerlessClient.getJobRunResult( | ||
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId()); | ||
String jobState = getJobRunResult.getJobRun().getState(); | ||
// either empty json when the result is not available or data with status | ||
// Fetch from Result Index | ||
JSONObject result = | ||
(jobState.equals(JobRunState.SUCCESS.toString())) | ||
? jobExecutionResponseReader.getResultFromOpensearchIndex( | ||
asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex()) | ||
: new JSONObject(); | ||
jobExecutionResponseReader.getResultFromOpensearchIndex( | ||
asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex()); | ||
|
||
// if result index document has a status, we are gonna use the status directly; otherwise, we | ||
// will use emr-s job status | ||
// a job is successful does not mean there is no error in execution. For example, even if result | ||
// index mapping | ||
// is incorrect, we still write query result and let the job finish. | ||
// will use emr-s job status. | ||
// That a job is successful does not mean there is no error in execution. For example, even if | ||
// result | ||
// index mapping is incorrect, we still write query result and let the job finish. | ||
// That a job is running does not mean the status is running. For example, index/streaming Query | ||
// is a | ||
// long-running job which runs forever. But we need to return success from the result index | ||
// immediately. | ||
if (result.has(DATA_FIELD)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as we discussed, we can assume status exist within data |
||
JSONObject items = result.getJSONObject(DATA_FIELD); | ||
|
||
// If items have STATUS_FIELD, use it; otherwise, use jobState | ||
String status = items.optString(STATUS_FIELD, jobState); | ||
// If items have STATUS_FIELD, use it; otherwise, mark failed | ||
String status = items.optString(STATUS_FIELD, JobRunState.FAILED.toString()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in which case, result has data but does not has status field? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. theoretically it is not possible. I provide a default here in case we get exception when it is not there. |
||
result.put(STATUS_FIELD, status); | ||
|
||
// If items have ERROR_FIELD, use it; otherwise, set empty string | ||
String error = items.optString(ERROR_FIELD, ""); | ||
result.put(ERROR_FIELD, error); | ||
} else { | ||
// make call to EMR Serverless when related result index documents are not available | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice! it is help reduce query latency. |
||
GetJobRunResult getJobRunResult = | ||
emrServerlessClient.getJobRunResult( | ||
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId()); | ||
String jobState = getJobRunResult.getJobRun().getState(); | ||
result.put(STATUS_FIELD, jobState); | ||
result.put(ERROR_FIELD, ""); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be below introduction right. Can we move this to Glue Connector Properties in DataSource Configuration Section.