将来自 2 个 RDD(一个具有 unicode 数据,一个具有普通数据)的数据写入 PySpark 中的 csv 文件



我有两个RDD's

RDD1:RDD1中的数据采用 unicode 格式

[[u'a',u'b',u'c'],[u'c',u'f',u'a'],[u'ab',u'cd',u'gh']...]

RDD2:

[(10.1, 10.0), (23.0, 34.0), (45.0, 23.0),....]

两个RDDs的行数相同(但一个在每行/记录中有 2 列/元素,一个有 3 列/元素(。现在我想做的是从RDD2中获取所有元素,并从RDD1中获取2nd记录,并将它们写出到本地文件系统(不是hdfs(上的csv文件中。因此,上述示例的csv文件中的输出将是:

a,b,c,10.0
c,f,a,34.0
ab,cd,gh,23.0

我怎样才能在PySpark做到这一点?

更新:这是我当前的代码:

columns_num = [0,1,2,4,7]
rdd1 = rdd3.map(lambda row: [row[i] for i in columns_num])
rdd2 = rd.map(lambda tup: (tup[0], tup[1]+ (tup[0]/3)) if tup[0] - tup[1] >= tup[0]/3 else (tup[0],tup[1]))
with open("output.csv", "w") as fw:
    writer = csv.writer(fw)
    for (r1, r2) in izip(rdd1.toLocalIterator(), rdd2.toLocalIterator()):
        writer.writerow(r1 + tuple(r2[1:2]))

我收到错误TypeError: can only concatenate list (not "tuple") to list.如果我这样做writer.writerow(tuple(r1) + r2[1:2])那么我会收到错误UnicodeEncodeError: 'ascii' codec can't encode character u'x80' in position 16: ordinal not in range(128)`

如果本地是指驱动程序文件系统,那么您可以简单地collect或转换toLocalIterator并编写:

import csv
import sys
if sys.version_info.major == 2:
    from itertools import izip
else:
    izip = zip
rdd1 = sc.parallelize([(10.1, 10.0), (23.0, 34.0), (45.0, 23.0)])
rdd2 = sc.parallelize([("a", "b" ," c"), ("c", "f", "a"), ("ab", "cd", "gh")])
with open("output.csv", "w") as fw:
    writer = csv.writer(fw)
    for (r1, r2) in izip(rdd2.toLocalIterator(), rdd1.toLocalIterator()):
        writer.writerow(r1 + r2[1:2])

相关内容

最新更新