我的pyspark过程的输出部分大小不均匀,但可以预测,具有n**2模式(0,1,2,4,8,16等)。这是我的过程:
我从Google BigQuery加载数据,如下所示:
dConf = {
"mapred.bq.project.id": project_id,
"mapred.bq.gcs.bucket": bucket,
"mapred.bq.input.project.id": project_id,
"mapred.bq.input.dataset.id":dataset_id,
"mapred.bq.input.table.id": table_id
}
rdd_dataset_raw = sc.newAPIHadoopRDD(
"com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
"org.apache.hadoop.io.LongWritable",
"com.google.gson.JsonObject",
conf=dConf
)
其输出如下(rdd_dataset_raw.take(2)
):
[(0, u'{"group_id":"1","pertubations":"Current Affairs,Sport,Technology"}'),
(67, u'{"group_id":"2","pertubations":"Current Affairs,Sport,Celeb Gossip"}')]
一些琐碎的处理,带有重新分区:
rdd_dataset = (
rdd_dataset_raw
.repartition(nr_partitions)
.map(lambda t, json=json: json.loads(t[1]))
)
它看起来像这样:
[{u'group_id': u'1', u'pertubations': u'Current Affairs,Sport,Technology'},
{u'group_id': u'2', u'pertubations': u'Current Affairs,Sport,Celeb Gossip'}]
当我用将RDD保存到谷歌存储时
rdd_dataset_raw.saveAsTextFile("gs://bucket/directory")
这将创建nr_partitions
零件文件。
但是,这些零件文件的大小并不均匀。它们在n**2
中增加,其中n是零件文件号。换句话说,
part-00000
包含0行part-00001
包含1行part-00002
包含2行part-00003
包含4行part-00004
包含8行
等
其中大多数也几乎立即完成,因为后面的部分会耗尽内存。
发生了什么事!?如何使分区负载均衡?
用partitionBy
:替换repartition
很简单
rdd_dataset = (
rdd_dataset_raw
.partitionBy(nr_partitions)
.map(lambda t, json=json: json.loads(t[1]))
)
请注意,这需要尽早完成。传递一个未分区的rdd,然后分区后来中断。
单据