在Apache Flink中创建数据流时出错



使用fromElements函数创建数据流时出错

以下是价格-

引起原因:java.io.IOException:未能从反序列化元素来源。如果使用用户定义的序列化(值和可写类型),检查序列化函数。序列化程序是org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@599fcdda网址:org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunctions.java:121)网址:org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)网址:org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)网址:org.apache.flink.streaming.runtime.tasks.StreamTask.ioke(StreamTask.java:218)网址:org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)在java.lang.Thread.run(Thread.java:745)

为什么要处理InputStreamReader元组?我想这里有些理解失误。泛型类型指定要处理的数据的类型。例如

DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);

生成具有5个Integer元组的有限数据流。我假设您实际上想要使用InputStreamReader来生成实际的元组。

如果你想通过HttpURLConnection读取,你可以实现你自己的SourceFunction(或RichSourceFunction),如下所示(用你想要使用的实际数据类型替换OUT——也可以考虑Flink Tuple0Tuple25类型):

env.addSource(new SourceFunction<OUT> {
    private volatile boolean isRunning = true;
    @Override
    public void run(SourceContext<OUT> ctx) {
        InputStreamReader isr = null;
        try {
            URL url = new URL("ex.in/res");
            HttpURLConnection httpconn = (HttpURLConnection) url.openConnection();
            if (httpconn.getResponseCode() != 200)
                throw new RuntimeException("Failed : HTTP error code : " + httpconn.getResponseCode());
            isr = new InputStreamReader((httpconn.getInputStream()));
        } catch (Exception e) {
            // clean up; log error
            return;
        }
        while(isRunning) {
            OUT tuple = ... // get data from isr
            ctx.collect(tuple);
        }
    }
    @Override
    public void cancel() {
         this.isRunning = false;
    }
});

由于InputStreamReader不可序列化,因此无法使用fromElements创建DataStream<InputStreamReader>。这是fromElements方法所要求的。此外,研究InputStreamReaders可能没有那么大意义。我想最好简单地从HttpURLConnection中读取数据,然后继续处理这些数据。

相关内容

  • 没有找到相关文章

最新更新