如何使用System.console作为Apache Flink源代码



Apache Flink 提供了许多不同的流媒体源,但我想知道是否可以使用控制台作为数据源。我还没有在网上找到任何例子。

我想出了这个:

DataStream<String> consoleInput = flinkEnv.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                Scanner sc = new Scanner(System.in);
                while (true)
                    ctx.collect(sc.nextLine());
            }
            @Override
            public void cancel() {
            }
        });

我想知道这是否可以或有更好的方法。

使用控制台作为数据源当然是可行的,但存在一个问题,即您的应用程序不具有容错能力,因为 Flink 在发生故障时无法倒带和重播输入流。

但这通常用于原型和实验,通常带有SocketTextStreamFunction,如

env.addSource(new SocketTextStreamFunction("localhost", 9999, "n", -1))

然后,您可以使用 netcat 将控制台连接到端口 9999

nc -lk 9999

或者,正如某些版本的 Netcat 要求的那样,

nc -l -p 9000

最新更新