在HDFS上附加到文件的推荐方式是什么



我很难找到一种安全的方式来附加到HDFS中的文件。

我用的是一个小型3-node Hadoop cluster (CDH v.5.3.9 to be specific)。我们的进程是一个数据流水线,它是multi-threaded (8 threads),它有一个阶段,可以将分隔文本行附加到HDFS上专用目录中的文件中。我使用锁来同步线程对附加数据的缓冲写入程序的访问。

我的第一个问题是决定总体方法。

方法A是打开文件,对其进行追加,然后对追加的每一行都将其关闭。这似乎很慢,似乎会造成太多的小阻碍,或者至少我在各种帖子中看到了一些这样的情绪。

方法B是缓存写入程序,但定期刷新它们,以确保写入程序列表不会无限增长(目前,流水线处理的每个输入文件都有一个写入程序)。这似乎是一种更有效的方法,但我认为在一段时间内无论如何控制开放流可能是一个问题,尤其是对于输出文件读取器(?)

除此之外,我真正的问题有两个。我正在使用FileSystem Java Hadoop API进行附加,并且间歇性地得到这两个异常:

org.apache.hadoop.ipc.RemoteException: failed to create file /output/acme_20160524_1.txt for DFSClient_NONMAPREDUCE_271210261_1 for client XXX.XX.XXX.XX because current leaseholder is trying to recreate file.

org.apache.hadoop.ipc.RemoteException: BP-1999982165-XXX.XX.XXX.XX-1463070000410:blk_1073760252_54540 does not exist or is not under Constructionblk_1073760252_545 40{blockUCState=UNDER_RECOVERY, primaryNodeIndex=1, replicas=[ReplicaUnderConstruction[[DISK]DS-ccdf4e55-234b-4e17-955f-daaed1afdd92:NORMAL|RBW], ReplicaUnderConst ruction[[DISK]DS-1f66db61-759f-4c5d-bb3b-f78c260e338f:NORMAL|RBW]]}

有人对此有什么想法吗?

对于第一个问题,我尝试了使用本文中讨论的逻辑,但似乎没有帮助。

如果适用的话,我还对dfs.support.append属性的角色感兴趣。

我获取文件系统的代码:

userGroupInfo = UserGroupInformation.createRemoteUser("hdfs"); Configuration conf = new Configuration();
conf.set(key1, val1);
...
conf.set(keyN, valN);
fileSystem = userGroupInfo.doAs(new PrivilegedExceptionAction<FileSystem>() { 
public FileSystem run() throws Exception { 
return FileSystem.get(conf);
}
});

我获取OutputStream的代码:

org.apache.hadoop.fs.path.Path file = ...
public OutputStream getOutputStream(boolean append) throws IOException {   
OutputStream os = null;
synchronized (file) { 
if (isFile()) {
os = (append) ? fs.append(file) : fs.create(file, true);
} else if (append) {
// Create the file first, to avoid "failed to append to non-existent file" exception
FSDataOutputStream dos = fs.create(file);
dos.close();
// or, this can be: fs.createNewFile(file);
os = fs.append(file);
}
// Creating a new file
else { 
os = fs.create(file);
}
}
return os;
} 

我在使用CDH 5.3/HDFS 2.5.0时得到了文件附加。到目前为止,我的结论如下:

  • 无论我们是通过HDFS API FileSystem的同一个实例还是不同的实例写入数据,都不能让一个专用线程对每个文件进行追加,也不能让多个线程对多个文件进行写入
  • 无法刷新(即关闭和重新打开)写入程序;它们必须保持开放
  • 最后一项偶尔会导致相对罕见的ClosedChannelException,它似乎是可以恢复的(通过重试追加)
  • 我们使用一个带有阻塞队列的单线程执行器服务(一个用于附加到所有文件的队列);每个文件一个写入程序,写入程序保持打开状态(直到处理结束时关闭)
  • 当我们升级到更新到5.3的CDH时,我们会想重新审视一下这一点,看看什么线程策略是有意义的:一个唯一的线程,每个文件一个线程,多个线程写入多个文件。此外,我们还想看看是否可以/需要定期关闭和重新打开作家
  • 此外,我还看到了以下错误,并通过在客户端将"dfs.client.block.write.replace-datanode-on-failure.policy"设置为"NEVER"来消除该错误
java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[XXX.XX.XXX.XX:50010, XXX.XX.XXX.XX:50010], original=[XXX.XX.XXX.XX:50010, XXX.XX.XXX.XX:50010]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:969) ~[hadoop-hdfs-2.5.0.jar:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1035) ~[hadoop-hdfs-2.5.0.jar:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1184) ~[hadoop-hdfs-2.5.0.jar:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:532) ~[hadoop-hdfs-2.5.0.jar:?]