Apache Flink-启用联接排序



我注意到Apache Flink并没有优化表的连接顺序。目前,它保持用户指定的联接顺序(基本上,它从字面上接受查询(。我认为ApacheCalcite可以优化连接的顺序,但由于某些原因,这些规则在ApacheFlink中没有使用。

例如,如果我们有两个表">R"one_answers">S

private val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
private val fileNumber = 1
tableEnv.registerTableSource("R", getDataSourceR(fileNumber))
tableEnv.registerTableSource("S", getDataSourceS(fileNumber))
private val r = tableEnv.scan("R")
private val s = tableEnv.scan("S")

我们假设">S"是空的,我们想用两种方式连接这些表:

val tableOne = r.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
.join(s.as("x5, x6")).where("x4 === x5 ").select("x1, x6")

val tableTwo = s.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
.join(r.as("x5, x6")).where("x4 === x5 ").select("x1, x6")

如果我们想计算tableOnetableTwo中的行数,在这两种情况下的结果都将为零。问题是评估tableOne将比评估tableTwo花费更长的时间。

有没有什么方法可以让我们自动优化连接的执行顺序,甚至通过添加一些统计数据来实现可能的计划成本操作?如何添加这些统计数据?

在这个链接的文档中,可能有必要更改表环境CalciteConfig,但我不清楚如何做到这一点。

请帮忙。

由于Flink不能很好地处理统计信息,所以未启用Join重新排序。在没有精确基数估计的情况下重新排序联接基本上是在赌博。因此,将禁用联接重新排序,并按照用户提供的顺序联接表。这提供了一种确定性和可控性的行为。

但是,在创建TableEnvironment(即TableEnvironment.getTableEnvironment(env,yourTableConfig((时,可以通过将TableConfigCalciteConfig传递给优化器来将优化规则传递给优化器。在CalciteConfig中,您可以将优化规则添加到不同的优化阶段。您可能希望将JoinCommuteRuleJoinAssociateRule添加到逻辑优化阶段。您可能还需要深入研究代码,以检查如何将统计信息传递给优化器。

相关内容

  • 没有找到相关文章

最新更新