我们具有异步函数。异步操作是使用akka http client
完成的。class Foo[A,B] extends AsyncFunction[A, B] with {
val akkaConfig = ConfigFactory.load()
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
implicit lazy val system = ActorSystem("MyActorSystem", akkaConfig)
implicit lazy val materializer = ActorMaterializer()
def postReq(uriStr: String, str: String): Future[HttpResponse] = {
Http().singleRequest(HttpRequest(
method = HttpMethods.POST,
uri = uriStr,
entity = HttpEntity(ContentTypes.`application/json`, str))
)
}
override def asyncInvoke(input: A, resultFuture: ResultFuture[B]) : Unit = {
val resultFutureRequested: Future[HttpResponse] = postReq(...)
//the rest of the class ...
问题:
- 如果我想增加HTTP请求的并行性 - 是否应该使用Akka config进行,或者有一种方法可以通过Flink.yamel 进行配置。
- 由于Flink也使用Akka,这是创建
ActorSystem
和ExecutionContext
的正确方法吗?
至于第一个问题,您有三个不同的设置可以影响性能和执行的实际请求数:
- 并行性,这将导致Flink创建
AsyncFunction
的多个实例,包括HttpClient
的多个实例。 - 函数本身中并发请求的数量。当您调用
orderedWait
或unorderedWait
时,您应该在功能中提供capacity
,这将限制并发请求的数量。 - 您的HTTP客户端的实际设置。
您可以看到,点2.和3.已连接,因为flink可以限制可能的并发请求的数量,因此有时您的HTTP客户端设置中的更改可能没有效果,因为请求的数量是由flink int自己界定。
增加AsyncFunction
的吞吐量取决于情况。您需要记住,AsyncFunction
是单线程中的callend。这基本上意味着,如果您要呼叫的服务的响应时间很大,您将只会阻止等待响应的请求的数量,因此唯一的方法是增加parallelism'
。但是,通常,更改HttpClient
和函数capacity
的设置应允许您获得更好的吞吐量。
至于第二个问题,我看不到创建多个ActorSystems
的问题。您可以看到回答的类似问题[在此] .1