弗林克增加异步操作的并行性



我们具有异步函数。异步操作是使用akka http client

完成的。
class Foo[A,B] extends AsyncFunction[A, B] with {
  val akkaConfig = ConfigFactory.load()
  implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
  implicit lazy val system = ActorSystem("MyActorSystem", akkaConfig)
  implicit lazy val materializer = ActorMaterializer()
    def postReq(uriStr: String, str: String): Future[HttpResponse] = {
        Http().singleRequest(HttpRequest(
          method = HttpMethods.POST,
          uri = uriStr,
          entity = HttpEntity(ContentTypes.`application/json`, str))
        )
      }
 override def asyncInvoke(input: A, resultFuture: ResultFuture[B]) : Unit  = {
    val resultFutureRequested: Future[HttpResponse] = postReq(...)
//the rest of the class ...

问题:

  1. 如果我想增加HTTP请求的并行性 - 是否应该使用Akka config进行,或者有一种方法可以通过Flink.yamel
  2. 进行配置。
  3. 由于Flink也使用Akka,这是创建ActorSystemExecutionContext的正确方法吗?

至于第一个问题,您有三个不同的设置可以影响性能和执行的实际请求数:

  1. 并行性,这将导致Flink创建AsyncFunction的多个实例,包括HttpClient的多个实例。
  2. 函数本身中并发请求的数量。当您调用orderedWaitunorderedWait时,您应该在功能中提供capacity,这将限制并发请求的数量。
  3. 您的HTTP客户端的实际设置。

您可以看到,点2.和3.已连接,因为flink可以限制可能的并发请求的数量,因此有时您的HTTP客户端设置中的更改可能没有效果,因为请求的数量是由flink int自己界定。

增加AsyncFunction的吞吐量取决于情况。您需要记住,AsyncFunction是单线程中的callend。这基本上意味着,如果您要呼叫的服务的响应时间很大,您将只会阻止等待响应的请求的数量,因此唯一的方法是增加parallelism'。但是,通常,更改HttpClient和函数capacity的设置应允许您获得更好的吞吐量。

至于第二个问题,我看不到创建多个ActorSystems的问题。您可以看到回答的类似问题[在此] .1

最新更新