我有Flink of 4版本,想把它更新到11。我尝试使用StreamingFileSink
而不是不推荐使用的BucketingSink
。我的代码看起来像:
val sink = StreamingFileSink
.forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forReflectRecord[T](clazz) )
.withBucketCheckInterval(toMillis(config.inactiveBucketThreshold))
.withBucketAssigner(bucketAssigner)
.build()
但我在测试中对fs的写入有问题。
在avro编码器中使用StreamingFileSink.forBulkFormat
而不是StreamingFileSink.forRowFormat
有哪些优点?
你能帮我写一个第二种用法的例子吗?
请参阅以下内容"行格式或大容量格式";线程获取一些细节
简而言之,至少在1.11
版本之前,flink只为avro格式提供了BulkWriter
工厂-AvroWriterFactory
,您可以开箱即用。
要使用行格式StreamingFileSink.forRowFormat
,您需要提供自己的org.apache.flink.api.common.serialization.Encoder
接口实现,该接口将能够逐个记录地对数据进行编码并将数据附加到零件文件中。
例如,您可以查看org.apache.flink.formats.json.JsonFileSystemFormatFactory.JsonRowDataEncoder
。