I modified Flink's basic WordCount example and am experimenting with window functions.
The apply method of WindowedStream is overloaded. It accepts a plain function:
def apply[R: TypeInformation](
    function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { ... }
as well as a WindowFunction:
def apply[R: TypeInformation](
    function: WindowFunction[T, R, K, W]): DataStream[R] = { ... }
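When I pass a plain function to apply on the WindowedStream, my code compiles, but it does not compile when I pass my WindowFunction (and I don't know why).
This is the basic stream: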
val windowCounts: WindowedStream[WordWithCount, String, TimeWindow] = text
  .flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .keyBy(t => "all")
  .window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)))
Here are my two implementations of the window function. This one works for me:
def distinctCount(
    s: String, tw: TimeWindow, input: Iterable[WordWithCount],
    out: Collector[String]): Unit = {
  val discount = input.map(t => t.word).toSet.size
  out.collect(s"Distinct elements: $discount")
}
// compiles
val distinctCountStream = windowCounts.apply { distinctCount _ }
This one does not compile:
class DiscountWindowFunction extends WindowFunction[WordWithCount, String, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: lang.Iterable[WordWithCount], out: Collector[String]): Unit = {
    val discount = input.map(t => t.word).toSet.size
    out.collect(s"Distinct elements: $discount")
  }
  def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = {
    apply(key, window, input.asJava, out)
  }
}
// does not compile
val distinctCount = windowCounts.apply(new DiscountWindowFunction())
I am using Flink 1.3.2, and these are my imports:
import java.lang
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
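You have imported the WindowFunction used by the Java DataStream API. Its apply method takes a java.lang.Iterable, whereas the Scala WindowedStream.apply expects the Scala variant of WindowFunction, whose apply takes a Scala Iterable.
Your code should compile once you replace
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
with
import org.apache.flink.streaming.api.scala.function.WindowFunction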
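With the Scala import in place, the class from the question could look roughly like the sketch below. It is only an illustration, not the asker's final code: it assumes WordWithCount is a case class shaped like the one in Flink's WordCount example (the question never shows its definition), and it drops the second apply overload because the Scala trait already hands over a Scala Iterable.

```scala
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Assumption: the question does not show this definition; this mirrors
// the case class used in Flink's WordCount example.
case class WordWithCount(word: String, count: Long)

// Extends the Scala API's WindowFunction, so `input` is a scala Iterable
// and no java.lang.Iterable conversion is needed inside the class.
class DiscountWindowFunction
    extends WindowFunction[WordWithCount, String, String, TimeWindow] {

  override def apply(
      key: String,
      window: TimeWindow,
      input: Iterable[WordWithCount],
      out: Collector[String]): Unit = {
    // Count the distinct words seen in this window.
    val discount = input.map(t => t.word).toSet.size
    out.collect(s"Distinct elements: $discount")
  }
}
```

It would then be passed exactly as in the question, via windowCounts.apply(new DiscountWindowFunction()). The java.lang import and the JavaConversions/JavaConverters imports should no longer be needed for this class.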
By the way, thanks for providing complete information :-)