Pyspark -Spark簇EC2-无法保存到S3



我已经建立了一个带有主人和2个奴隶的火花群集(我正在使用Spark独立)。该群集在一些示例中效果很好,但我的应用程序不佳。我的应用程序工作流程是,它将读取CSV->与标题 ->转换为JSON->保存到S3一起提取CSV ->提取CSV。这是我的代码:

def upload_func(row):
    f = row.toJSON()
    f.saveAsTextFile("s3n://spark_data/"+ row.name +".json")
    print(f)
    print(row.name)
if __name__ == "__main__":
    spark = SparkSession 
        .builder 
        .appName("Python Spark SQL data source example") 
        .getOrCreate()
    df = spark.read.csv("sample.csv", header=True, mode="DROPMALFORMED")
    df.rdd.map(upload_func)

我还将AWS_Key_IDAWS_Secret_Key导出到EC2环境中。但是,对于上述代码,我的应用程序不起作用。以下是:

  1. JSON文件未保存在S3中,我尝试过几次运行该应用程序,并且还重新加载了S3页面,但没有数据。该应用程序完成了日志中没有任何错误。另外,print(f)print(row.name)在日志中未打印出来。我需要修复什么以获取JSON保存在S3上,无论如何我是否有在日志上打印的用于调试目的?

  2. 当前我需要将CSV文件放在工作节点中,以便应用程序可以读取CSV文件。我如何将文件放在另一个地方,假设主节点以及应用程序运行时,它将CSV文件将CSV文件拆分为所有工作人员节点,以便他们可以按以分布式系统为单位上传?

帮助非常感谢。感谢您提前的帮助。

更新

将Logger进行调试后,我已经确定了映射功能upload_func()未被调用的问题,或者该应用程序无法进入此功能(logger打印在功能调用之前和之后)。如果您知道原因?

,请提供帮助

您需要强制评估地图;火花只能按需执行工作。

df.rdd.map(upload_func).count()应该做

相关内容

  • 没有找到相关文章

最新更新