如何在Java中使用DataFlow文本IO动态目的地



你好,我对动态文件目的地API感到非常困惑,并且没有文档,所以我在这里。

情况是我有一个pcollection,它包含属于不同分区的事件。我想将它们分开,并将它们写入GCS中的不同文件夹。

这就是我所拥有的。

动态目标对象:

  class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {
    override def getDestination(element: Event): String = {
      element.partition //this returns a string which is a gcs folder path
    }
    override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
      println(destination)
      val overallPrefix = s"$prefix/$destination/part-"
      DefaultFilenamePolicy.fromStandardParameters(
        ValueProvider.StaticValueProvider.of(
          FileSystems.matchNewResource(overallPrefix, false)),
        null, ".jsonl", true)
    }
    override def formatRecord(record: Event): String = {
      implicit val f = DefaultFormats
      write(record.toDataLakeFormat())
    }
    override def getDefaultDestination: String = "default"
  }

我相信这是正确的逻辑,我问每个元素的目标分区是什么,然后将其传递到getfilenamepolicy中,然后从那里构建一个文件名。要格式化记录,我只将其转换为JSON。

问题是将其与textio集成在一起,我尝试了此

TextIO.
  write()
  .withWindowedWrites()
  .withTempDirectory(tempDir)
  .to(new GCSDestinationString("gcs://bucket"))

,但它要求源类型是字符串,从技术上讲,这可以起作用,但我必须多次进行挑选。我在文本中找到了文本IO动态目的地

Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.

所以让我们尝试

  TextIO
    .writeCustomType[Event]()
    .withWindowedWrites()
    .withTempDirectory(tempDir)
    .to(new GCSDestinationString("gcs://bucket"))

这仍然没有编译,因为WriteCustomType内部返回TypedWrite<UserT, Void>,并且它的影响是要求我的动态目标对象的第二个类型参数无效。显然,我要求它是字符串,或者至少是void

以外的东西

我显然缺少某物

哦,这很尴尬。事实证明,writeCustomType().to(DynamicDestinations)没有进行测试,我们没有注意到它,但是它在类型签名中具有错别字。pr https://github.com/apache/beam/pull/4319正在审查。您仍然需要2.3.0-snapshot才能拾取它,在这种情况下,我仍然建议您仅使用FileIO.write()

它似乎在Scala中没有编译,但是我能够在挖掘

后获得类似API的行为
 var outputTransform =
        TextIO.
          writeCustomType[T]()
          .withFormatFunction(outputFormatter)
          .withNumShards(shards)
          .withTempDirectory(tempDir)
          .withCompression(compression)
      if (windowedWrites) {
        outputTransform = outputTransform.withWindowedWrites()
      }
      outputTransform.to(outputFileNamePolicyMapping, emptryDestination)

,从t到字符串的输出格式和outputfilenamepolicymapping是从t到defaultfilenamepolicy.params

遵循@jkff建议在apache beam邮件列表上我设法将其编译和工作方式:

val write = TextIO.writeCustomType[Event].asInstanceOf[TextIO.TypedWrite[Event, String]]
    .to(new MyDynamicDestinations(baseDir))

尽管这样做之后,我意识到使用DefaultFilenamePolicy.Params而不是String作为目标输出更为方便。让我知道您是否想要有关此一点的更多信息。

最新更新