我正在尝试在Clojure中重写Spark Structured Streaming示例。
该示例在 Scala 中编写如下:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
(ns flambo-example.streaming-example
(:import [org.apache.spark.sql Encoders SparkSession Dataset Row]
[org.apache.spark.sql.functions]
))
(def spark
(->
(SparkSession/builder)
(.appName "sample")
(.master "local[*]")
.getOrCreate)
)
(def lines
(-> spark
.readStream
(.format "socket")
(.option "host" "localhost")
(.option "port" 9999)
.load
)
)
(def words
(-> lines
(.as (Encoders/STRING))
(.flatMap #(clojure.string/split % #" " ))
))
上面的代码会导致以下异常。
;;由 java.lang.IllegalArgumentException 引起;;未找到匹配的方法:类的平面地图;;org.apache.spark.sql.Dataset
如何避免错误?
你必须遵循签名。Java Dataset
API提供了两种Dataset.flatMap
实现,一种需要scala.Function1
def flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U]
第二个是Spark自己的o.a.s.api.java.function.FlatMapFunction
def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U]
前者对您来说毫无用处,但您应该能够使用后者。对于RDD
API flambo
使用宏来创建可以使用flambo.api/fn
访问的 Spark 友好适配器 - 我不确定这些是否直接适用于 Datasets
,但如果需要,您应该能够调整它们。
由于不能依赖隐式Encoders
因此还必须提供与返回类型匹配的显式编码器。
总的来说,你需要一些东西:
(def words
(-> lines
(.as (Encoders/STRING))
(.flatMap f e)
))
其中f
实现FlatMapFunction
,e
是一个Encoder
。一个示例实现:
(def words
(-> lines
(.as (Encoders/STRING))
(.flatMap
(proxy [FlatMapFunction] []
(call [s] (.iterator (clojure.string/split s #" "))))
(Encoders/STRING))))
但我想有可能找到更好的。
在实践中,我会避免打字Dataset
,而专注于DataFrame
(Dataset[Row]
)。