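I want to use Apache Flink for the following task. I have a main stream that must be enriched with data from another stream. Elements of the main stream have the attributes "site" and "timestamp". The other stream (let's call it countrystream) has the attributes "site" and "country". The countrystream should keep track of the latest country used by a site. For example, if ("klm.com", "netherlands") arrives first and, some time later, the tuple ("klm.com", "france") arrives, then "klm.com" should point to "france" (because it is the later one). So it has to maintain some state. Suppose a tuple ("klm.com", 100) arrives on the main stream. It should now be enriched to ("klm.com", 100, "france"). If a site cannot be found in the countrystream, it should be enriched with "?", for example ("stackoverflow.com", 150, "?"). How can I achieve this?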
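To make the expected behaviour concrete, here is a minimal plain-Scala sketch of the semantics described above. It does not use Flink at all; it only replays events in arrival order against a "latest country per site" map. The names EnrichmentSketch, CountryUpdate, Visit and enrich are made up for this illustration and are not part of any Flink API.

```scala
object EnrichmentSketch {
  // Hypothetical event types: one for the countrystream, one for the main stream.
  sealed trait Event
  final case class CountryUpdate(site: String, country: String) extends Event
  final case class Visit(site: String, timestamp: Long) extends Event

  /** Replays events in arrival order and returns the enriched main-stream elements. */
  def enrich(events: Seq[Event]): Seq[(String, Long, String)] = {
    // State: the most recent country seen for each site.
    val latestCountry = scala.collection.mutable.Map.empty[String, String]
    val out = Seq.newBuilder[(String, Long, String)]
    events.foreach {
      // A later update for the same site overwrites the earlier one.
      case CountryUpdate(site, country) => latestCountry(site) = country
      // Unknown sites are enriched with "?".
      case Visit(site, ts) => out += ((site, ts, latestCountry.getOrElse(site, "?")))
    }
    out.result()
  }
}
```

Feeding it CountryUpdate("klm.com", "netherlands"), CountryUpdate("klm.com", "france"), Visit("klm.com", 100) and Visit("stackoverflow.com", 150) in that order yields ("klm.com", 100, "france") and ("stackoverflow.com", 150, "?"), which is the result I am after in the streaming job.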
I found a solution (it took some time). Does this work? Can it be improved? Does this mean that I cannot use checkpointing with the iterative stream?
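
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c")

    // Elements are (sequence number, key, info). The iteration feeds every element
    // back into itself, so the info records keep circulating and reappear in later windows.
    val infoStream = env
      .fromElements((1, "a", "It is F"), (2, "b", "It is B"), (3, "c", "It is C"), (4, "a", "Whoops, it is A"))
      .iterate(iteration => (iteration, iteration))

    mainStream
      .coGroup(infoStream)
      .where[String]((x: String) => x)
      .equalTo(_._2)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
      .apply {
        (first: Iterator[String], second: Iterator[(Int, String, String)], out: Collector[(String, String)]) =>
          // An Iterator can be traversed only once, so materialize it before reusing it per key.
          val infos = second.toList
          first.foreach { key =>
            val matchingRecords = infos.filter(_._2 == key)
            if (matchingRecords.nonEmpty) {
              // The highest sequence number is the latest info for this key.
              val matchingRecord = matchingRecords.maxBy(_._1)
              out.collect((matchingRecord._2, matchingRecord._3))
            }
          }
      }
      .print()

    env.execute("proof_of_concept")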