Flink:java.lang.UnsupportedOperationException:无法覆盖KeyedStrea



我在运行我的小 flink 程序时遇到以下异常。该应用程序有两个数据流来自同一个模拟源。它具有广播状态。我写这个是为了做一些性能测试,但给了我例外

Caused by: java.lang.UnsupportedOperationException: Cannot override partitioning for KeyedStream.
at org.apache.flink.streaming.api.datastream.KeyedStream.setConnectionType(KeyedStream.java:251)
at org.apache.flink.streaming.api.datastream.DataStream.broadcast(DataStream.java:429)
at org.apache.flink.streaming.api.scala.DataStream.broadcast(DataStream.scala:495)

我的代码:

val testStream: DataStream[Tuple2[String, String]] = env
.addSource(
new MockKafkaSource
)
.filter(x => !x._1.equals("x"))
.map(x => x)
.uid("test stream 1")
val testStream2: DataStream[Tuple2[String, String]] = env
.addSource(
new MockKafkaSource
)
.map(x => x)
.keyBy(x => x._1)
.uid("test stream 2")
lazy val testStateDescriptor =
new MapStateDescriptor("testState", classOf[String], classOf[Tuple2[String, String]])
val testBroadcastStream = testStream.broadcast(testStateDescriptor)
val broadcastOutStream: DataStream[Tuple2[String, String]] =
testStream2
.connect(testBroadcastStream)
.process(new StateProcess)
broadcastOutStream.print()

此行上发生的异常:

val testBroadcastStream = testStream.broadcast(testStateDescriptor)

我的问题是我在键控流上调用了uid方法以进行testStream2.我不得不将uid移动到地图之后,然后键入流。

相关内容

  • 没有找到相关文章

最新更新