如何查询 flink 的可查询状态



我正在使用 flink 1.8.0,我正在尝试查询我的作业状态。

val descriptor = new ValueStateDescriptor("myState", Types.CASE_CLASS[Foo])
descriptor.setQueryable("my-queryable-State")

我使用了端口 9067,这是默认端口,根据这个,我的客户端:

val client = new QueryableStateClient("127.0.0.1", 9067)
val jobId = JobID.fromHexString("d48a6c980d1a147e0622565700158d9e")
val execConfig = new ExecutionConfig
val descriptor = new ValueStateDescriptor("my-queryable-State", Types.CASE_CLASS[Foo])
val res: Future[ValueState[Foo]] = client.getKvState(jobId, "my-queryable-State","a", BasicTypeInfo.STRING_TYPE_INFO, descriptor)
res.map(_.toString).pipeTo(sender)

但我得到:

[ERROR] [06/25/2019 20:37:05.499] [bvAkkaHttpServer-akka.actor.default-dispatcher-5] [akka.actor.ActorSystemImpl(bvAkkaHttpServer)] Error during processing of request: 'org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:9067'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler.
java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:9067
  1. 我做错了什么?
  2. 我应该如何以及在何处定义QueryableStateOptions

所以如果你想使用QueryableState你需要将适当的 Jar 添加到你的 flink 中。jar 是flink-queryable-state-runtime的,它可以在 flink 发行版的opt文件夹中找到,您应该将其移动到lib文件夹中。

至于第二个问题,QueryableStateOption只是一个用于创建静态ConfigOption定义的类。然后使用这些定义从文件中读取配置flink-conf.yaml。所以目前配置QueryableState的唯一选择是在 flink 发行版中使用 flink-conf 文件。

编辑:另外,尝试阅读此内容]1,它提供了有关可查询状态如何工作的更多信息。您不应该真正直接连接到服务器端口,而应该使用默认情况下为9069的代理端口。

相关内容

  • 没有找到相关文章

最新更新