Spark流-转换两个流并加入



我遇到了一个问题,在加入之前,我需要转换正在从spark读取的两个流。

一旦我做了转换,我就不能再加入了,我想类型不再是DStream[(String,String)],而是DStream[Map[String,String]]

val windowStream1 = act1Stream.window(Seconds(5)).transform{rdd => rdd.map(_._2).map(l =>(...toMap)}
val windowStream2 = act2Stream.window(Seconds(5)).transform{rdd => rdd.map(_._2).map(l =>(...toMap)}
val joinedWindow = windowStream1.join(windowStream2)  //can't join

知道吗?

这并不能解决您的问题,但会使问题更容易理解。您可以通过定义具有期望类型的时态val/def/var标识符来拆分方法链并记录您在每个步骤中期望的类型。通过这种方式,你可以很容易地发现类型与你的期望不匹配的地方。

例如,我希望您的act1Streamact2Stream实例的类型为DStream[(String, String)],我暂时将其称为s1s2。如果不是这样,请评论我。

def joinedWindow(
      s1: DStream[(String, String)], 
      s2: DStream[(String, String)]
    ): DStream[...] = {
  val w1 = windowedStream(s1)
  val w2 = windowedStream(s2)
  w1.join(w2)
}
def windowedStream(actStream: DStream[(String, String)]): DStream[Map[...]] = {
  val windowed: DStream[(String, String)] = actStream.window(Seconds(5))
  windowed.transform( myTransform )
}
def myTransform(rdd: RDD[(String, String)]): RDD[Map[...]] = {
  val mapped: RDD[String] = rdd.map(_._2)
  // not enough information to conclude 
  // the result type from given code
  mapped.map(l =>(...toMap)) 
}

从那里可以通过填充...部分来总结其余类型。逐行消除编译器错误,直到得到所需的结果。有的文件

DStream[T]

  • def window(windowDuration: Duration): DStream[T]
  • def transform[U](transformFunc: (RDD[T]) ⇒ RDD[U])(implicit arg0: ClassTag[U]): DStream[U]

PairDStreamFunctions[K,V]

  • def join[W](other: DStream[(K, W)])(implicit arg0: ClassTag[W]): DStream[(K, (V, W))]

RDD[T]

  • def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]

至少通过这种方式,您可以确切地知道期望的类型和生成的类型不匹配。

最新更新