我有一个clojure函数,它使用flambo v0.60 functions api对样本数据集进行一些分析。我注意到,当我使用(get rdd 2)
而不是获得rdd集合中的第二个元素时,它获得了rdd集合中第一个元素的第二个字符。我的假设是clojure将rdd集合的每一行视为一个完整的字符串,而不是一个向量,以便我能够获得集合中的第二个元素。我正在考虑使用映射值函数将映射值转换为我可以获得第二个元素的向量,我尝试了这个:
(defn split-on-tab-transformation [xctx input]
(assoc xctx :rdd (-> (:rdd xctx)
(spark/map (spark/fn [row] (s/split row #"t")))
(spark/map-values vec))))
不幸的是,我得到了一个错误:java.lang.IllegalArgumentException: No matching method found: mapValues for class org.apache.spark.api.java.JavaRDD...
这段代码返回rdd中的第一个集合:(假设我删除了上面函数
中的(spark/map-values vec)
(defn get-distinct-column-val
"input = {:col val}"
[ xctx input ]
(let [rdds (-> (:rdd xctx)
(f/map (f/fn [row] row))
f/first)]
(clojure.pprint/pprint rdds)))
输出:[2.00000 770127 200939.000000 t6094tBENTONVILLE, AR DPSt22.500000t5.000000t2.500000t5.000000t0.000000t0.000000t0.000000t0.000000t0.000000t1tStore Tabt0.000000t4.50t3.83t5.00t0.000000t0.000000t0.000000t0.000000t19.150000]
如果我试图获得第二个元素770127
(defn get-distinct-column-val
"input = {:col val}"
[ xctx input ]
(let [rdds (-> (:rdd xctx)
(f/map (f/fn [row] row))
f/first)]
(clojure.pprint/pprint (get rdds 1)))
我得到:
[.]
map-values的Flambo文档
我是clojure的新手,如果有任何帮助,我将不胜感激。由于首先,map-values
(或Spark API中的mapValues
)仅在PairRDD(例如[:foo [1 2 3]]
)上是有效的转换。具有这样的值的rdd可以被解释为某种类型的映射,其中第一个元素是键,第二个元素是值。
如果你有这样的RDD, mapValues
在不改变键的情况下转换值。在这种情况下,您应该使用第二个映射,尽管它似乎已经过时了,因为clojure.string/split
已经返回一个向量。
使用map-values
的简单示例:
(let [pairs [(ft/tuple :foo 1) (ft/tuple :bar 2)]
rdd (f/parallelize-pairs sc pairs) ;; Note parallelize-pairs -> PairRDD
result (-> rdd
(f/map-values inc) ;; Map values
(f/collect))]
(assert (= result [(ft/tuple :foo 2) (ft/tuple :bar 3)])))
从你的描述看起来你正在使用输入RDD而不是从split-on-tab-transformation
返回的RDD。如果我不得不猜测你试图使用原始的xctx
,而不是从split-on-tab-transformation
返回的那个。由于Clojure maps
是不可变的,assoc
不会改变传递的参数,get-distinct-column-val
接收RDD[String]
而不是RDD[Array[String]]
根据命名约定,我假设您希望为数组中的单个位置获得不同的值。为了清晰起见,我删除了代码中未使用的部分。首先让我们创建虚拟数据:
(spit "data.txt"
(str "Mazda RX4t21t6t160n"
"Mazda RX4 Wagt21t6t160n"
"Datsun 710t22.8t4t108n"))
添加重写版本的函数
(defn split-on-tab-transformation [xctx]
(assoc xctx :rdd (-> (:rdd xctx)
(f/map #(clojure.string/split % #"t")))))
(defn get-distinct-column-val
[xctx col]
(-> (:rdd xctx)
(f/map #(get % col))
(f/distinct)))
结果和
(assert
(= #{"Mazda RX4 Wag" "Datsun 710" "Mazda RX4"}
(-> {:sc sc :rdd (f/text-file sc "data.txt")}
(split-on-tab-transformation)
(get-distinct-column-val 0)
(f/collect)
(set))))