Custom aggregate function in Flink: No match found for function signature



I want to keep all the original rows for each key in a "select ... from ..." query in Flink. I defined an AggregateFunction called RowToJsonAgg, which aggregates rows into a JSON string.

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.functions.AggregateFunction
    import scala.collection.convert.WrapAsScala
    import scala.collection.mutable.ListBuffer

    class RowToJsonAgg extends AggregateFunction[String, ListBuffer[String]] {
      def accumulate(accumulator: ListBuffer[String], row: Any*): Unit = {
        // ...
        // Assume the row looks like "$field1_name, $field1_value, $field2_name, $field2_value, ..."
        // Try to generate JSON from row. However, Flink does not seem to find this function when I run the query.
      }

      // Merge the partial accumulators into the target accumulator.
      def merge(accumulator: ListBuffer[String], its: java.lang.Iterable[ListBuffer[String]]): Unit = {
        accumulator.append(
          WrapAsScala.iterableAsScalaIterable(its).flatten.toList: _*
        )
      }

      def resetAccumulator(accumulator: ListBuffer[String]): Unit = {
        accumulator.clear()
      }

      // Join the collected fragments into one JSON-like string.
      override def getValue(accumulator: ListBuffer[String]): String = {
        accumulator.mkString("{", ",", "}")
      }

      override def createAccumulator(): ListBuffer[String] = ListBuffer.empty

      override def getAccumulatorType(): TypeInformation[ListBuffer[String]] = {
        TypeInformation.of(classOf[ListBuffer[String]])
      }

      override def getResultType: TypeInformation[String] = TypeInformation.of(classOf[String])
    }
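
The body of `accumulate` is elided in the post. Purely as an illustration, and assuming the arguments really do arrive as alternating name/value pairs as the comment describes, the pairs could be rendered into one JSON object per row roughly as follows. `JsonRows`, `render`, and `rowToJson` are hypothetical names that appear neither in the post nor in Flink, and the escaping is deliberately minimal.

```scala
object JsonRows {
  // Hypothetical helper: renders a single value as a JSON literal.
  private def render(value: Any): String = value match {
    case null      => "null"
    case s: String => "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
    case other     => other.toString // adequate for numbers and booleans
  }

  // Pairs up (name, value, name, value, ...) and builds one JSON object.
  def rowToJson(row: Any*): String = {
    require(row.length % 2 == 0, "expected alternating name/value pairs")
    row.grouped(2)
      .map(pair => render(pair(0).toString) + ":" + render(pair(1)))
      .mkString("{", ",", "}")
  }
}
```

With this sketch, `JsonRows.rowToJson("volumn", 100, "ts", 1L)` yields `{"volumn":100,"ts":1}`. It only covers the string building; it does nothing about the signature lookup problem described in the question.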

The data class and the query look like this:

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    case class Stock(id: Int, price: Int, volumn: Int, ts: Long)

    val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val bbTableEnv = TableEnvironment.create(bbSettings)
    bbTableEnv.createTemporarySystemFunction("row_to_json_agg", classOf[RowToJsonAgg])
    val table = bbTableEnv.fromValues(...)
    bbTableEnv.createTemporaryView("Stock", table)
    bbTableEnv.executeSql(
      "select price, row_to_json_agg('volumn', volumn, 'ts', ts) as details from Stock group by price"
    )

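When I run the application, I get a SQL validation exception with the detail message "No match found for function signature row_to_json_agg(CHARACTER, NUMERIC, CHARACTER, NUMERIC)".

It seems that Flink cannot find the right accumulate function to call.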

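If I declare the accumulate function as follows

    def accumulate(accumulator: ListBuffer[String], volumn: Integer, ts: Long)

and change the query to

    "select price, row_to_json_agg(volumn, ts) from Stock group by price"

I get the same kind of exception, with the message "No match found for function signature row_to_json_agg(NUMERIC, NUMERIC)".

Any ideas on how to make the aggregate function work?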

I figured it out myself.

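  1. Register the UDF by running SQL like the following:

    bbTableEnv.executeSql(String.format("create temporary function $udf_name as '%s'", "$full_class_name_of_your_udf"))

     instead of

    bbTableEnv.createTemporarySystemFunction("row_to_json_agg", classOf[RowToJsonAgg])

  2. Prefer implementing the UDF in Java rather than Scala.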
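
To make the first step concrete, here is a minimal sketch of building the DDL string for the function from the question. `RegisterUdf` and `createFunctionDdl` are hypothetical names introduced for illustration. The sketch assumes the UDF class is on the classpath and that the function name and class name are trusted values, since they are spliced into the statement unescaped.

```scala
object RegisterUdf {
  // Hypothetical helper: builds the DDL that registers a class as a temporary function.
  def createFunctionDdl(name: String, className: String): String =
    s"CREATE TEMPORARY FUNCTION $name AS '$className'"
}

// Usage with the TableEnvironment from the question (needs Flink on the classpath):
// bbTableEnv.executeSql(
//   RegisterUdf.createFunctionDdl("row_to_json_agg", classOf[RowToJsonAgg].getName))
```

Using `classOf[RowToJsonAgg].getName` instead of a hand-typed string keeps the fully qualified class name in sync with the code if the class is moved or renamed.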
