RichParallelSourceFunction中的水印



我正在实现一个SourceFunction,它从数据库中读取数据。 如果停止或压缩(即保存点和检查点(,并且数据只处理一次,则作业应该能够恢复。

到目前为止,我拥有的:

@SerialVersionUID(1L)
class JDBCSource(private val waitTimeMs: Long) extends 
RichParallelSourceFunction[Event] with StoppableFunction with LazyLogging{
@transient var client: PostGreClient = _
@volatile var isRunning: Boolean = true
val DEFAULT_WAIT_TIME_MS = 1000
def this(clientConfig: Serializable) =
this(clientConfig, DEFAULT_WAIT_TIME_MS)
override def stop(): Unit = {
this.isRunning = false
}
override def open(parameters: Configuration): Unit = {
super.open(parameters)
client = new JDBCClient
}
override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
while (isRunning){
val statement = client.getConnection.createStatement()
val resultSet = statement.executeQuery("SELECT name, timestamp FROM MYTABLE")
while (resultSet.next()) {
val event: String = resultSet.getString("name")
val timestamp: Long = resultSet.getLong("timestamp")
ctx.collectWithTimestamp(new Event(name, timestamp), timestamp)
}
}
}
override def cancel(): Unit = {
isRunning = false
}
}

如何确保仅获取尚未处理的数据库行? 我假设ctx变量将包含有关当前水印的一些信息,以便我可以将查询更改为以下内容:

select name, timestamp from myTable where timestamp > ctx.getCurrentWaterMark

但它对我来说没有任何相关的方法。任何如何解决这个问题的想法将不胜感激

您必须实现 CheckpointedFunction,以便您可以自己管理检查点。该接口的文档非常全面,但是如果您需要示例,我建议您查看示例。

本质上,你的函数必须实现CheckpointedFunction#snapshotState来存储你需要的状态,使用 Flink 的托管状态,然后,在执行恢复时,它会在CheckpointedFunction#initializeState中读取相同的状态。

相关内容

  • 没有找到相关文章

最新更新