fileoutputwwriter生成重复的tmp文件



我有一个Apache Apex应用程序使用Kafka日志并将其写入HDFS。

DAG很简单,有一个Kafka消费者(20个分区,2gb内存用于操作符)通过流连接到"MyWriter extends AbstractFileOutputOperator"。

问题:1. 我一直看到Writer多次重复地编写具有相同大小和相同数据的.tmp文件。我已经尝试增加写入操作符内存,增加了写入器的分区数量等。这个问题仍然在发生。

我尝试添加/删除requestFinalize到MyWriter。还是同样的问题。

 @Override
    public void endWindow()
    {
        if (null != fileName) {
            requestFinalize(fileName);
        }
        super.endWindow();
    }

这是我的properties.xml的一个子集

<property>
    <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
    <value>1000</value>
  </property>
  <property>
    <name>dt.application.myapp.operator.*.attr.APPLICATION_WINDOW_COUNT</name>
    <value>60</value>
  </property>
  <property>
    <name>dt.application.myapp.operator.*.attr.CHECKPOINT_WINDOW_COUNT</name>
     <value>60</value>
  </property>
 <property>
        <name>dt.application.myapp.operator.myWriter.attr.PARTITIONER</name>
        <value>com.datatorrent.common.partitioner.StatelessPartitioner:20</value>
    </property>
  <property>
    <name>dt.application.myapp.operator.myWriter.prop.maxLength</name>
    <value>1000000000</value> <!-- 1 GB File -->
  </property>

这是我能够从dt.log中获得的操作符的堆栈跟踪:操作符可能被重新部署在不同的容器中,抛出此异常并继续写入重复的文件。

 java.lang.RuntimeException: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:418)
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:112)
        at com.datatorrent.stram.engine.Node.setup(Node.java:187)
        at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309)
        at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
    Caused by: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
        at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1219)
        at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1211)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1211)
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:411)
        ... 5 more
2016-08-17 22:17:01,108 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [161, 177]
2016-08-17 22:17:01,116 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy complete.
2016-08-17 22:17:02,121 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:02,625 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:03,129 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.

基操作符的代码在下面的链接中,并在评论:https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java

通过设置最大文件大小为1GB,您将自动启用滚动文件;相关字段为:

protected Long maxLength = Long.MAX_VALUE;
protected transient boolean rollingFile = false;

如果前者的值小于Long.MAX_VALUE的默认值,则在setup()方法中将后者设置为true。

当启用滚动文件时,文件结束是自动完成的,所以你不应该调用requestFinalize()

其次,在您的MyWriter类中,删除endWindow()覆盖并确保您创建了一个所需的文件名,该文件名包括setup()方法中的操作符id,并在getFileName()覆盖中返回该文件名;这确保了多个分区不会相互踩到。例如:

@NotNull
private String fileName;           // current base file name
private transient String fName;    // per partition file name
@Override
public void setup(Context.OperatorContext context)
{
  // create file name for this partition by appending the operator id to
  // the base name
  //
  long id = context.getId();
  fName = fileName + "_p" + id;
  super.setup(context);
  LOG.debug("Leaving setup, fName = {}, id = {}", fName, id);
}
@Override
protected String getFileName(Long[] tuple)
{
  return fName;
}

文件基名(上面代码中的fileName)可以直接在代码中设置,也可以从XML文件中的属性初始化(您还需要为它添加getter和setter)。

你可以在下面看到这种用法的例子:https://github.com/DataTorrent/examples/tree/master/tutorials/fileOutput

几个额外的建议:

  1. 将分区计数设置为1(或注释掉设置PARTITIONER属性的XML),并确保一切按预期工作。这将消除与分区无关的任何问题。如果可能的话,还可以将最大文件大小减小到2K或4K,以便于测试。
  2. 一旦单分区情况起作用,将分区数量增加到2。如果这个有效,任意更大的数字(在合理范围内)也应该有效。

相关内容

  • 没有找到相关文章

最新更新