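In my Flink code I am streaming a file that is located in an HDFS folder, and I get the error "(No such file or directory)". However, I am sure the file name and address are correct, because I use the same path in the batch API and everything works fine there. Does anyone know what the problem could be? Here is my code: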
DataStream<MyObject> myStream =
    env.addSource(new MyObjectGenerator("hdfs://../Data/Dataset1.csv"));
And the related class:
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Calendar;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MyObjectGenerator implements SourceFunction<MyObject> {

    private String dataFilePath;
    private float servingSpeedFactor;
    private Integer rowNo;
    private transient BufferedReader reader;
    private transient InputStream inputStream;

    public MyObjectGenerator(String dataFilePath) {
        this(dataFilePath, 1.0f);
    }

    public MyObjectGenerator(String dataFilePath, float servingSpeedFactor) {
        this.dataFilePath = dataFilePath;
        this.servingSpeedFactor = servingSpeedFactor;
        rowNo = 0;
    }

    @Override
    public void run(SourceContext<MyObject> sourceContext) throws Exception {
        long servingStartTime = Calendar.getInstance().getTimeInMillis();
        // Opens dataFilePath with the JDK's local-file stream.
        inputStream = new DataInputStream(new FileInputStream(dataFilePath));
        reader = new BufferedReader(new InputStreamReader(inputStream));
        String line;
        long dataStartTime;
        rowNo++;
        // Emit the first record, or stop if the file is empty.
        if (reader.ready() && (line = reader.readLine()) != null) {
            MyObject myObject = MyObject.fromString(line);
            if (myObject != null)
                sourceContext.collect(myObject);
        } else {
            return;
        }
        // Emit the remaining records.
        while (reader.ready() && (line = reader.readLine()) != null) {
            MyObject myObject = MyObject.fromString(line);
            sourceContext.collect(myObject);
        }
        this.reader.close();
        this.reader = null;
        this.inputStream.close();
        this.inputStream = null;
    }

    @Override
    public void cancel() {
        try {
            if (this.reader != null) {
                this.reader.close();
            }
            if (this.inputStream != null) {
                this.inputStream.close();
            }
        } catch (IOException ioe) {
            // ignore errors while closing
        } finally {
            this.reader = null;
            this.inputStream = null;
        }
    }
}
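You are trying to access a file in HDFS with Java's regular FileInputStream. FileInputStream can only access the local file system. It does not know anything about talking to HDFS, so it treats the whole "hdfs://..." string as a local file name and fails with "No such file or directory". You need to use the HDFS client to read files from HDFS. See Flink's FileInputFormat as an example.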
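To make the failure mode concrete, here is a minimal sketch in plain Java (no Flink or Hadoop needed). The class name, the helper methods, and the path "hdfs://namenode:8020/tmp/example.csv" are hypothetical and only serve as illustration. It shows that the string carries an "hdfs" URI scheme, while FileInputStream ignores the scheme and looks for a local file with that name.

```java
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;

class LocalVsHdfsPath {

    // Returns the URI scheme ("hdfs", "file", ...) or null for a plain path.
    static String schemeOf(String location) {
        return URI.create(location).getScheme();
    }

    // Tries to open the location the same way the source in the question does.
    // FileInputStream interprets the entire string as a local file name.
    static boolean opensAsLocalFile(String location) throws IOException {
        try (FileInputStream in = new FileInputStream(location)) {
            return true;
        } catch (FileNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical location, used for illustration only.
        String location = "hdfs://namenode:8020/tmp/example.csv";
        System.out.println("scheme: " + schemeOf(location));
        System.out.println("opens as local file: " + opensAsLocalFile(location));
    }
}
```

Unless a local file with exactly that name happens to exist, opensAsLocalFile returns false. Resolving "hdfs://" locations takes a scheme-aware client, such as Hadoop's org.apache.hadoop.fs.FileSystem or the Flink input formats built on top of it.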
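However, I would try to avoid implementing this yourself if possible. You could try to use Flink's FileInputFormat to read the file line by line (this returns a DataStream<String>), followed by a (flat) mapper that parses each line.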
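The parsing step could look roughly like the sketch below. It is written in plain Java so that it runs without Flink on the classpath: Record is a hypothetical stand-in for MyObject, the two-column CSV layout is an assumption, and java.util.function.Consumer stands in for Flink's Collector. In an actual job, the body of parseLine would presumably live in a FlatMapFunction<String, MyObject> applied to the line stream, for example the one returned by env.readTextFile(...).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class LineParserSketch {

    // Hypothetical stand-in for the question's MyObject (assumed two CSV columns).
    static final class Record {
        final String id;
        final String name;

        Record(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Flat-map style parser: emits zero or one Record per input line,
    // so blank or malformed lines are dropped instead of producing nulls.
    static void parseLine(String line, Consumer<Record> out) {
        if (line == null || line.trim().isEmpty()) {
            return;
        }
        String[] fields = line.split(",", -1);
        if (fields.length < 2) {
            return; // malformed line, skip it
        }
        out.accept(new Record(fields[0].trim(), fields[1].trim()));
    }

    // Applies parseLine to every line and gathers the emitted records.
    static List<Record> parseAll(List<String> lines) {
        List<Record> result = new ArrayList<>();
        for (String line : lines) {
            parseLine(line, result::add);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Record> records = parseAll(Arrays.asList("1,alice", "", "broken", "2,bob"));
        for (Record r : records) {
            System.out.println(r.id + " -> " + r.name);
        }
    }
}
```

Using a flat mapper rather than a plain map means a line that cannot be parsed simply emits nothing, which replaces the null check in the original source function.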