我使用PySpark将RDD管道输出到外部进程(stdin/stdout)。
piped_rdd = rdd.pipe(exe_path)
当我检查返回的PipeLineRDD时,所有行都已转换为字符串
["Row(ID='x123223=', FirstName='L', LastName='S')", "Row(ID='43454".....)]
是否有可能将这些字符串转换回正确的行?
我们可以使用eval()
。我测试了下面的代码,它似乎对样本数据有效。
val_ls = [
"Row(ID='x123223=', FirstName='L', LastName='S')",
"Row(ID='x123224=', FirstName='K', LastName='P')"
]
def evalRow(theRowString):
"""
imports pyspark.sql.Row and uses `eval()` to resolve the Row strings
"""
from pyspark.sql import Row
return eval(theRowString)
spark.sparkContext.parallelize(val_ls).map(lambda k: evalRow(k)).collect()
# [Row(ID='x123223=', FirstName='L', LastName='S'),
# Row(ID='x123224=', FirstName='K', LastName='P')]
我检查了结果的type
set(spark.sparkContext.parallelize(val_ls).map(lambda k: type(evalRow(k))).collect())
# {pyspark.sql.types.Row}
我最初只是导入Row
而没有创建函数,并直接在map()
中使用eval()
—
from pyspark.sql import Row
spark.sparkContext.parallelize(val_ls).map(lambda k: eval(k)).collect()
我一直碰到错误NameError: name 'Row' is not defined
。我认为这是因为我们没有在工人上导入Row
,它不知道如何评估eval("Row(...)")