在pyspark地图逻辑中使用SparkSQL无法正常工作



我有许多小文件。我想将它们加载到RDD中。然后映射它们以并行执行这些文件上的算法。该算法将需要从HDFS/Hive-tables获取数据。当我使用SparkSQL获取数据时,我会收到以下错误:

泡菜。picklingError:无法序列化对象:异常: 似乎您正在尝试从一个 广播变量,动作或转换。SparkContext只能 在驱动程序上使用,而不是在工人上运行的代码。更多 信息,请参见Spark-5063。

SparkSQL使用SQLContext,它是SparkContext上的包装器。这是否意味着我不能在执行工人执行的代码中使用SparkSQL?但是那将是太限制的。

有人可以分享有关如何在Pyspark中编码我的逻辑的知识吗?

这是我正在使用的示例pyspark代码:

def apply_algorithm(filename):
    /* SparkSQL logic goes here */ 
    /* some more logic */
    return someResult

def main(argv):
    print "Entered main method"
    input_dir = sys.argv[1]
    output_dir = sys.argv[2]
    fileNameContentMapRDD = sc.wholeTextFiles(input_dir)
    print "fileNameContentMapRDD = " , fileNameContentMapRDD.collect()
    resultRDD = fileNameContentMapRDD.map(lambda x : apply_algorithm(x[0]))
    print resultRDD.collect()
    print "end of main."

这是否意味着我无法在执行工人执行的代码中使用SparkSQL?

是的,这是准确的。您可以从并行化上下文中使用RDDsDataFrames

相关内容

  • 没有找到相关文章

最新更新