Apache Flink 提供了许多不同的流媒体源,但我想知道是否可以使用控制台作为数据源。我还没有在网上找到任何例子。
我想出了这个:
DataStream<String> consoleInput = flinkEnv.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
Scanner sc = new Scanner(System.in);
while (true)
ctx.collect(sc.nextLine());
}
@Override
public void cancel() {
}
});
我想知道这是否可以或有更好的方法。
使用控制台作为数据源当然是可行的,但存在一个问题,即您的应用程序不具有容错能力,因为 Flink 在发生故障时无法倒带和重播输入流。
但这通常用于原型和实验,通常带有SocketTextStreamFunction
,如
env.addSource(new SocketTextStreamFunction("localhost", 9999, "n", -1))
然后,您可以使用 netcat 将控制台连接到端口 9999
nc -lk 9999
或者,正如某些版本的 Netcat 要求的那样,
nc -l -p 9000