使用pyspark函数上传自定义分隔符文件不工作



我使用下面的函数来上传自定义分隔符文件。

from pyspark.sql.types import *
from pyspark.sql import *
 
def loadFile(path, rowDelimeter, columnDelimeter, firstHeaderColName):
 
    loadedFile = sc.newAPIHadoopFile(path, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                                      "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text",
                                      conf={"textinputformat.record.delimiter": rowDelimeter})
 
 
    rddData = loadedFile.map(lambda l:l[1].split(columnDelimeter)).filter(lambda f: f[0] != firstHeaderColName)
        
    return rddData

我调用了下面的函数。

from pyspark.sql.types import *
from pyspark.sql import *
Schema = StructType([
        StructField("FlightID", StringType(), True),
	StructField("Flightname", StringType(), True),
	StructField("FlightNo", StringType(), True),
	StructField("Depaturetime", StringType(), True),
	StructField("ArrivalTime", StringType(), True),
	StructField("FromPlace", StringType(), True),
	StructField("ToPlace", StringType(), True),
	StructField("Cremember", StringType(), True)
	
	
        
        
])
Data= loadFile("wasb://Accountnumber@serviceaccount.blob.core.windows.net/Flightdetails.txt",
                     "rn","#|#","FlightID")
                     
FlightDF = sqlContext.createDataFrame(Data,Schema )
FlightDF.write.saveAsTable("Flightdetail")

源文件大小为2GB,进程继续运行。

我做错了什么?

建议尝试使用较小的数据集或几行。2 GB的数据文件可能有一些数据异常,这可能导致这个问题。

最新更新