带有HBASE表输入格式的Flink DataSet API - 多次读取行



我使用flink-connectors(Flink-Hbase_2.11)的HBase TableInputFormat使用Flink 1.3.2,使用数据集API。

我有一个hbase表,在该表中,划分如下:

| RowKey       | data |
| 0-someuniqid | data |
| 0-someuniqid | data |
| 2-someuniqid | data |
| 2-someuniqid | data |
| 4-someuniqid | data |
| 5-someuniqid | data |
| 5-someuniqid | data |
| 7-someuniqid | data |
| 8-someuniqid | data |

表的前缀可以为0到9(这是为了防止HBase节点中的热点斑点)。在我的测试表中,没有人写这张桌子。

我有表格的工作:

tableInputFormat0 = new TableInputFormat("table", 0);
tableInputFormat1 = new TableInputFormat("table", 1);
...
tableInputFormat9 = new TableInputFormat("table", 9);

tableInputFormat0.union(tableInputFormat1).(...).union(tableInputFormat9)
                 .map(mapFunction())
                 .rebalance()
                 .filter(someFilter())
                 .groupBy(someField())
                 .reduce(someSumFunction())
                 .output(new HbaseOutputFormat());

问题是当读取大量记录时(大约有2000万记录),该作业并不总是读取相同数量的记录。

大多数时间(正确)读取:20,277,161行。但是有时它会读取:20,277,221或20,277,171总是更少。(我通过Flink Web仪表板获得了这个数字,但是我在编写的内容中可以看到的效果,即汇编太多数据)

我无法通过使用较小的数据集将问题缩小,因为当作业在500万个记录的表上运行作业时不会发生问题。由于卷的卷,很难确定多次读取哪些记录。

如何调试(和解决)此问题?

TableInputFormat是一个抽象类,您必须实现一个子类。

我会做两件事:

  • 检查每个输入拆分是否仅处理一次(此信息写入JobManager日志文件)
  • 调整您的输入格式以计算每个输入拆分的发射记录数量。记录计数和拆分ID应写入(TaskManager)日志。

这应该有助于确定问题是否为

  • 到目
  • 由于代码中的一个错误处理了分裂。

相关内容

  • 没有找到相关文章

最新更新