>我有数据帧,数据如下:
channel eventId1 eventId2 eventTs eventTs2 serialNumber someCode
Web-DTB akefTEdZhXt8EqzLKXNt1Wjg akTEdZhXt8EqzLKXNt1Wjg 1545502751154 1545502766731 4 rfs
Web-DTB 3ycLHHrbEkBJ.piYNyI7u55w 3ycLHHEkBJ.piYNyI7u55w 1545502766247 1545502767800 4 njs
Web-DTB 3ycL4rHHEkBJ.piYNyI7u55w 3ycLHHEkBJ.piYNyI7u55w 1545502766247 1545502767800 4 null
我需要将此数据保存到 S3 路径,如下所示:
s3://test/data/ABC/hb/eventTs/[eventTs]/uploadTime_[eventTs2]/*.json.gz
我该如何继续此操作,因为我需要从分区中提取数据以写入 S3 路径:(s3 路径是数据帧中存在的事件 Ts 和 eventTs2 的函数)
df.write.partitionBy("eventTs","eventTs2").format("json").save("s3://test/data/ABC/hb????")
我想我可以遍历数据帧中的每一行,提取路径并保存到 S3,但不想这样做。
有没有办法按事件T和事件Ts2上的数据帧分组,然后将数据帧保存到完整的S3路径?还有比这更理想的方法吗?
支持像Hive中的分区。如果 eventTs(eventTs2)的不同元素数量较少,则分区将是解决此问题的好方法。
查看 scala 文档以获取有关分区的更多信息。
用法示例:
val someDF = Seq((1, "bat", "marvel"), (2, "mouse", "disney"), (3, "horse", "animal"), (1, "batman", "marvel"), (2, "tom", "disney") ).toDF("id", "name", "place")
someDF.write.partitionBy("id", "name").orc("/tmp/somedf")
如果在"id"和"name"上使用paritionBy写入数据帧,则将创建以下目录结构。
/tmp/somedf/id=1/name=bat
/tmp/somedf/id=1/name=batman
/tmp/somedf/id=2/name=mouse
/tmp/somedf/id=2/name=tom
/tmp/somedf/id=3/name=horse
第一个和第二个分区成为目录,所有id等于1且name为bat的行都将保存在目录结构下/tmp/somedf/id=1/name=bat
,分区中定义的分区顺序决定了目录的顺序。
在您的情况下,分区将位于 eventTs 和 eventTS2 上。
val someDF = Seq(
("Web-DTB","akefTEdZhXt8EqzLKXNt1Wjg","akTEdZhXt8EqzLKXNt1Wjg","1545502751154","1545502766731",4,"rfs"),
("Web-DTB","3ycLHHrbEkBJ.piYNyI7u55w","3ycLHHEkBJ.piYNyI7u55w","1545502766247","1545502767800",4,"njs"),
("Web-DTB","3ycL4rHHEkBJ.piYNyI7u55w","3ycLHHEkBJ.piYNyI7u55w","1545502766247","1545502767800",4,"null"))
.toDF("channel" , "eventId1", "eventId2", "eventTs", "eventTs2", "serialNumber", "someCode")
someDF.write("eventTs", "eventTs2").orc("/tmp/someDF")
创建目录结构,如下所示。
/tmp/someDF/eventTs=1545502766247/eventTs2=1545502767800
/tmp/someDF/eventTs=1545502751154/eventTs2=1545502766731