RDD Pipe操作将每一行转换为字符串.如何转换回行



我使用PySpark将RDD管道输出到外部进程(stdin/stdout)。

piped_rdd = rdd.pipe(exe_path)

当我检查返回的PipeLineRDD时,所有行都已转换为字符串

["Row(ID='x123223=', FirstName='L', LastName='S')", "Row(ID='43454".....)]

是否有可能将这些字符串转换回正确的行?

我们可以使用eval()。我测试了下面的代码,它似乎对样本数据有效。

val_ls = [
"Row(ID='x123223=', FirstName='L', LastName='S')", 
"Row(ID='x123224=', FirstName='K', LastName='P')"
]
def evalRow(theRowString):
"""
imports pyspark.sql.Row and uses `eval()` to resolve the Row strings
"""
from pyspark.sql import Row
return eval(theRowString)
spark.sparkContext.parallelize(val_ls).map(lambda k: evalRow(k)).collect()
# [Row(ID='x123223=', FirstName='L', LastName='S'),
# Row(ID='x123224=', FirstName='K', LastName='P')]

我检查了结果的type

set(spark.sparkContext.parallelize(val_ls).map(lambda k: type(evalRow(k))).collect())
# {pyspark.sql.types.Row}

我最初只是导入Row而没有创建函数,并直接在map()中使用eval()

from pyspark.sql import Row
spark.sparkContext.parallelize(val_ls).map(lambda k: eval(k)).collect()

我一直碰到错误NameError: name 'Row' is not defined。我认为这是因为我们没有在工人上导入Row,它不知道如何评估eval("Row(...)")

最新更新