How do I use SQL on a Flink Kafka stream?

I have loaded a rules table from a PostgreSQL DB as a Flink table, and I now read Kafka messages and want to classify each message according to those rules. The code looks like this:

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.enableCheckpointing(5000)
val stenv = StreamTableEnvironment.create(senv)

// register the PostgreSQL input as the table "rules"
val streamsource = senv.createInput(inputFormat)
stenv.registerDataStream("rules", streamsource)

val properties = new Properties()
properties.setProperty("bootstrap.servers", KAFKA_BROKER)
properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
properties.setProperty("group.id", TRANSACTION_GROUP)
val fkp = new FlinkKafkaProducer010[String](TOPIC1, new SimpleStringSchema(), properties)
val fkc = new FlinkKafkaConsumer010[String](TOPIC, new SimpleStringSchema(), properties)

// parse each Kafka message and look up the matching rule
val stream = senv.addSource(fkc).setParallelism(3)
val jsons = stream.map { r =>
  val sub = JSON.parseObject(r.toString)
  val value = sub.getDouble("value")
  val time = sub.getLong("time")
  val tag = sub.getString("name")
  val error = sub.getString("error")
  val t = stenv.sqlQuery("select * from rules").where("nodeid=" + tag) // error is here
  // todo
}

This is the error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:686)
at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1143)
at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:617)
at cettest$.main(cettest.scala:63)
at cettest.main(cettest.scala)
Caused by: java.io.NotSerializableException: org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
... 7 more

I have tried many ways to solve this, but all of them failed!

Welcome to Stack Overflow! It would help if you could list what you have tried so far, but the solution to your problem seems fairly simple: StreamTableEnvironmentImpl does not extend the Serializable trait (https://www.oreilly.com/library/view/scala-cookbook/9781449340292/ch12s08.html), so the map closure, which captures stenv, cannot be serialized and shipped to the workers.

That said, depending on one of Flink's @Internal classes inside your user code doesn't seem right anyway. I would rather create my own serializable class, or more likely a case class, which is serializable by default.
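For example, here is a minimal sketch of that idea, assuming your rules table has nodeid and category columns (the Rule fields, the state descriptor name, and the classification logic are illustrative, not taken from your code). The rules are converted to a stream of case-class instances outside the map and broadcast to the workers, so the closure no longer captures the TableEnvironment:

import com.alibaba.fastjson.JSON // same JSON parser as in your snippet
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

// A case class is serializable by default, so it can safely live in closures.
// Its fields are an assumption about your rules schema.
case class Rule(nodeid: String, category: String)

// Run the SQL once, outside any closure, and turn the result into a stream.
val rulesTable = stenv.sqlQuery("select * from rules")
val ruleStream: DataStream[Rule] = stenv.toAppendStream[Rule](rulesTable)

// Broadcast the rules so every parallel task sees them, keyed by nodeid.
val ruleStateDesc = new MapStateDescriptor[String, Rule](
  "rulesState", BasicTypeInfo.STRING_TYPE_INFO, createTypeInformation[Rule])
val broadcastRules = ruleStream.broadcast(ruleStateDesc)

// Classify each Kafka message against the broadcast rules. The function
// captures only serializable values, so the ClosureCleaner check passes.
val classified = stream
  .connect(broadcastRules)
  .process(new BroadcastProcessFunction[String, Rule, String] {
    override def processElement(
        msg: String,
        ctx: BroadcastProcessFunction[String, Rule, String]#ReadOnlyContext,
        out: Collector[String]): Unit = {
      val sub = JSON.parseObject(msg)
      val tag = sub.getString("name")
      val rule = ctx.getBroadcastState(ruleStateDesc).get(tag)
      if (rule != null) out.collect(tag + " -> " + rule.category)
    }

    override def processBroadcastElement(
        rule: Rule,
        ctx: BroadcastProcessFunction[String, Rule, String]#Context,
        out: Collector[String]): Unit = {
      ctx.getBroadcastState(ruleStateDesc).put(rule.nodeid, rule)
    }
  })

The key point is the same whichever shape you choose: run sqlQuery outside the map and let only plain serializable data cross into the closure.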

Hope that helps!
