用另一股流来丰富一股流



我想使用Apache Flink来完成以下任务。我有一个主流,必须通过另一个流的数据来丰富它。该主流具有属性为"site"one_answers"timestamp"的元素。另一个流(我们称之为countrystream)具有属性"site"one_answers"country"。countrystream应跟踪网站使用的最新国家/地区。例如,如果("klm.com", "netherlands")先到达,一段时间后元组("klm.com", "france")到达,那么"klm.com"应该指向"france"(因为这是后一个)。因此,它应该保持一种状态。假设一个元组("klm.com",100)到达主流。现在应该将其富集为("klm.com", 100, "france")。如果在乡村溪流中找不到一些遗址,就应该用"?"来丰富它。例如,("stackoverflow.com", 150, "?")。我该如何存档?

我找到了一个解决方案(花了一些时间)。这有效吗?它可以改进吗?这是否意味着我不能为迭代流设置检查点?

val env = StreamExecutionEnvironment.getExecutionEnvironment
val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c")
val infoStream = env.fromElements((1, "a", "It is F"), (2, "b", "It is B"), (3, "c", "It is C"), (4, "a", "Whoops, it is A"))
        .iterate(
            iteration => {
                (iteration, iteration)
            }
        )
mainStream
    .coGroup(infoStream)
        .where[String]((x: String) => x)
        .equalTo(_._2)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) {
            (first: Iterator[String], second: Iterator[(Int, String, String)], out: Collector[(String, String)]) => {
                first.foreach((key: String) => {
                        val matchingRecords = second
                            .filter(_._2 == key)
                        if (matchingRecords.nonEmpty) {
                            val matchingRecord = matchingRecords.maxBy(_._1)
                            out.collect((matchingRecord._2, matchingRecord._3))
                        }
                    }
                )
            }
        }
    .print()
env.execute("proof_of_concept")

相关内容

  • 没有找到相关文章

最新更新