一次保存所有字典键



>我有一个具有这种结构的rdd:

[ 
    ('Washington', 
      [
        {'age': 15, 'name': 'John', 'extra_info1': 'data'},
        {'age': 25 , 'name': 'David', 'extra_info1': 'data'} 
      ]),
    ('New York',
      [
        {'age' 50, 'name': 'Mike', 'extra_info2': 'blob'},
        {'age' 24, 'name': 'Fred', 'extra_info2': 'blob3'}
      ])
]

正如你所看到的,我有城市的钥匙,然后是里面人的字典列表。 在所有键中,字典中都有一些共享键,如年龄和姓名,但每个词典也有唯一的键。

现在要将其输出为 csv,我迭代 rdd 的每个键,将字典列表转换为 spark.sql.Row 的 rdd 并从 rdd 创建数据帧,然后我使用 com.databricks.spark.csv 将每个数据帧保存到 hdfs 中的 csv。

我是这样做的:

for key in rdd.keys().toLocalIterator():
    city_rdd = rdd.filter(lambda k: k[0] == key)
    city_rdd = city_rdd.map(lambda kv: kv[1]) # return only data without key
    city_rdd_rows = city.rdd.map(lambda r: spark.sql.Row(r))
    city_df = city_rdd.toDF()
    # save the city_df to csv with com.databricks.spark.csv.. i dont have the snippet here

问题是我有很多元组键,比如华盛顿和纽约,每次保存文件需要 1-2 分钟,而不是并行保存所有 csv 并节省时间。

我已经阅读了这篇文章,但是当我无法使其与csv输出一起使用时,当我尝试json输出时,我看到jsons密钥只是我的rdd中的"age"和"name"等共享密钥。

我该怎么办?

您可以编写一个 Python 函数,将 csv 文件写入所有工作人员都可以到达的网络位置,然后使用 .map() 函数像这样执行它:

import csv
def csv_writer(data):
    city, mydicts = data
    open('//network/location/{}.csv'.format(city), 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        for single_dict in mydicts:
            for key, value in single_dict.items():
                 writer.writerow([key, value])

city_rdd.map(csv_writer).count() #.count() is needed to inniate the action

相关内容

  • 没有找到相关文章

最新更新