Flink JDBC sink fails with a not-serializable error



I am following https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/jdbc.html to use a MySQL database as a sink for Flink. The code compiles successfully, but running the job on the Flink cluster fails with:

The program finished with the following exception:
The implementation of the AbstractJdbcOutputFormat is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1899)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:189)
org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1296)
org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1131)
Aggregator.Aggregator$.main(Aggregator.scala:81)

Here is the relevant part of the code:

object Aggregator {
  @throws[Exception]
  def main(args: Array[String]): Unit = {
    [...]
    val counts = stream.map { x => (
        x.get("value").get("id").asInt(),
        x.get("value").get("kpi").asDouble()
      )}
      .keyBy(0)
      .timeWindow(Time.seconds(60))
      .sum(1)
    counts.print()
    val statementBuilder: JdbcStatementBuilder[(Int, Double)] = (ps: PreparedStatement, t: (Int, Double)) => {
      ps.setInt(1, t._1);
      ps.setDouble(2, t._2);
    };
    val connection = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      .withDriverName("mysql.Driver")
      .withPassword("XXX")
      .withUrl("jdbc:mysql://<DB_HOST>:3306/<DB_NAME>")
      .withUsername("<USERNAME>")
      .build();
    val jdbcSink = JdbcSink.sink(
      "INSERT INTO table (id, kpi) VALUES (?, ?)",
      statementBuilder,
      connection);
    counts.addSink(jdbcSink)

    env.execute("Aggregator")
  }
}

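I'm not sure which part of the code is at fault, or how to debug it. Unfortunately, I also couldn't find a reference implementation of the JDBC sink in Scala. Thanks for your help!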

What worked for me was creating the JdbcStatementBuilder explicitly, as an anonymous class rather than a lambda. Something like:

val statementBuilder: JdbcStatementBuilder[(Int, Double)] =
  new JdbcStatementBuilder[(Int, Double)] {
    override def accept(ps: PreparedStatement, t: (Int, Double)): Unit = {
      ps.setInt(1, t._1)
      ps.setDouble(2, t._2)
    }
  }
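
On the question of how to debug this: the exception is raised by Flink's closure cleaner, which, as far as I can tell, essentially tries to Java-serialize the function handed to addSink. Below is a minimal sketch of running the same kind of check by hand. It uses only the JDK; SerializationCheck and check are hypothetical names for illustration and are not part of Flink.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical helper (not a Flink API): tries plain Java serialization
// on an object and reports whether it succeeded.
object SerializationCheck {
  // Returns Success(()) if `obj` and everything it references can be
  // written with an ObjectOutputStream, otherwise Failure with the
  // exception thrown (usually a java.io.NotSerializableException).
  def check(obj: AnyRef): Try[Unit] = Try {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(obj)
    finally out.close()
  }
}
```

Calling SerializationCheck.check(statementBuilder), and likewise on jdbcSink, just before counts.addSink(jdbcSink) should indicate whether the builder is the culprit. On failure, the NotSerializableException message usually names the class that could not be serialized.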
