apache flink-异步刷新一个哈希图



我正在使用Scala API开发Apache Flink应用程序(我使用此技术很新)。

我正在使用hashmap来存储来自数据库的一些值,我需要每个1H刷新这些值。有什么方法可以刷新此hashmap异步?

谢谢!

我不确定您的意思是"刷新此hashmap异步"在flink工作流程中。

对于它的价值,如果您有一个由记录中流过您工作流的记录中的数据所占据的hashmap,那么您可以使用Flink对托管密钥状态的支持来存储该值(并进行检查点),然后做到这一点可查询。

我将您的问题解释为意味着您正在使用Flink中的某个状态来镜像/缓存来自外部数据库的某些数据,您希望定期刷新它。

通常,这种事情是通过将更改数据捕获(CDC)流从外部数据库连续流到flink来完成的。连续的流式解决方案通常更适合Flink。但是,如果您想在每小时批处理中进行此操作,则可以编写一个自定义源或一个每小时醒来一次的过程输出,对数据库进行查询,并发出一系列记录,该记录流可用于更新操作员持有操作员状态。

您可以通过使用Apache Flink的异步I/O用于外部数据访问来实现此目标,请参阅此帖子。

这是一种使用asyncdatastream的方法,通过创建异步函数并将其附加到源流中,以定期刷新地图。

class AsyncEnricherFunction extends RichAsyncFunction[String, (String String)] {
  @transient private var m: Map[String, String] = _
  @transient private var client: DataBaseClient = _
  @transient private var refreshInterval: Int = _
@throws(classOf[Exception])
  override def open(parameters: Configuration): Unit = {
    client = new DataBaseClient(host, port, credentials)
    refreshInterval = 1000
    load()
  }
  private def load(): Unit = {
    val str = "select key, value from KeyValue"
    m = client.query(str).asMap    
    lastRefreshed = System.currentTimeMillis()
  }
 override def asyncInvoke(input: String, resultFuture: ResultFuture[(String, String]): Unit = {
    Future {
      if (System.currentTimeMillis() > lastRefreshed + refreshInterval) load()      
      val enriched = (input, m(input))
      resultFuture.complete(Seq(enriched))
    }(ExecutionContext.global)
  }
  override def close() : Unit = { client.close() }
}
 val in: DataStream[String] = env.addSource(src)
 val enriched = AsyncDataStream.unorderedWait(in, AsyncEnricherFunction(), 5000, TimeUnit.MILLISECONDS, 100)

最新更新