我正在尝试遵循文档并创建一个表函数来"扁平化"一些数据。使用joinLateral
进行展平时,表函数似乎工作正常。但是,使用leftOuterJoinLateral
时,我收到以下错误。我正在使用 Scala,并尝试了表 API 和 SQL,结果相同:
原因:java.lang.NullPointerException:Null 结果不能存储在 Case 类中。
这是我的工作:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.functions.TableFunction
object example_job{
// Split the List[Int] into multiple rows
class Split() extends TableFunction[Int] {
def eval(nums: List[Int]): Unit = {
nums.foreach(x =>
if(x != 3) {
collect(x)
})
}
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironment()
val tableEnv = StreamTableEnvironment.create(env)
val splitMe = new Split()
// Create some dummy data
val events: DataStream[(String, List[Int])] = env.fromElements(("simon", List(1,2,3)), ("jessica", List(3)))
val table = tableEnv.fromDataStream(events, 'name, 'numbers)
.leftOuterJoinLateral(splitMe('numbers) as 'number)
.select('name, 'number)
table.toAppendStream[(String, Int)].print()
env.execute("Flink jira ticket example")
}
}
当我将.leftOuterJoinLateral
更改为.joinLateral
时,我得到了预期的结果:
(simon,1)
(simon,2)
使用.leftOuterJoinLateral
时,我希望像这样:
(simon,1)
(simon,2)
(simon,null) // or (simon, None)
(jessica,null) // or (jessica, None)
似乎这可能是 Scala API 的错误?我想先在这里检查一下,然后再提出罚单,以防万一我在做傻事!
问题是 Flink 默认确实期望一行的所有字段都是非空的。这就是程序在看到外部联接操作的null
结果时失败的原因。为了接受null
值,您需要通过以下方式禁用空检查
val tableConfig = tableEnv.getConfig
tableConfig.setNullCheck(false)
或者,您必须指定允许空值的结果类型,例如指定自定义 POJO 输出类型:
table.toAppendStream[MyOutput].print()
跟
class MyOutput(var name: String, var number: Integer) {
def this() {
this(null, null)
}
override def toString: String = s"($name, $number)"
}