When I use the upsert-kafka sink with the buffer-flush options set, I get the error below. The same job works fine without the buffer-flush options.
Exception in thread "main" java.lang.IllegalStateException: There is no the LegacySinkTransformation.
at org.apache.flink.streaming.api.datastream.DataStreamSink.getTransformation(DataStreamSink.java:71)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:294)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:145)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:140)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:71)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:124)
at com.company.flink.FlinkJob.main(FlinkJob.java:260)
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#sink-buffer-flush-max-rows
This is a critical bug in Flink 1.14 that will probably be fixed in the next patch release. More information can be found here:
https://issues.apache.org/jira/browse/FLINK-24596
The workaround is to use the latest 1.13 release, or to disable buffering for now.
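As a rough sketch of the "disable buffering" workaround: the upsert-kafka connector only buffers when both 'sink.buffer-flush.max-rows' and 'sink.buffer-flush.interval' are set to values greater than zero, so leaving both out of the WITH clause should avoid the failing code path. The table name, columns, topic and bootstrap server below are made up for illustration, and the helper only builds the DDL string; you would pass the result to tableEnv.executeSql(...) in your own job.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class UpsertKafkaDdl {

    // Builds a CREATE TABLE statement for an upsert-kafka sink.
    // When enableBufferFlush is false, both buffer-flush options are omitted,
    // which leaves sink buffering at its default (disabled).
    public static String buildDdl(boolean enableBufferFlush) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "upsert-kafka");
        options.put("topic", "orders_out");                            // hypothetical topic
        options.put("properties.bootstrap.servers", "localhost:9092"); // hypothetical broker
        options.put("key.format", "json");
        options.put("value.format", "json");
        if (enableBufferFlush) {
            // Setting these on Flink 1.14.0 is what triggers the error above.
            options.put("sink.buffer-flush.max-rows", "1000");
            options.put("sink.buffer-flush.interval", "1s");
        }

        // Render each option as a 'key' = 'value' line of the WITH clause.
        String with = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n"));

        // Hypothetical schema; upsert-kafka requires a primary key.
        return "CREATE TABLE orders_sink (\n"
                + "  order_id STRING,\n"
                + "  amount DOUBLE,\n"
                + "  PRIMARY KEY (order_id) NOT ENFORCED\n"
                + ") WITH (\n" + with + "\n)";
    }

    public static void main(String[] args) {
        // Print the DDL with buffering left off (the workaround).
        System.out.println(buildDdl(false));
    }
}
```

Once you are on a release that contains the fix for FLINK-24596, calling buildDdl(true) (or simply adding the two options back to your DDL) should restore buffering.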