我使用下面的函数来上传自定义分隔符文件。
from pyspark.sql.types import *
from pyspark.sql import *
def loadFile(path, rowDelimeter, columnDelimeter, firstHeaderColName):
loadedFile = sc.newAPIHadoopFile(path, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
"org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text",
conf={"textinputformat.record.delimiter": rowDelimeter})
rddData = loadedFile.map(lambda l:l[1].split(columnDelimeter)).filter(lambda f: f[0] != firstHeaderColName)
return rddData
我调用了下面的函数。
from pyspark.sql.types import *
from pyspark.sql import *
Schema = StructType([
StructField("FlightID", StringType(), True),
StructField("Flightname", StringType(), True),
StructField("FlightNo", StringType(), True),
StructField("Depaturetime", StringType(), True),
StructField("ArrivalTime", StringType(), True),
StructField("FromPlace", StringType(), True),
StructField("ToPlace", StringType(), True),
StructField("Cremember", StringType(), True)
])
Data= loadFile("wasb://Accountnumber@serviceaccount.blob.core.windows.net/Flightdetails.txt",
"rn","#|#","FlightID")
FlightDF = sqlContext.createDataFrame(Data,Schema )
FlightDF.write.saveAsTable("Flightdetail")
源文件大小为2GB,进程继续运行。
我做错了什么?
建议尝试使用较小的数据集或几行。2 GB的数据文件可能有一些数据异常,这可能导致这个问题。