在 Clojure 中编写 Spark 结构化流示例时出错



我正在尝试在Clojure中重写Spark Structured Streaming示例。

该示例在 Scala 中编写如下:

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

(ns flambo-example.streaming-example
  (:import [org.apache.spark.sql Encoders SparkSession Dataset Row]
           [org.apache.spark.sql.functions]
           ))
(def spark
  (->
   (SparkSession/builder)
   (.appName "sample")
   (.master "local[*]")
   .getOrCreate)  
  )

(def lines
  (-> spark
      .readStream
      (.format "socket")
      (.option "host" "localhost")
      (.option "port" 9999)
      .load      
      )
  )
(def words
  (-> lines
      (.as (Encoders/STRING))      
      (.flatMap #(clojure.string/split  % #" " ))      
      ))

上面的代码会导致以下异常。

;;由 java.lang.IllegalArgumentException 引起;;未找到匹配的方法:类的平面地图;;org.apache.spark.sql.Dataset

如何避免错误?

你必须遵循签名。Java Dataset API提供了两种Dataset.flatMap实现,一种需要scala.Function1

def flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U] 

第二个是Spark自己的o.a.s.api.java.function.FlatMapFunction

def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] 

前者对您来说毫无用处,但您应该能够使用后者。对于RDD API flambo使用宏来创建可以使用flambo.api/fn访问的 Spark 友好适配器 - 我不确定这些是否直接适用于 Datasets ,但如果需要,您应该能够调整它们。

由于不能依赖隐式Encoders因此还必须提供与返回类型匹配的显式编码器。

总的来说,你需要一些东西:

(def words
  (-> lines
    (.as (Encoders/STRING))      
    (.flatMap f e)      
  ))

其中f实现FlatMapFunctione是一个Encoder。一个示例实现:

(def words
  (-> lines
      (.as (Encoders/STRING))      
      (.flatMap
        (proxy [FlatMapFunction] [] 
          (call [s] (.iterator (clojure.string/split s #" ")))) 
        (Encoders/STRING))))

但我想有可能找到更好的。

在实践中,我会避免打字Dataset,而专注于DataFrameDataset[Row])。

相关内容

  • 没有找到相关文章

最新更新