为什么此配置会导致Any类型?我不能调用.build((!我的flink版本是1.10.0,scala版本是2.11链接到屏幕截图
val sink = StreamingFileSink
.forRowFormat(new Path("s3a://123"), csvEncoder)
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMinutes(5))
.withInactivityInterval(TimeUnit.MINUTES.toMinutes(5))
.withMaxPartSize(128 * 1024 * 1024)
.build()
)
.withBucketAssigner(
new BucketAssigner[UserEvent, String] {
override def getBucketId(element: UserEvent, context: BucketAssigner.Context): String = element.getType.name
override def getSerializer: SimpleVersionedSerializer[String] = new SimpleVersionedStringSerializer
}
) // this returns Any!!!
.build() // can't call .build()
问题是Scala的类型推理与StreamingFileSink
构建器使用的自类型习惯用法相结合。
作为一个快速修复,你可以插入一个演员阵容:
val sink = StreamingFileSink
.forRowFormat(new Path("s3a://123"), csvEncoder)
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMinutes(5))
.withInactivityInterval(TimeUnit.MINUTES.toMinutes(5))
.withMaxPartSize(128 * 1024 * 1024)
.build()
)
.withBucketAssigner(
new BucketAssigner[UserEvent, String] {
override def getBucketId(element: UserEvent, context: BucketAssigner.Context): String = element.getType.name
override def getSerializer: SimpleVersionedSerializer[String] = new SimpleVersionedStringSerializer
}
).asInstanceOf[StreamingFileSink.RowFormatBuilder[UserEvent, String, _]]
.build()
正确的修复需要更改Flink。您可以跟踪FLINK-16684,以便在问题得到正确解决时得到通知。
更新
Flink 1.10.1和1.11.0已修复该问题。