Spark 结构化流式处理 - 将静态数据集与流式处理数据集联接



我正在使用Spark structured streaming来处理从Kafka读取的记录。这是我想要实现的目标:

(a) 每条记录都是(Timestamp, DeviceId)Tuple2

(b) 我创建了一个静态Dataset[DeviceId]其中包含预期在Kafka流中看到的所有有效设备 ID(类型DeviceId)的集合。

(c) 我需要编写一个Spark structured streaming查询

(i) Groups records by their timestamp into 5-minute windows
(ii) For each window, get the list of valid device IDs that were **not** seen in that window

例如,假设所有有效设备 ID 的列表[A,B,C,D,E],并且某个 5 分钟窗口中的 kafka 记录包含设备 ID[A,B,E]。然后,对于该窗口,我正在寻找的看不见的设备 ID 列表是[C,D].

问题

  1. 如何在 Spark 结构化流式处理中编写此查询?我尝试使用Dataset暴露的except()join()方法。但是,他们都抛出了一个运行时异常,抱怨streaming Dataset不支持这些操作。

这是我的代码片段:

val validDeviceIds: Dataset[(DeviceId, Long)] = spark.createDataset[DeviceId](listOfAllDeviceIds.map(id => (id, 0L))) 
case class KafkaRecord(timestamp: TimestampType, deviceId: DeviceId)
// kafkaRecs is the data stream from Kafka - type is Dataset[KafkaRecord]
val deviceIdsSeen = kafkaRecs
.withWatermark("timestamp", "5 minutes")
.groupBy(window($"timestamp", "5 minutes", "5 minutes"), $"deviceId")
.count()
.map(row => (row.getLong(0), 1L))
.as[(Long, Long)]
val unseenIds = deviceIdsSeen.join(validDeviceIds, Seq("_1"), "right_outer")
.filter(row => row.isNullAt(1))
.map(row => row.getLong(0))

最后一条语句引发以下异常:

Caused by: org.apache.spark.sql.AnalysisException: Right outer join with a streaming DataFrame/Dataset on the left is not supported;;

提前谢谢。

Spark 结构化流中join operations的情况如下所示:流DataFrames可以与static DataFrames联接,以便进一步创建新的streaming DataFrames。但是,有条件地支持streamingstatic Datasets之间的outer joins,而结构化流式处理通常不支持具有streaming Datasetright/left joins。结果,您遇到了 AnalysisException,当您尝试将静态数据集与流数据集联接时,会抛出该异常。作为我的话的证明,你可以看看 Spark 的源代码,在这一行上异常是抛出,这表示不支持您尝试的操作。

我尝试使用静态DataFramesstream of DataFrames进行连接操作。

val streamingDf = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", "structured_topic")
.load()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
val staticDf = Seq((1507831462 , 100)).toDF("Timestamp", "DeviceId")
//Inner Join
streamingDf.join(staticDf, "Timestamp")
line.join(staticDf, "Timestamp")
//Left Join
streamingDf.join(staticDf, "Timestamp", "left_join")
line.join(staticDf, "Timestamp", "left_join")

如您所见,除了从我从通过nc(netcat) 启动的套接字读取数据Kafka消耗数据外,它还显着简化了您测试流应用程序时的生活。 这种方法对我来说都很好用,Kafkasocket作为数据源。

希望有帮助。

不支持与另一侧的流式数据集的外部联接:

  • 条件地支持流式处理数据集和静态数据集之间的外部联接。
    • 不支持与流式处理数据集的完全外部联接
    • 不支持左侧外部联接,右侧是流式处理数据集
    • 不支持右侧外部联接与左侧的流式数据集

如果其他Dataset很小,则可以使用Map或类似的结构,broadcast,并在UserDefinedFunction中引用它。

val map: Broadcast[Map[T, U]] = ???
val lookup = udf((x: T) => map.value.get(x))
df.withColumn("foo", lookup($"_1"))

相关内容

  • 没有找到相关文章

最新更新