Apache Flink (standard output problem on a cluster)



In Apache Flink I cannot see any output on stdout, although my job is running and data is arriving.

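When you run a job on a cluster, print() writes the data stream to the standard output of the TaskManager process, not to the console of the client that submitted the job. That TaskManager stdout is redirected to a .out file in the ./log/ directory under the Flink root directory. I believe that is where you will find your output.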
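
To check this quickly on a TaskManager machine, a small helper like the one below can list the .out files in the log directory and dump their contents. This is only a sketch: the class name TaskManagerOutFiles, the method findOutFiles, and the default path ./log are assumptions made for illustration, so pass the actual log directory of your installation as the first argument.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TaskManagerOutFiles {

    /** Returns the regular files ending in ".out" directly inside logDir, sorted by path. */
    public static List<Path> findOutFiles(Path logDir) throws IOException {
        try (Stream<Path> entries = Files.list(logDir)) {
            return entries
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(".out"))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed default; replace with the log directory under your Flink root.
        Path logDir = Paths.get(args.length > 0 ? args[0] : "./log");
        for (Path outFile : findOutFiles(logDir)) {
            System.out.println("== " + outFile + " ==");
            // Print every line that the TaskManager wrote to its stdout file.
            for (String line : Files.readAllLines(outFile)) {
                System.out.println(line);
            }
        }
    }
}
```

Run it on each machine that hosts a TaskManager, since every TaskManager writes its own .out file.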

I don't know whether the TaskManager's standard output can be redirected, but a quick and dirty workaround is to write the output to a socket:

output.writeToSocket(outputHost, outputPort, new SimpleStringSchema());
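
For the socket approach to show anything, something has to be listening on outputHost:outputPort, because as far as I know writeToSocket opens a client connection to that address. Below is a minimal plain-Java receiver sketch. The class name SocketOutputReceiver, the method receiveOnce, and the default port 9999 are made up for illustration. It copies raw bytes instead of reading lines, on the assumption that SimpleStringSchema does not append a line terminator to each record. It also handles a single connection, so it assumes the sink runs with parallelism 1.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketOutputReceiver {

    /**
     * Accepts one connection on the given server socket and copies everything
     * the peer sends to target until the peer closes the connection.
     * Returns the number of bytes received.
     */
    public static long receiveOnce(ServerSocket server, OutputStream target) throws IOException {
        long total = 0;
        try (Socket client = server.accept();
             InputStream in = client.getInputStream()) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                target.write(buffer, 0, read);
                target.flush(); // show records as soon as they arrive
                total += read;
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default port; it must match the outputPort given to writeToSocket.
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9999;
        try (ServerSocket server = new ServerSocket(port)) {
            System.out.println("Listening on port " + port);
            receiveOnce(server, System.out);
        }
    }
}
```

Start the receiver on the machine named by outputHost before submitting the job. If the records run together on one line, map each record to a string ending in "\n" before the sink.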
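
// Imports required by the job code below (they go at the top of the enclosing class file).
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public static void main(String[] args) throws Exception {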
    // the host and the port to connect to
    final String hostname = "192.168.1.73";
    final int port = 9000;
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.1.68", 6123);
    // get input data by connecting to the socket
    DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    // parse the data, group it, window it, and aggregate the counts
    DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word").timeWindow(Time.seconds(5))
            .reduce(new ReduceFunction<WordWithCount>() {
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });
    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1);
    env.execute("Socket Window WordCount");
}
public static class WordWithCount {
    public String word;
    public long count;
    public WordWithCount() {
    }
    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }
    @Override
    public String toString() {
        return word + " : " + count;
    }
}
