spark.read.使用自定义架构时不读取所有Excel行



我正试图从"excel"文件中读取Spark DataFrame。我使用了crealytics依赖项。

在没有任何预定义模式的情况下,所有行都被正确读取,但仅作为字符串类型的列。

为了防止这种情况,我使用了自己的模式(我提到某些列是Integer类型(,但在这种情况下,当读取文件时,大多数行都会被删除

build.sbt:中使用的库依赖项

"com.crealytics" %% "spark-excel" % "0.11.1",
Scala version - 2.11.8 
Spark version - 2.3.2
val inputDF = sparkSession.read.excel(useHeader = true).load(inputLocation(0))

上面读取了所有数据——大约25000行。

但是,

val inputWithSchemaDF: DataFrame = sparkSession.read
.format("com.crealytics.spark.excel")
.option("useHeader" , "false")
.option("inferSchema", "false")
.option("addColorColumns", "true")
.option("treatEmptyValuesAsNulls" , "false")
.option("keepUndefinedRows", "true")
.option("maxRowsInMey", 2000)
.schema(templateSchema)
.load(inputLocation)

这只给了我450行。有办法防止这种情况发生吗?提前感谢!(编辑(

到目前为止,我还没有找到解决这个问题的方法,但我尝试通过手动键入casting以不同的方式解决它。为了在代码行数方面做得更好,我使用了for循环。我的解决方案如下:

步骤1:创建我自己的">StructType"类型的架构:

val requiredSchema = new StructType() 
.add("ID", IntegerType, true)
.add("Vendor", StringType, true)
.add("Brand", StringType, true) 
.add("Product Name", StringType, true)
.add("Net Quantity", StringType, true)

步骤2:从excel文件中读取数据帧(没有自定义架构(后,键入强制转换数据帧(而不是在读取数据时使用架构(:

def convertInputToDesiredSchema(inputDF: DataFrame, requiredSchema: StructType)(implicit sparkSession: SparkSession) : DataFrame = 
{   
var schemaDf: DataFrame = inputDF 

for(i <- inputDF.columns.indices)   
{     
if(inputDF.schema(i).dataType.typeName != requiredSchema(i).dataType.typeName)     
{         
schemaDf = schemaDf.withColumn(schemaDf.columns(i), col(schemaDf.columns(i)).cast(requiredSchema.apply(i).dataType))       
}
} 
schemaDf 
}

这可能不是一个有效的解决方案,但比键入太多行代码来对多列进行类型转换要好。

仍在寻找我最初问题的解决方案

此解决方案只是为了防止有人想要尝试并急需快速修复

这里有一个解决方法,使用PySpark,使用由";字段名";以及";dataType":

# 1st load the dataframe with StringType for all columns
from pyspark.sql.types import *

input_df = spark.read.format("com.crealytics.spark.excel") 
.option("header", isHeaderOn) 
.option("treatEmptyValuesAsNulls", "true") 
.option("dataAddress", xlsxAddress1) 
.option("setErrorCellsToFallbackValues", "true") 
.option("ignoreLeadingWhiteSpace", "true") 
.option("ignoreTrailingWhiteSpace", "true") 
.load(inputfile)
# 2nd Modify the datatypes within the dataframe using a file containing column names and the expected data type.
dtypes = pd.read_csv("/dbfs/mnt/schema/{}".format(file_schema_location), header=None).to_records(index=False).tolist()
fields = [StructField(dtype[0], globals()[f'{dtype[1]}']()) for dtype in dtypes]
schema = StructType(fields)
for dt in dtypes:
colname =dt[0]
coltype = dt[1].replace("Type","")
input_df = input_df.withColumn(colname, col(colname).cast(coltype))

最新更新