我有一个val dataset = Dataset[FeedData]
,其中FeedData
类似于case class FeedData(feed: String, data: XYZ)
。
我想避免对文件进行后处理,所以我决定调用dataset.repartition($"feed").json("s3a://...")
,以便每个feed
最终都位于不同的文件中。问题是这些文件仍然按照part-XXXX
行命名,所以我无法轻松地为给定的提要挑选出相关文件,如果没有 a( 打开它们以检查内部feed
的值,或 b( 后处理文件以使其更友好。
我希望文件看起来像part-XXXX-{feed}
而不是part-XXXX
是否可以根据用于对数据集进行分区的列feed
的值动态命名分区文件?
背景:
我找到了这个答案,其中提到了一个saveAsNewAPIHadoopFile()
方法,我可以在其中为我自己的文件命名实现扩展一些相关的类。
任何人都可以帮助我理解这种方法,如何从Dataset
访问它,并告诉我是否可以将所需的信息(feed
(投影到我的实现中以动态命名分区?
我试图以错误的方式做到这一点:
dataset.repartition($"colName").write.format("json").save(path)
正确的方法是:
dataset.write.partitionBy("colName").format("json").save(path)
不同之处在于您应该在.write
之后调用.partitionBy
。生成的目录如下所示:colName=value/part-XXXX
。
有关更多信息,请参阅此处。