我是非常新的Apache Spark。我正在尝试将CSV文件加载到Spark RDD和DataFrames中。
我使用RDD来操纵数据框架上的SQL操作的数据和数据框。
将RDD转换为Spark DataFrame时,我遇到了一个问题。问题声明在下面给出。
# to load data
dataRDD = sc.textFile(trackfilepath)
# To use it as a csv
dataRDD = testData.mapPartitions(lambda x: csv.reader(x))
# To load into data frame and capture the schema
dataDF = sqlContext.read.load(trackfilepath,
format='com.databricks.spark.csv',
header='true',
inferSchema='true')
schema = dataDF.schema
数据看起来像
print (dataRDD.take(3))
[['Name', 'f1', 'f2', 'f3', 'f4'], ['Joe', '5', '7', '8', '3'], ['Jill', '3', '2', '2', '23']]
print (dataDF.take(3))
[Row(_c0='Name', _c1='f1', _c2='f2', _c3='f3', _c4='f4'), Row(_c0='Joe', _c1='5', _c2='7', _c3='8', _c4='3'), Row(_c0='Jill', _c1='3', _c2='2', _c3='2', _c4='23')]
print schema
StructType(List(StructField(Name,StringType,true),StructField(f1,IntegerType,true),StructField(f2,IntegerType,true),StructField(f3,IntegerType,true),StructField(f4,IntegerType,true)))
数据操纵
def splitWords(line):
return ['Jillwa' if item=='Jill' else item for item in line]
dataCleanRDD = dataRDD.map(splitWords)
问题:
现在,我试图使用以下代码和架构将操纵的RDD存储到数据框中。
dataCleanDF = sqlContext.createDataFrame(dataCleanRDD, schema=schema)
这给我以下错误:
TypeError: IntegerType can not accept object 'f1' in type <class 'str'>
错误是由于RDD和架构中值的数据类型中的不匹配。RDD将所有内容都视为字符串,而模式为Field1 Field2等整数,依此类推。这是一个虚拟数据集,我的真实数据集由200列和100,000行组成。因此,我很难手动将RDD值更改为整数。
我想知道是否有一种方法可以将模式迫使RDD值。任何帮助将不胜感激。
如果您想使用模式读取CSV,我建议做类似:
的事情df = sqlContext.read.format("com.databricks.spark.csv")
.schema(dataSchema)
.option("header", "false")
.option("delimiter", ",")
.option("inferSchema", "true")
.option("treatEmptyValuesAsNulls", "true")
.option("nullValue", "null")
.load("data.csv")
因此,您将拥有带有模式的数据,并且可以在其中操作,而不是将其与列一起使用,以便在其内部使用UDF,因此您始终随身列名。
另外,如果您有更大的数据集,请首先将其保存为镶木quet或兽人格式,然后再次阅读以执行操作,这将节省很多错误,并且性能将很高。