用Scala实现Apache Flink的Wikipedia编辑流分析



我正在尝试用Scala重写Apache Flink教程中的Wikipedia编辑流分析示例,教程地址:https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html

教程中的代码为

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
/**
 * Streaming job that reads Wikipedia edit events, groups them by user, and
 * prints, for every five-second window, each user's summed
 * {@code getByteDiff()} value.
 */
public class WikipediaAnalysis {

  /** Keys every edit event by the user who made it. */
  private static final class UserKey implements KeySelector<WikipediaEditEvent, String> {
    @Override
    public String getKey(WikipediaEditEvent edit) {
      return edit.getUser();
    }
  }

  /**
   * Folds edit events into a (user, accumulated byte diff) pair.
   * The accumulator is updated in place and returned.
   */
  private static final class ByteDiffFold
      implements FoldFunction<WikipediaEditEvent, Tuple2<String, Long>> {
    @Override
    public Tuple2<String, Long> fold(Tuple2<String, Long> total, WikipediaEditEvent edit) {
      total.f0 = edit.getUser();
      total.f1 = total.f1 + edit.getByteDiff();
      return total;
    }
  }

  /**
   * Builds the pipeline (source, key by user, 5-second window, fold, print)
   * and starts execution.
   */
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<WikipediaEditEvent> editStream = env.addSource(new WikipediaEditsSource());
    KeyedStream<WikipediaEditEvent, String> editsByUser = editStream.keyBy(new UserKey());

    // Initial accumulator for each window: empty user name and a zero byte count.
    Tuple2<String, Long> emptyTotal = new Tuple2<>("", 0L);
    DataStream<Tuple2<String, Long>> bytesPerUser = editsByUser
      .timeWindow(Time.seconds(5))
      .fold(emptyTotal, new ByteDiffFold());

    bytesPerUser.print();
    env.execute();
  }
}

以下是我在Scala中的尝试

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}
import org.apache.flink.streaming.api.windowing.time.Time

// The asker's attempt at porting the Java tutorial job to Scala.
// NOTE(review): this snippet neither prints `result` nor calls `env.execute()`,
// unlike the Java version it is ported from.
object WikipediaAnalytics extends App{
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // Stream of edit events produced by the Wikipedia edits source.
  val edits = env.addSource(new WikipediaEditsSource());
  // Partition the stream by the user who made each edit.
  val keyedEdits = edits.keyBy(event => event.getUser)
  // Intended result: per user, the summed byte diff over 5-second windows.
  // NOTE(review): the initial value and the function are passed in ONE argument
  // list here, and the lambda takes (event, accumulator). The later snippets in
  // this page pass the function in a second argument list and use
  // (accumulator, event) order instead.
  val result = keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L), (we: WikipediaEditEvent, t: (String, Long)) =>
    (we.getUser, t._2 + we.getByteDiff))
}

这基本上是把Java代码逐字逐句地转换成Scala。据此,val result的类型应为DataStream[(String, Long)],但fold()之后实际推断出的类型却与之相去甚远。

请帮忙找出这段Scala代码有什么问题。

编辑1:利用fold[R]的柯里化(curried)形式,做了如下修改

  // Step 1: apply only the first argument list of the curried fold (the initial
  // accumulator). Per the declared type, what comes back is a function that
  // still expects the (accumulator, event) => accumulator folding function.
  val result_1: (((String, Long), WikipediaEditEvent) => (String, Long)) => DataStream[(String, Long)] =
    keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L))
  // Step 2: supply the folding function to obtain the DataStream[(String, Long)].
  // Note the parameter order: accumulator first, event second.
  val result_2: DataStream[(String, Long)] = result_1((t: (String, Long), we: WikipediaEditEvent ) =>
    (we.getUser, t._2 + we.getByteDiff))

问题似乎出在fold上:在累加器初始值之后必须先加上右括号(即初始值和折叠函数要分别放在两个参数列表中)。修正这一点之后,代码仍然无法编译,因为没有可用于WikipediaEditEvent的类型信息(TypeInformation)。最简单的解决办法是导入更多的Flink Scala API(即org.apache.flink.streaming.api.scala._)。请参阅下面的完整示例:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Streaming job that reads Wikipedia edit events, keys them by user, and
 * prints each user's summed `getByteDiff` per 5-second window.
 *
 * Uses an explicit `main` method instead of `extends App`: `App` is built on
 * `DelayedInit`, so the object's fields are not initialized until `main` has
 * run, which is an initialization-order pitfall for job entry points.
 */
object WikipediaAnalytics {

  /**
   * Builds the pipeline and starts execution.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val edits = see.addSource(new WikipediaEditsSource())

    // fold is curried: the initial accumulator goes in the first argument list,
    // the (accumulator, event) => accumulator function in the second.
    // The implicit type information it needs is expected to come from the
    // wildcard import of org.apache.flink.streaming.api.scala._ in this file.
    val userEditsVolume: DataStream[(String, Int)] = edits
      .keyBy(_.getUser)
      .timeWindow(Time.seconds(5))
      .fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff))

    userEditsVolume.print()
    see.execute("Wikipedia User Edit Volume")
  }
}

相关内容

  • 没有找到相关文章

最新更新