Good day. I am having trouble uploading a job to the Flink REST API from Scala.
All GET requests seem to work:
import scalaj.http._
val url: String = "http://127.0.0.1:8081"
val response: HttpResponse[String] = Http(url+"/config").asString
println(response.body) // a bare `return` is only valid inside a method
Uploading the JAR file with curl works:
curl -vvv -X POST -H "Expect:" -F "jarfile=@/home/Downloads/myJob.jar" http://127.0.0.1:8081/jars/upload
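Now I want to do the same upload from Scala. The documentation gives no working example, and I am fairly new to this type of POST: https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#submitting-programs
My current code, which does not work, is adapted from https://github.com/Guru107/flinkjobuploadplugin/tree/master/src/main/java/com/github/guru107 and edited for my needs: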
// Ideal case: upload a JAR file as a multipart request from Scala
import java.io.{File, IOException}
import org.apache.http.HttpEntity
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.mime.MultipartEntityBuilder
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients, LaxRedirectStrategy}
import org.apache.http.message.BasicHeader
import org.apache.http.util.EntityUtils
import org.json.JSONObject // assuming the org.json library, as JSONObject is not imported in the original

val requestUrl = "http://localhost:8081/jars/upload"
val jarPath = "@/home/Downloads/myJob.jar"
val httpClient: CloseableHttpClient = HttpClients.custom.setRedirectStrategy(new LaxRedirectStrategy).build
val fileToUpload: File = new File(jarPath)
val uploadFileUrl: HttpPost = new HttpPost(requestUrl)
val builder: MultipartEntityBuilder = MultipartEntityBuilder.create
builder.addBinaryBody("jarfile", fileToUpload)
val multipart: HttpEntity = builder.build
var jobUploadResponse: JSONObject = null
uploadFileUrl.setEntity(multipart)
var response: CloseableHttpResponse = null
try {
  response = httpClient.execute(uploadFileUrl)
  println("response: " + response)
  response.setHeader(new BasicHeader("Expect", ""))
  response.setHeader(new BasicHeader("content-type", "application/x-java-archive"))
  val bodyAsString = EntityUtils.toString(response.getEntity, "UTF-8")
  println("bodyAsString: " + bodyAsString)
  jobUploadResponse = new JSONObject(bodyAsString)
  println("jobUploadResponse: " + jobUploadResponse)
}
It fails to upload the file.
Please provide a working example, or a link to one, for uploading a job/JAR file from Scala.
Thanks in advance.
You can use the client code from com.github.mjreid.flinkwrapper and upload the JAR file with Scala code:
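// `as` is presumably the ActorSystem in scope; flink.url has the form http://<flink_web_host>:<flink_web_port>
val apiEndpoint: String = as.settings.config.getString("flink.url")
val client = FlinkRestClient(apiEndpoint, as)
// Replace <jarId> with the ID of the uploaded JAR
client.runProgram(<jarId>)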
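If you would rather stay with scalaj-http, which the question already uses for the GET requests, the curl call can probably be reproduced with postMulti. This is only a minimal sketch and has not been run against a Flink 1.3 cluster. The object name FlinkJarUpload and the helpers uploadUrl, runUrl, uploadJar and runJar are made up for illustration, and the MIME type is an assumption carried over from the question's code. Two things in the question's code look suspicious: the "@" in the curl command is curl syntax for "read this file" and is not part of the path, and headers need to be set on the request before it is executed, not on the response.

```scala
import java.nio.file.{Files, Paths}
import scalaj.http.{Http, HttpResponse, MultiPart}

object FlinkJarUpload {
  // Assumed base URL, taken from the question; adjust for your cluster.
  val baseUrl = "http://127.0.0.1:8081"

  // Hypothetical helpers that build the two endpoint URLs used below.
  def uploadUrl(base: String): String =
    s"${base.stripSuffix("/")}/jars/upload"

  def runUrl(base: String, jarId: String): String =
    s"${base.stripSuffix("/")}/jars/$jarId/run"

  // Sends the JAR as a multipart/form-data POST, like curl -F "jarfile=@...".
  // Note: the path has no leading "@".
  def uploadJar(base: String, jarPath: String): HttpResponse[String] = {
    val path  = Paths.get(jarPath)
    val bytes = Files.readAllBytes(path)
    Http(uploadUrl(base))
      .postMulti(
        MultiPart(
          "jarfile",                    // form field name, same as in the curl call
          path.getFileName.toString,    // file name reported to the server
          "application/x-java-archive", // assumed MIME type
          bytes
        )
      )
      .asString
  }

  // Starts a previously uploaded JAR with an empty POST body.
  def runJar(base: String, jarId: String): HttpResponse[String] =
    Http(runUrl(base, jarId)).postData("").asString

  def main(args: Array[String]): Unit = {
    val response = uploadJar(baseUrl, "/home/Downloads/myJob.jar")
    println(s"${response.code}: ${response.body}")
  }
}
```

The body of the upload response should include the name under which the server stored the file. As far as I can tell, that name is the jar ID to pass to runJar, or to client.runProgram above.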