Flink 1.6 分桶接收器 HDFS 文件卡在 .进行中



我正在将 Kafka 数据流写入 HDFS 路径中的存储桶接收器。Kafka 给出字符串数据。使用 FlinkKafkaConsumer010 从 Kafka 消费

-rw-r--r--   3 ubuntu supergroup    4097694 2018-10-19 19:16 /streaming/2018-10-19--19/_part-0-1.in-progress
-rw-r--r--   3 ubuntu supergroup    3890083 2018-10-19 19:16 /streaming/2018-10-19--19/_part-1-1.in-progress
-rw-r--r--   3 ubuntu supergroup    3910767 2018-10-19 19:16 /streaming/2018-10-19--19/_part-2-1.in-progress
-rw-r--r--   3 ubuntu supergroup    4053052 2018-10-19 19:16 /streaming/2018-10-19--19/_part-3-1.in-progress

仅当我使用某些映射函数动态操作流数据时,才会发生这种情况。如果我直接将流写入 HDFS,它工作正常。知道为什么会发生这种情况吗?我正在使用 Flink 1.6.1、Hadoop 3.1.1 和 Oracle JDK1.8

这个问题有点晚了,但我也遇到了类似的问题。 我有一个案例类地址

case class Address(val i: Int)

例如,我从带有地址编号的集合中读取源

env.fromCollection(Seq(new Address(...), ...)) 
...
val customAvroFileSink = StreamingFileSink
.forBulkFormat(
new Path("/tmp/data/"),
ParquetAvroWriters.forReflectRecord(classOf[Address]))
.build()
... 
xxx.addSink(customAvroFileSink)

启用检查点后,我的镶木地板文件也将以正在进行

我发现 Flink 在触发检查点之前完成了该过程,所以我的结果从未完全刷新到磁盘。将检查点间隔更改为较小的数字后,镶木地板不再进行中。

此方案通常在禁用检查点时发生。

您可以在使用映射功能运行作业时检查检查点设置吗?看起来您已经为直接写入 HDFS 的作业启用了检查点。

我遇到了类似的问题,启用检查点并将状态后端从默认MemoryStateBackend更改为FsStateBackend工作。在我的例子中,检查点失败MemoryStateBackend因为maxStateSize太小,以至于其中一个操作的状态无法放入内存中。

StateBackend stateBackend = new FsStateBackend("file:///home/ubuntu/flink_state_backend");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
.enableCheckpointing(Duration.ofSeconds(60).toMillis())
.setStateBackend(stateBackend);

相关内容

  • 没有找到相关文章

最新更新