Flink streaming application produces many empty in-progress output files



I have the following streaming application, which reads Protobuf messages from a Kafka topic and writes them to a FileSystem Parquet sink:

import java.util.concurrent.TimeUnit

import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

class ProtoDeserializer extends DeserializationSchema[User] {
  override def getProducedType: TypeInformation[User] = TypeInformation.of(classOf[User])

  // Skip the 6-byte header the producer prepends to each record
  // before parsing the Protobuf payload.
  override def deserialize(message: Array[Byte]): User =
    User.parseFrom(message.slice(6, message.length))

  override def isEndOfStream(nextElement: User): Boolean = false
}

object StreamingKafkaProtoToParquetLocalFs {
  private val brokers = "localhost:9092"
  private val topic = "test-topic-proto"
  private val consumerGroupId = "test-consumer-proto"
  private val targetPath = "file:///my/path"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1))
    env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
    env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer])

    val source = KafkaSource.builder[User]
      .setBootstrapServers(brokers)
      .setTopics(topic)
      .setGroupId(consumerGroupId)
      .setValueOnlyDeserializer(new ProtoDeserializer)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .build

    val input: DataStream[User] = env.fromSource(source, WatermarkStrategy.noWatermarks[User], "KafKaTable")

    val sink: StreamingFileSink[User] = StreamingFileSink
      .forBulkFormat(new Path(s"$targetPath/data"), ParquetProtoWriters.forType(classOf[User]))
      .build()

    input.addSink(sink)
    env.execute()
  }
}

When I run the program, all of the output files written to the target path are empty (0 bytes) and stuck in the in-progress state, even though I enabled checkpointing.

It's worth noting that the topic is not empty: when I change the sink to print(), the messages are printed correctly.
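Concretely, the working debug variant just replaces the sink line with:

// Debug variant: print records to stdout instead of writing Parquet files
input.print()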

What am I missing? Why do print() and the Parquet sink behave differently?

Explicitly adding a dependency on Apache Parquet Protobuf seems to solve the problem.

For Maven users, add the following dependency to pom.xml:

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-protobuf</artifactId>
    <version>1.11.1</version>
</dependency>
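For sbt users, the equivalent coordinates (assuming the same version as above) would be:

// Same artifact expressed as an sbt dependency
libraryDependencies += "org.apache.parquet" % "parquet-protobuf" % "1.11.1"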

It looks like you are using a recent version of Flink, so try the following change:

import org.apache.flink.connector.file.sink.FileSink

val sink: FileSink[User] = FileSink
  .forBulkFormat(new Path(s"$targetPath/data"), ParquetProtoWriters.forType(classOf[User]))
  .build()

// FileSink is attached with sinkTo rather than addSink
input.sinkTo(sink)

StreamingFileSink is deprecated and is being replaced by FileSink.
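Note also that for bulk formats such as Parquet, part files can only be rolled (finalized) when a checkpoint completes, which is why they remain ".inprogress" until then. A minimal sketch that makes this policy explicit, assuming Flink's filesystem connector classes; the part prefix and suffix are purely illustrative:

import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy

// Bulk formats may only roll on checkpoint; spelling the policy out documents
// why completed Parquet files appear only after a successful checkpoint.
val parquetSink: FileSink[User] = FileSink
  .forBulkFormat(new Path(s"$targetPath/data"), ParquetProtoWriters.forType(classOf[User]))
  .withRollingPolicy(OnCheckpointRollingPolicy.build())
  .withOutputFileConfig(
    OutputFileConfig.builder()
      .withPartPrefix("users")    // illustrative file name prefix
      .withPartSuffix(".parquet") // illustrative file name suffix
      .build())
  .build()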
