Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support callback header #1808

Merged
merged 2 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import net.csdn.common.path.Url
import net.csdn.modules.transport.HttpTransportService.SResponse
import net.csdn.modules.transport.{DefaultHttpTransportService, HttpTransportService}
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.http.{HttpEntity, HttpResponse}
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.client.fluent.{Form, Request}
import org.apache.http.entity.ContentType
import org.apache.http.entity.mime.{HttpMultipartMode, MultipartEntityBuilder}
import org.apache.http.message.BasicNameValuePair
import org.apache.http.util.EntityUtils
import org.apache.http.{HttpEntity, HttpResponse}
import streaming.dsl.ScriptSQLExec
import streaming.log.WowLog
import tech.mlsql.common.JsonUtils
Expand All @@ -22,17 +22,21 @@ import tech.mlsql.tool.{HDFSOperatorV2, Templates2}
import java.nio.charset.Charset
import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.util.control.Breaks.{break, breakable}

object RestUtils extends Logging with WowLog {
def httpClientPost(urlString: String, data: Map[String, String]): HttpResponse = {
def httpClientPost(urlString: String, data: Map[String, String], headers: Map[String, String]): HttpResponse = {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR comment 带上issue 号吧

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是说commit吗

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment里面有带

val nameValuePairs = data.map { case (name, value) =>
new BasicNameValuePair(name, value)
}.toList

Request.Post(urlString)
val req = Request.Post(urlString)
.addHeader("Content-Type", "application/x-www-form-urlencoded")
.body(new UrlEncodedFormEntity(nameValuePairs, DefaultHttpTransportService.charset))

headers foreach { case (name, value) =>
req.setHeader(name, value)
}

req.body(new UrlEncodedFormEntity(nameValuePairs, DefaultHttpTransportService.charset))
.execute()
.returnResponse()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package tech.mlsql.it

import net.csdn.modules.transport.DefaultHttpTransportService
import org.apache.http.HttpEntity
import org.apache.http.util.EntityUtils
import tech.mlsql.common.utils.log.Logging
import tech.mlsql.crawler.RestUtils
import tech.mlsql.it.contiainer.ByzerCluster
Expand All @@ -8,6 +11,7 @@ import tech.mlsql.it.utils.DockerUtils.getCurProjectRootPath

import java.io.File
import java.util.UUID
import scala.collection.mutable

/**
* 23/02/2022 hellozepp(lisheng.zhanglin@163.com)
Expand Down Expand Up @@ -54,11 +58,27 @@ class ByzerScriptTestSuite extends LocalBaseTestSuite with Logging {
})
}

def runScript(url: String, user: String, code: String): (Int, String) = {
def runScript(url: String, user: String, code: String, callbackHeader: String = ""): (Int, String) = {
val jobName = UUID.randomUUID().toString
val params = mutable.Map("sql" -> code, "owner" -> user,
"jobName" -> jobName, "sessionPerUser" -> "true", "sessionPerRequest" -> "true")
if (callbackHeader != "") params.put("callbackHeader", callbackHeader)
logInfo(s"The test submits a script to the container through Rest, url:$url, sql:$code")
val (status, result) = RestUtils.rest_request_string(url, "post", Map("sql" -> code, "owner" -> user,
"jobName" -> jobName, "sessionPerUser" -> "true", "sessionPerRequest" -> "true"),
val (status, result) = RestUtils.rest_request_string(url, "post", params.toMap,
Map("Content-Type" -> "application/x-www-form-urlencoded"), Map("socket-timeout" -> "1800s",
"connect-timeout" -> "1800s", "retry" -> "1")
)
logInfo(s"status:$status,result:$result")
(status, result)
}

def runScriptWithHeader(url: String, user: String, code: String, callbackHeader: String = ""): (Int, HttpEntity) = {
val jobName = UUID.randomUUID().toString
val params = mutable.Map("sql" -> code, "owner" -> user,
"jobName" -> jobName, "sessionPerUser" -> "true", "sessionPerRequest" -> "true")
if (callbackHeader != "") params.put("callbackHeader", callbackHeader)
logInfo(s"The test submits a script to the container through Rest, url:$url, sql:$code")
val (status, result) = RestUtils.rest_request(url, "post", params.toMap,
Map("Content-Type" -> "application/x-www-form-urlencoded"), Map("socket-timeout" -> "1800s",
"connect-timeout" -> "1800s", "retry" -> "1")
)
Expand Down Expand Up @@ -101,6 +121,18 @@ class ByzerScriptTestSuite extends LocalBaseTestSuite with Logging {
}

test("Execute yarn sql file") {
try {
val (_, result) = runScriptWithHeader(url, user, "select 1 as a,'jack' as b as bbc;",
"""{"Authorization":"Bearer acc"}""")
val _result = EntityUtils.toString(result, DefaultHttpTransportService.charset)
println("With callbackHeader result:" + _result)
assert(_result === "[{\"a\":1,\"b\":\"jack\"}]")
} catch {
case _: Exception =>
logError(s"callbackHeader should be returned normally in the byzer callback!")
System.exit(1)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

System.exit(1) ? 这里无法assert吗?或者直接抛错?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已修复

}

TestManager.testCases.foreach(testCase => {
try {
val (status, result) = runScript(url, user, testCase.sql)
Expand All @@ -110,7 +142,6 @@ class ByzerScriptTestSuite extends LocalBaseTestSuite with Logging {
TestManager.acceptRest(testCase, 500, null, e)
}
})

TestManager.report()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.spark.sql.mlsql.session.{MLSQLSparkSession, SparkSessionCacheM
import org.apache.spark.{MLSQLConf, SparkInstanceService}
import tech.mlsql.MLSQLEnvKey
import tech.mlsql.app.{CustomController, ResultResp}
import tech.mlsql.common.JsonUtils
import tech.mlsql.common.utils.log.Logging
import tech.mlsql.common.utils.serder.json.JSONTool
import tech.mlsql.crawler.RestUtils
Expand Down Expand Up @@ -105,6 +106,7 @@ class RestController extends ApplicationController with WowLog with Logging {
new Parameter(name = "sessionPerRequest", required = false, description = "by default false", `type` = "boolean", allowEmptyValue = false),
new Parameter(name = "async", required = false, description = "If set true ,please also provide a callback url use `callback` parameter and the job will run in background and the API will return. default: false", `type` = "boolean", allowEmptyValue = false),
new Parameter(name = "callback", required = false, description = "Used when async is set true. callback is a url. default: false", `type` = "string", allowEmptyValue = false),
new Parameter(name = "callbackHeader", required = false, description = "Provide a jsonString parameter to set the header parameter of the callback request. default: false", `type` = "string", allowEmptyValue = false),
new Parameter(name = "maxRetries", required = false, description = "Max retries of request callback.", `type` = "int", allowEmptyValue = false),
new Parameter(name = "skipInclude", required = false, description = "disable include statement. default: false", `type` = "boolean", allowEmptyValue = false),
new Parameter(name = "skipAuth", required = false, description = "disable table authorize . default: true", `type` = "boolean", allowEmptyValue = false),
Expand Down Expand Up @@ -147,6 +149,12 @@ class RestController extends ApplicationController with WowLog with Logging {
if (paramAsBoolean("async", false)) {
JobManager.asyncRun(sparkSession, jobInfo, () => {
val urlString = param("callback")
val callbackHeaderString = param("callbackHeader")
var callbackHeader = Map[String,String]()
if (callbackHeaderString != null && callbackHeaderString.nonEmpty){
callbackHeader = JsonUtils.fromJson[Map[String,String]](callbackHeaderString)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里fromJson异常的情况是要block run/script的接口流程吗? 还是要catch住,留个容错模式

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

应该抛出,callbackHeader设置的不对会影响后续的callback

}

val maxTries = Math.max(0, paramAsInt("maxRetries", -1)) + 1
try {
ScriptSQLExec.parse(param("sql"), context,
Expand All @@ -161,7 +169,8 @@ class RestController extends ApplicationController with WowLog with Logging {
RestUtils.httpClientPost(urlString,
Map("stat" -> s"""succeeded""",
"res" -> outputResult,
"jobInfo" -> JSONTool.toJsonStr(jobInfo))),
"jobInfo" -> JSONTool.toJsonStr(jobInfo)),
callbackHeader),
HttpStatus.SC_OK == _.getStatusLine.getStatusCode,
response => logger.error(s"Succeeded SQL callback request failed after ${maxTries} attempts, " +
s"the last response status is: ${response.getStatusLine.getStatusCode}.")
Expand All @@ -178,7 +187,8 @@ class RestController extends ApplicationController with WowLog with Logging {
RestUtils.httpClientPost(urlString,
Map("stat" -> s"""failed""",
"msg" -> (e.getMessage + "\n" + msgBuffer.mkString("\n")),
"jobInfo" -> JSONTool.toJsonStr(jobInfo))),
"jobInfo" -> JSONTool.toJsonStr(jobInfo)),
Map()),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的header为什么是个empty map?

HttpStatus.SC_OK == _.getStatusLine.getStatusCode,
response => logger.error(s"Fail SQL callback request failed after ${maxTries} attempts, " +
s"the last response status is: ${response.getStatusLine.getStatusCode}.")
Expand Down