我正在通过flink读取csv文件.csv文件有特定数量的列。
我已经定义了
RowCsvInputFormat format = new RowCsvInputFormat(filePath,
new TypeInformation[]{
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
});
如果在文件中所有行都有适当的 4 列,则代码工作正常。
我想处理文件中几行没有 4 列或几行中存在任何其他问题的情况。
我如何在眨眼中实现这一点。
如果您查看维基百科或 rfc4180 上的规范,似乎 CSV 文件应该只包含具有相同列数的行。因此,RowCsvInputFormat不支持此功能是有道理的。
您可以使用 readTextFile(path( 读取文件,然后在flatMap()
运算符中将字符串解析为 Row 对象(如果行中存在问题,则忽略(
env.readTextFile(params.get("input"))
.flatMap(someCsvRowParseFunction())