在Flink中从数据流到表的转换过程中,我遇到了从Scala事例类进行Schema推理的问题。我试着复制文档中给出的例子,但无法使它们发挥作用。我想知道这是否是一个bug?
我过去曾评论过一个有些相关的问题。我的解决方法不是使用case类,而是费力地定义带有返回类型注释的DataStream[Row]。
尽管如此,我还是想知道是否有可能从案例类中获得Schema推理。
我使用的是Flink 1.15.2和Scala 2.12.7。我使用的是java库,但单独安装了flink-scala
。
这是我对示例1的一个快速健康检查的实现:
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.scalatest.BeforeAndAfter
import org.scalatest.funsuite.AnyFunSuite
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import java.time.Instant
class SanitySuite extends AnyFunSuite with BeforeAndAfter {
val flinkCluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build
)
before {
flinkCluster.before()
}
after {
flinkCluster.after()
}
test("Verify that table conversion works as expected") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
case class User(name: String, score: java.lang.Integer, event_time: java.time.Instant)
// create a DataStream
val dataStream = env.fromElements(
User("Alice", 4, Instant.ofEpochMilli(1000)),
User("Bob", 6, Instant.ofEpochMilli(1001)),
User("Alice", 10, Instant.ofEpochMilli(1002))
)
val table =
tableEnv.fromDataStream(
dataStream
)
table.printSchema()
}
}
根据文件,这应该导致:
(
`name` STRING,
`score` INT,
`event_time` TIMESTAMP_LTZ(9)
)
我得到的:
(
`f0` RAW('SanitySuite$User$1', '...')
)
如果我按照示例5修改我的代码,即明确定义一个镜像case类的Schema,我会得到一个错误,看起来很像是由于无法提取case类字段而导致的:
Unable to find a field named 'event_time' in the physical data type derived from the given type information for schema declaration. Make sure that the type information is not a generic raw type. Currently available fields are: [f0]
问题在于导入,您正在导入java类,并将scala类用于pojo。
使用以下作品:
PD_5