我有一个s3,其中包含tb级的数据,分成小于5mb的小文件。我试着用flink来处理它们。我用下一个代码创建源代码。
var inputFormat = new TextInputFormat(null);
inputFormat.setNestedFileEnumeration(true);
return streamExecutionEnvironment.readFile(inputFormat, "s3://name/");
但是使用的内存增长到限制,作业被杀死,并且没有再次调度,出现错误:
Could not fulfill resource requirements of job
sink中没有数据
对于小数据集,它工作得很好。
如何在不使用太多内存的情况下读取文件?谢谢。
与:
行为相同env.fromSource( FileSource.forRecordStreamFormat(
new TextLineFormat(),
new Path("s3://name/")
)
.monitorContinuously(Duration.ofMillis(10000L))
.build(),
WatermarkStrategy.noWatermarks(),
"MySourceName"
)
FileSource
是从文件中摄取数据的首选方式。它应该能够处理你所说的那种规模。
文档
javadocs
setQueueLimit
on kineesis producer解决了我的问题https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/#backpressure