我使用flink-connectors(Flink-Hbase_2.11)的HBase TableInputFormat使用Flink 1.3.2,使用数据集API。
我有一个hbase表,在该表中,划分如下:
| RowKey | data |
| 0-someuniqid | data |
| 0-someuniqid | data |
| 2-someuniqid | data |
| 2-someuniqid | data |
| 4-someuniqid | data |
| 5-someuniqid | data |
| 5-someuniqid | data |
| 7-someuniqid | data |
| 8-someuniqid | data |
表的前缀可以为0到9(这是为了防止HBase节点中的热点斑点)。在我的测试表中,没有人写这张桌子。
我有表格的工作:
tableInputFormat0 = new TableInputFormat("table", 0);
tableInputFormat1 = new TableInputFormat("table", 1);
...
tableInputFormat9 = new TableInputFormat("table", 9);
tableInputFormat0.union(tableInputFormat1).(...).union(tableInputFormat9)
.map(mapFunction())
.rebalance()
.filter(someFilter())
.groupBy(someField())
.reduce(someSumFunction())
.output(new HbaseOutputFormat());
问题是当读取大量记录时(大约有2000万记录),该作业并不总是读取相同数量的记录。
大多数时间(正确)读取:20,277,161行。但是有时它会读取:20,277,221或20,277,171总是更少。(我通过Flink Web仪表板获得了这个数字,但是我在编写的内容中可以看到的效果,即汇编太多数据)
)我无法通过使用较小的数据集将问题缩小,因为当作业在500万个记录的表上运行作业时不会发生问题。由于卷的卷,很难确定多次读取哪些记录。
如何调试(和解决)此问题?
TableInputFormat
是一个抽象类,您必须实现一个子类。
我会做两件事:
- 检查每个输入拆分是否仅处理一次(此信息写入JobManager日志文件)
- 调整您的输入格式以计算每个输入拆分的发射记录数量。记录计数和拆分ID应写入(TaskManager)日志。
这应该有助于确定问题是否为
- 到目
- 由于代码中的一个错误处理了分裂。