diff --git a/streamingpro-mlsql/src/main/java/tech/mlsql/datasource/impl/MLSQLRest.scala b/streamingpro-mlsql/src/main/java/tech/mlsql/datasource/impl/MLSQLRest.scala index a9848144e..e0ee608d8 100644 --- a/streamingpro-mlsql/src/main/java/tech/mlsql/datasource/impl/MLSQLRest.scala +++ b/streamingpro-mlsql/src/main/java/tech/mlsql/datasource/impl/MLSQLRest.scala @@ -336,25 +336,44 @@ class MLSQLRest(override val uid: String) extends MLSQLSource request.bodyForm(form.build(), Charset.forName("utf-8")).execute() case ("post", contentType) if contentType.trim.startsWith("multipart/form-data") => + // Avoid setting boundary + request.removeHeaders("Content-Type") val context = ScriptSQLExec.contextGetOrForTest() val _filePath = params("form.file-path") val finalPath = resourceRealPath(context.execListener, Option(context.owner), _filePath) - val inputStream = HDFSOperatorV2.readAsInputStream(finalPath) - val fileName = params("form.file-name") + val filePathBuf = ArrayBuffer[(String, String)]() + HDFSOperatorV2.isFile(finalPath) match { + case true => + filePathBuf.append((finalPath, fileName)) + case false if HDFSOperatorV2.isDir(finalPath) => + val listFiles = HDFSOperatorV2.listFiles(finalPath) + if(listFiles.filter(_.isDirectory).size > 0) throw new MLSQLException(s"Including subdirectories is not supported") + + listFiles.filterNot(_.getPath.getName.equals("_SUCCESS")) + .foreach(file => filePathBuf.append((file.getPath.toString, file.getPath.getName))) + case _ => + throw new MLSQLException(s"Please check whether the specified directory or file exists") + } + val entity = MultipartEntityBuilder.create. setMode(HttpMultipartMode.BROWSER_COMPATIBLE). - setCharset(Charset.forName("utf-8")). - addBinaryBody(fileName, inputStream, ContentType.MULTIPART_FORM_DATA, fileName) + setCharset(Charset.forName("utf-8")) + + filePathBuf.map(fileInfo => { + val inputStream = HDFSOperatorV2.readAsInputStream(fileInfo._1) + entity.addBinaryBody(fileInfo._2, inputStream, ContentType.MULTIPART_FORM_DATA, fileInfo._2) + }) params.filter(_._1.startsWith("form.")). filter(v => v._1 != "form.file-path" && v._1 != "form.file-name").foreach { case (k, v) => entity.addTextBody(k.stripPrefix("form."), Templates2.dynamicEvaluateExpression(v, ScriptSQLExec.context().execListener.env().toMap)) } request.body(entity.build()).execute() + case (_, v) => throw new MLSQLException(s"content-type ${v} is not support yet") }