我遇到了一个问题,在加入之前,我需要转换正在从spark读取的两个流。
一旦我做了转换,我就不能再加入了,我想类型不再是DStream[(String,String)],而是DStream[Map[String,String]]
val windowStream1 = act1Stream.window(Seconds(5)).transform{rdd => rdd.map(_._2).map(l =>(...toMap)}
val windowStream2 = act2Stream.window(Seconds(5)).transform{rdd => rdd.map(_._2).map(l =>(...toMap)}
val joinedWindow = windowStream1.join(windowStream2) //can't join
知道吗?
这并不能解决您的问题,但会使问题更容易理解。您可以通过定义具有期望类型的时态val/def/var标识符来拆分方法链并记录您在每个步骤中期望的类型。通过这种方式,你可以很容易地发现类型与你的期望不匹配的地方。
例如,我希望您的act1Stream
和act2Stream
实例的类型为DStream[(String, String)]
,我暂时将其称为s1
和s2
。如果不是这样,请评论我。
def joinedWindow(
s1: DStream[(String, String)],
s2: DStream[(String, String)]
): DStream[...] = {
val w1 = windowedStream(s1)
val w2 = windowedStream(s2)
w1.join(w2)
}
def windowedStream(actStream: DStream[(String, String)]): DStream[Map[...]] = {
val windowed: DStream[(String, String)] = actStream.window(Seconds(5))
windowed.transform( myTransform )
}
def myTransform(rdd: RDD[(String, String)]): RDD[Map[...]] = {
val mapped: RDD[String] = rdd.map(_._2)
// not enough information to conclude
// the result type from given code
mapped.map(l =>(...toMap))
}
从那里可以通过填充...
部分来总结其余类型。逐行消除编译器错误,直到得到所需的结果。有的文件
DStream[T]
def window(windowDuration: Duration): DStream[T]
def transform[U](transformFunc: (RDD[T]) ⇒ RDD[U])(implicit arg0: ClassTag[U]): DStream[U]
PairDStreamFunctions[K,V]
def join[W](other: DStream[(K, W)])(implicit arg0: ClassTag[W]): DStream[(K, (V, W))]
RDD[T]
def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]
至少通过这种方式,您可以确切地知道期望的类型和生成的类型不匹配。