我已经建立了一个带有主人和2个奴隶的火花群集(我正在使用Spark独立)。该群集在一些示例中效果很好,但我的应用程序不佳。我的应用程序工作流程是,它将读取CSV->与标题 ->转换为JSON->保存到S3一起提取CSV ->提取CSV。这是我的代码:
def upload_func(row):
f = row.toJSON()
f.saveAsTextFile("s3n://spark_data/"+ row.name +".json")
print(f)
print(row.name)
if __name__ == "__main__":
spark = SparkSession
.builder
.appName("Python Spark SQL data source example")
.getOrCreate()
df = spark.read.csv("sample.csv", header=True, mode="DROPMALFORMED")
df.rdd.map(upload_func)
我还将AWS_Key_ID
和AWS_Secret_Key
导出到EC2环境中。但是,对于上述代码,我的应用程序不起作用。以下是:
JSON文件未保存在S3中,我尝试过几次运行该应用程序,并且还重新加载了S3页面,但没有数据。该应用程序完成了日志中没有任何错误。另外,
print(f)
和print(row.name)
在日志中未打印出来。我需要修复什么以获取JSON保存在S3上,无论如何我是否有在日志上打印的用于调试目的?当前我需要将CSV文件放在工作节点中,以便应用程序可以读取CSV文件。我如何将文件放在另一个地方,假设主节点以及应用程序运行时,它将CSV文件将CSV文件拆分为所有工作人员节点,以便他们可以按以分布式系统为单位上传?
帮助非常感谢。感谢您提前的帮助。
更新
将Logger进行调试后,我已经确定了映射功能upload_func()
未被调用的问题,或者该应用程序无法进入此功能(logger打印在功能调用之前和之后)。如果您知道原因?
您需要强制评估地图;火花只能按需执行工作。
df.rdd.map(upload_func).count()
应该做