你好,我对动态文件目的地API感到非常困惑,并且没有文档,所以我在这里。
情况是我有一个pcollection,它包含属于不同分区的事件。我想将它们分开,并将它们写入GCS中的不同文件夹。
这就是我所拥有的。
动态目标对象:
class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {
override def getDestination(element: Event): String = {
element.partition //this returns a string which is a gcs folder path
}
override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
println(destination)
val overallPrefix = s"$prefix/$destination/part-"
DefaultFilenamePolicy.fromStandardParameters(
ValueProvider.StaticValueProvider.of(
FileSystems.matchNewResource(overallPrefix, false)),
null, ".jsonl", true)
}
override def formatRecord(record: Event): String = {
implicit val f = DefaultFormats
write(record.toDataLakeFormat())
}
override def getDefaultDestination: String = "default"
}
我相信这是正确的逻辑,我问每个元素的目标分区是什么,然后将其传递到getfilenamepolicy中,然后从那里构建一个文件名。要格式化记录,我只将其转换为JSON。
问题是将其与textio集成在一起,我尝试了此
TextIO.
write()
.withWindowedWrites()
.withTempDirectory(tempDir)
.to(new GCSDestinationString("gcs://bucket"))
,但它要求源类型是字符串,从技术上讲,这可以起作用,但我必须多次进行挑选。我在文本中找到了文本IO动态目的地
Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.
所以让我们尝试
TextIO
.writeCustomType[Event]()
.withWindowedWrites()
.withTempDirectory(tempDir)
.to(new GCSDestinationString("gcs://bucket"))
这仍然没有编译,因为WriteCustomType内部返回TypedWrite<UserT, Void>
,并且它的影响是要求我的动态目标对象的第二个类型参数无效。显然,我要求它是字符串,或者至少是void
我显然缺少某物
哦,这很尴尬。事实证明,writeCustomType().to(DynamicDestinations)
没有进行测试,我们没有注意到它,但是它在类型签名中具有错别字。pr https://github.com/apache/beam/pull/4319正在审查。您仍然需要2.3.0-snapshot才能拾取它,在这种情况下,我仍然建议您仅使用FileIO.write()
。
它似乎在Scala中没有编译,但是我能够在挖掘
后获得类似API的行为 var outputTransform =
TextIO.
writeCustomType[T]()
.withFormatFunction(outputFormatter)
.withNumShards(shards)
.withTempDirectory(tempDir)
.withCompression(compression)
if (windowedWrites) {
outputTransform = outputTransform.withWindowedWrites()
}
outputTransform.to(outputFileNamePolicyMapping, emptryDestination)
,从t到字符串的输出格式和outputfilenamepolicymapping是从t到defaultfilenamepolicy.params
遵循@jkff建议在apache beam邮件列表上我设法将其编译和工作方式:
:val write = TextIO.writeCustomType[Event].asInstanceOf[TextIO.TypedWrite[Event, String]]
.to(new MyDynamicDestinations(baseDir))
尽管这样做之后,我意识到使用DefaultFilenamePolicy.Params
而不是String
作为目标输出更为方便。让我知道您是否想要有关此一点的更多信息。