我正在尝试使用Flink Table API执行聚合,方法是接受用户提供的逐字段和字段聚合表达式作为字符串参数。
输入
- GroupBy字段=部门
- 聚合字段表达式=
count(employeeId)
,max(salary)
有没有什么方法可以使用flink Table API来实现?我试着做了以下几件事,但无济于事。弗林克在《星火》中有类似selectExpr
的功能吗?https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.selectExpr.html
employeeTable
.groupBy($("department"))
.select(
$("department"),
$("count(employeeId)").as("numberOfEmployees"),
$("max(salary)").as("maxSalary")
)
它正在抛出以下异常
Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve field [count(employeeId)], input field list:[department].
不,我认为这行不通。Flink的SQL计划器想知道查询在编译时在做什么。
您所能做的就是构造一个SQL查询并创建一个新作业来运行该查询。Flink 1.16中即将推出的SQL网关(请参阅FLIP-91(应该会让这变得更容易。
我认为您有错误的语法。
.select(
$("department"),
$("count(employeeId)").as("numberOfEmployees"),
$("max(salary)").as("maxSalary")
)
count和max您应该这样调用:
$("employeeId").count().as("numberOfEmployees"),
$("salary").max().as("maxSalary")
你可以在这里查看内置功能