我加入了两个数据集——第一个来自流,第二个在HDFS中。
我在spark中使用scala。在加入两个数据集之后,我需要对加入的数据集应用过滤器,但在这里我面临着一个问题。请协助解决。
我正在使用下面的代码,
val streamkv = streamrecs.map(_.split("~")).map(r => ( r(0), (r(5), r(6))))
val HDFSlines = sc.textFile("/user/Rest/sample.dat").map(_.split("~")).map(r => ( r(1), (r(0) r(3),r(4),)))
val streamwindow = streamkv.window(Minutes(1))
val join1 = streamwindow.transform(joinRDD => { joinRDD.join(HDFSlines)} )
我得到以下错误,当我使用过滤器
val tofilter = join1.filter {
| case (r(0), (r(5), r(6)),(r(0),r(3),r(4))) =>
| r(4).contains("iPhone")
| }.count()
<console>:48: error: constructor cannot be instantiated to expected type;
found : (T1, T2, T3)
required: (String, ((String, String), (String, String, String)))
case (r(0), (r(5), r(6)),(r(0),r(3),r(4))) =>
在scala(这是一种类型化语言FYI)中,模式匹配(case)允许在块中"提取"变量以供本地使用。因此
case (r(0), (r(5), r(6)),(r(0),r(3),r(4))) =>
是非法的,因为这里不是调用r函数,而是提取值(想想函数声明)。
假设生成的流/集合join1中的对象遵循此类型签名(_, (_, _), (_, _, String))
,则应该尝试:
val tofilter = join1.filter {
| case (_, (_, _),(_,_,device)) =>
| device.contains("iPhone")
| }.count()
错误的原因是缺少一对圆括号。
带有case (r(0), (r(5), r(6)),(r(0),r(3),r(4)))
的行应该是case (_, ((_, _),(_,_,device))) =>
——注意最后两个元素周围的括号——Tuple2
和Tuple3
对象。