使用fromElements函数创建数据流时出错
以下是价格-
引起原因:java.io.IOException:未能从反序列化元素来源。如果使用用户定义的序列化(值和可写类型),检查序列化函数。序列化程序是org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@599fcdda网址:org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunctions.java:121)网址:org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)网址:org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)网址:org.apache.flink.streaming.runtime.tasks.StreamTask.ioke(StreamTask.java:218)网址:org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)在java.lang.Thread.run(Thread.java:745)
为什么要处理InputStreamReader
元组?我想这里有些理解失误。泛型类型指定要处理的数据的类型。例如
DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);
生成具有5个Integer
元组的有限数据流。我假设您实际上想要使用InputStreamReader
来生成实际的元组。
如果你想通过HttpURLConnection
读取,你可以实现你自己的SourceFunction
(或RichSourceFunction
),如下所示(用你想要使用的实际数据类型替换OUT
——也可以考虑Flink Tuple0
到Tuple25
类型):
env.addSource(new SourceFunction<OUT> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<OUT> ctx) {
InputStreamReader isr = null;
try {
URL url = new URL("ex.in/res");
HttpURLConnection httpconn = (HttpURLConnection) url.openConnection();
if (httpconn.getResponseCode() != 200)
throw new RuntimeException("Failed : HTTP error code : " + httpconn.getResponseCode());
isr = new InputStreamReader((httpconn.getInputStream()));
} catch (Exception e) {
// clean up; log error
return;
}
while(isRunning) {
OUT tuple = ... // get data from isr
ctx.collect(tuple);
}
}
@Override
public void cancel() {
this.isRunning = false;
}
});
由于InputStreamReader
不可序列化,因此无法使用fromElements
创建DataStream<InputStreamReader>
。这是fromElements
方法所要求的。此外,研究InputStreamReaders
可能没有那么大意义。我想最好简单地从HttpURLConnection
中读取数据,然后继续处理这些数据。