我可以使用Flink状态来执行连接吗?



我正在评估Apache Flink流处理作为Apache Spark的替代/补充。我们通常用Spark解决的任务之一是数据充实。

。e,我有来自具有传感器ID的物联网传感器的数据流,我有一组传感器元数据。我想把输入流转换成传感器测量流+传感器元数据流。

在Spark我可以加入DStream与RDD。

case calss SensorValue(sensorId: Long, ...)
case class SensorMetadata(sensorId: Long, ...)
val sensorInput: DStream[SensorValue] = readEventsFromKafka()
val staticMetadata: RDD[(Long, SensorMetadata)] =
  spark.read.json(...).as[SensorMetadata]
 .map {s => (s.sensorId, s)}.rdd
val joined: DStream[(SensorValue, SensorMetadata)] = 
  sensorInput.map{s => (s.sensorId, s)}.transform { rdd: RDD[SensorValue] => 
  rdd.join(staticMetadata)
     .map { case (_, (s, m)) => (s, m) } // Get rid of nested tuple
}

我可以用Apache Flink做同样的技巧吗?我在这上面没有看到直接的API。我唯一的想法是使用有状态转换——我可以将元数据和传感器事件合并到一个流中,并使用Flink状态存储来存储元数据(伪代码):

val sensorInput: DataStream[SensorValue] = readEventsFromKafka()
val statisMetadata: DataStream[SensorMetadata] = readMetadataFromJson()
val result: DataStream[(SensorValue, SensorMetadata)] =
  sensorInput.keyBy("sensorId")
 .connect(staticMetadata.keyBy("sensorId"))
 .flatMap {new RichCoFlatMapFunction() {
   private val ValueState<SensorMetadata> md = _;
   override def open = ??? // initiate value state
   def flatMap1(s: SensorEvent, s: Collector(SensorEvent, SensorMetadata)) = 
      collector.collect(s, md.value) 
   def flatMap2(s: SensorMetadata, s: Collector[(SensorEvent, SensorMetadata)]) = 
   md.update(s)  
 }}

这个方法正确吗?当元数据不能放在一台机器上时,我可以在更大的规模下使用吗?

谢谢

使用CoFlatMapFunction进行连接是一种常见的方法。然而,它有一个明显的缺点。当输入的元组到达并且您无法控制首先使用哪个输入时,就调用该函数。因此,在开始时,您必须在元数据尚未完全读取时处理传感器事件。一种方法是缓冲一个输入的所有事件,直到另一个输入被使用。另一方面,CoFlatMapFunction方法的优点是可以动态更新元数据。在您的代码示例中,两个输入都以连接键为键。这意味着输入是分区的,每个任务槽处理不同的键集。因此,您的元数据可能比机器可以处理的大(如果您配置RocksDB状态后端,则状态可以持久化到磁盘,因此您甚至不受内存大小的限制)。

如果您要求在作业启动时必须存在所有元数据,并且如果元数据是静态的(它不会更改)并且足够小以适合一台机器,那么您还可以使用常规的FlatMapFunction并在open()方法中从文件加载元数据。与您的方法相反,这将是一个广播连接,其中每个任务槽在内存中都有完整的元数据。除了在使用事件数据时使所有元数据可用之外,这种方法的好处是不需要打乱事件数据,因为它可以在任何机器上连接。

相关内容

  • 没有找到相关文章

最新更新