>我有一个具有这种结构的rdd:
[
('Washington',
[
{'age': 15, 'name': 'John', 'extra_info1': 'data'},
{'age': 25 , 'name': 'David', 'extra_info1': 'data'}
]),
('New York',
[
{'age' 50, 'name': 'Mike', 'extra_info2': 'blob'},
{'age' 24, 'name': 'Fred', 'extra_info2': 'blob3'}
])
]
正如你所看到的,我有城市的钥匙,然后是里面人的字典列表。 在所有键中,字典中都有一些共享键,如年龄和姓名,但每个词典也有唯一的键。
现在要将其输出为 csv,我迭代 rdd 的每个键,将字典列表转换为 spark.sql.Row 的 rdd 并从 rdd 创建数据帧,然后我使用 com.databricks.spark.csv 将每个数据帧保存到 hdfs 中的 csv。
我是这样做的:
for key in rdd.keys().toLocalIterator():
city_rdd = rdd.filter(lambda k: k[0] == key)
city_rdd = city_rdd.map(lambda kv: kv[1]) # return only data without key
city_rdd_rows = city.rdd.map(lambda r: spark.sql.Row(r))
city_df = city_rdd.toDF()
# save the city_df to csv with com.databricks.spark.csv.. i dont have the snippet here
问题是我有很多元组键,比如华盛顿和纽约,每次保存文件需要 1-2 分钟,而不是并行保存所有 csv 并节省时间。
我已经阅读了这篇文章,但是当我无法使其与csv输出一起使用时,当我尝试json输出时,我看到jsons密钥只是我的rdd中的"age"和"name"等共享密钥。
我该怎么办?
您可以编写一个 Python 函数,将 csv 文件写入所有工作人员都可以到达的网络位置,然后使用 .map()
函数像这样执行它:
import csv
def csv_writer(data):
city, mydicts = data
open('//network/location/{}.csv'.format(city), 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
for single_dict in mydicts:
for key, value in single_dict.items():
writer.writerow([key, value])
city_rdd.map(csv_writer).count() #.count() is needed to inniate the action