使用avro编码器的BulkFormat与forRowFormat的StreamingFileSink



我有Flink of 4版本,想把它更新到11。我尝试使用StreamingFileSink而不是不推荐使用的BucketingSink。我的代码看起来像:

val sink = StreamingFileSink
.forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forReflectRecord[T](clazz) )
.withBucketCheckInterval(toMillis(config.inactiveBucketThreshold))
.withBucketAssigner(bucketAssigner)
.build()

但我在测试中对fs的写入有问题。

在avro编码器中使用StreamingFileSink.forBulkFormat而不是StreamingFileSink.forRowFormat有哪些优点?

你能帮我写一个第二种用法的例子吗?

请参阅以下内容"行格式或大容量格式";线程获取一些细节

简而言之,至少在1.11版本之前,flink只为avro格式提供了BulkWriter工厂-AvroWriterFactory,您可以开箱即用。

要使用行格式StreamingFileSink.forRowFormat,您需要提供自己的org.apache.flink.api.common.serialization.Encoder接口实现,该接口将能够逐个记录地对数据进行编码并将数据附加到零件文件中。

例如,您可以查看org.apache.flink.formats.json.JsonFileSystemFormatFactory.JsonRowDataEncoder

相关内容

  • 没有找到相关文章

最新更新