我正在使用Spark structured streaming
来处理从Kafka
读取的记录。这是我想要实现的目标:
(a) 每条记录都是(Timestamp, DeviceId)
类Tuple2
。
(b) 我创建了一个静态Dataset[DeviceId]
其中包含预期在Kafka
流中看到的所有有效设备 ID(类型DeviceId
)的集合。
(c) 我需要编写一个Spark structured streaming
查询
(i) Groups records by their timestamp into 5-minute windows
(ii) For each window, get the list of valid device IDs that were **not** seen in that window
例如,假设所有有效设备 ID 的列表[A,B,C,D,E]
,并且某个 5 分钟窗口中的 kafka 记录包含设备 ID[A,B,E]
。然后,对于该窗口,我正在寻找的看不见的设备 ID 列表是[C,D]
.
问题
- 如何在 Spark 结构化流式处理中编写此查询?我尝试使用
Dataset
暴露的except()
和join()
方法。但是,他们都抛出了一个运行时异常,抱怨streaming Dataset
不支持这些操作。
这是我的代码片段:
val validDeviceIds: Dataset[(DeviceId, Long)] = spark.createDataset[DeviceId](listOfAllDeviceIds.map(id => (id, 0L)))
case class KafkaRecord(timestamp: TimestampType, deviceId: DeviceId)
// kafkaRecs is the data stream from Kafka - type is Dataset[KafkaRecord]
val deviceIdsSeen = kafkaRecs
.withWatermark("timestamp", "5 minutes")
.groupBy(window($"timestamp", "5 minutes", "5 minutes"), $"deviceId")
.count()
.map(row => (row.getLong(0), 1L))
.as[(Long, Long)]
val unseenIds = deviceIdsSeen.join(validDeviceIds, Seq("_1"), "right_outer")
.filter(row => row.isNullAt(1))
.map(row => row.getLong(0))
最后一条语句引发以下异常:
Caused by: org.apache.spark.sql.AnalysisException: Right outer join with a streaming DataFrame/Dataset on the left is not supported;;
提前谢谢。
Spark 结构化流中join operations
的情况如下所示:流DataFrames
可以与static DataFrames
联接,以便进一步创建新的streaming DataFrames
。但是,有条件地支持streaming
和static Datasets
之间的outer joins
,而结构化流式处理通常不支持具有streaming Dataset
的right/left joins
。结果,您遇到了 AnalysisException,当您尝试将静态数据集与流数据集联接时,会抛出该异常。作为我的话的证明,你可以看看 Spark 的源代码,在这一行上异常是抛出,这表示不支持您尝试的操作。
我尝试使用静态DataFrames
对stream of DataFrames
进行连接操作。
val streamingDf = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", "structured_topic")
.load()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
val staticDf = Seq((1507831462 , 100)).toDF("Timestamp", "DeviceId")
//Inner Join
streamingDf.join(staticDf, "Timestamp")
line.join(staticDf, "Timestamp")
//Left Join
streamingDf.join(staticDf, "Timestamp", "left_join")
line.join(staticDf, "Timestamp", "left_join")
如您所见,除了从我从通过nc
(netcat) 启动的套接字读取数据Kafka
消耗数据外,它还显着简化了您测试流应用程序时的生活。 这种方法对我来说都很好用,Kafka
和socket
作为数据源。
希望有帮助。
不支持与另一侧的流式数据集的外部联接:
有
- 条件地支持流式处理数据集和静态数据集之间的外部联接。
- 不支持与流式处理数据集的完全外部联接
- 不支持左侧外部联接,右侧是流式处理数据集
- 不支持右侧外部联接与左侧的流式数据集
如果其他Dataset
很小,则可以使用Map
或类似的结构,broadcast
,并在UserDefinedFunction
中引用它。
val map: Broadcast[Map[T, U]] = ???
val lookup = udf((x: T) => map.value.get(x))
df.withColumn("foo", lookup($"_1"))