我有一个类似的CSV:
COL,VAL
TEST,100000000.12345679
TEST2,200000000.1234
TEST3,9999.1234679123
我想加载它,将列VAL
作为数字类型(由于项目的其他要求(,然后按照下面的结构将它持久化回另一个CSV:
+-----+------------------+
| COL| VAL|
+-----+------------------+
| TEST|100000000.12345679|
|TEST2| 200000000.1234|
|TEST3| 9999.1234679123|
+-----+------------------+
我面临的问题是,每当我加载它时,数字都会变成科学的符号,我无法在不通知precision
和scale
我的数据的情况下将其保存回来(我想使用文件中已经存在的数据,无论它是什么——我无法推断它(。以下是我尝试过的:
用DoubleType()
加载它,它给了我科学的符号:
schema = StructType([
StructField('COL', StringType()),
StructField('VAL', DoubleType())
])
csv_file = "Downloads/test.csv"
df2 = (spark.read.format("csv")
.option("sep",",")
.option("header", "true")
.schema(schema)
.load(csv_file))
df2.show()
+-----+--------------------+
| COL| VAL|
+-----+--------------------+
| TEST|1.0000000012345679E8|
|TEST2| 2.000000001234E8|
|TEST3| 9999.1234679123|
+-----+--------------------+
用DecimalType()
加载它,我需要指定precision
和scale
,否则,我会丢失点后的小数。然而,指定它时,除了没有得到正确值的风险(因为我的数据可能会四舍五入(外,我在点后得到零:例如,使用:StructField('VAL', DecimalType(38, 18))
我得到:
[Row(COL='TEST', VAL=Decimal('100000000.123456790000000000')),
Row(COL='TEST2', VAL=Decimal('200000000.123400000000000000')),
Row(COL='TEST3', VAL=Decimal('9999.123467912300000000'))]
请注意,在这种情况下,我在右侧有零,我不希望在新文件中出现这些零。
我发现解决它的唯一方法是使用UDF
,首先使用float()
删除科学符号,然后将其转换为字符串,以确保它将按照我的意愿持久化:
to_decimal = udf(lambda n: str(float(n)))
df2 = df2.select("*", to_decimal("VAL").alias("VAL2"))
df2 = df2.select(["COL", "VAL2"]).withColumnRenamed("VAL2", "VAL")
df2.show()
display(df2.schema)
+-----+------------------+
| COL| VAL|
+-----+------------------+
| TEST|100000000.12345679|
|TEST2| 200000000.1234|
|TEST3| 9999.1234679123|
+-----+------------------+
StructType(List(StructField(COL,StringType,true),StructField(VAL,StringType,true)))
有什么方法可以在不使用UDF
技巧的情况下达到同样的效果?
谢谢!
我发现解决它的最佳方法如下。它仍在使用UDF
,但现在,没有了字符串的变通方法来避免科学记数法。我还不会给出正确的答案,因为我仍然希望有人能给出一个没有UDF的解决方案(或者很好地解释为什么没有UDF
是不可能的(。
- CSV:
$ cat /Users/bambrozi/Downloads/testf.csv
COL,VAL
TEST,100000000.12345679
TEST2,200000000.1234
TEST3,9999.1234679123
TEST4,123456789.01234567
- 应用默认PySpark
DecimalType
精度和比例加载CSV:
schema = StructType([
StructField('COL', StringType()),
StructField('VAL', DecimalType(38, 18))
])
csv_file = "Downloads/testf.csv"
df2 = (spark.read.format("csv")
.option("sep",",")
.option("header", "true")
.schema(schema)
.load(csv_file))
df2.show(truncate=False)
输出:
+-----+----------------------------+
|COL |VAL |
+-----+----------------------------+
|TEST |100000000.123456790000000000|
|TEST2|200000000.123400000000000000|
|TEST3|9999.123467912300000000 |
|TEST4|123456789.012345670000000000|
+-----+----------------------------+
- 当您准备好报告(打印或保存在新文件中(时,您会将格式应用于尾随零:
import decimal
import pyspark.sql.functions as F
normalize_decimals = F.udf(lambda dec: dec.normalize())
(df2
.withColumn('VAL', normalize_decimals(F.col('VAL')))
.show(truncate=False))
输出:
+-----+------------------+
|COL |VAL |
+-----+------------------+
|TEST |100000000.12345679|
|TEST2|200000000.1234 |
|TEST3|9999.1234679123 |
|TEST4|123456789.01234567|
+-----+------------------+
您可以使用spark对sql查询执行此操作:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
val sparkConf: SparkConf = new SparkConf(true)
.setAppName(this.getClass.getName)
.setMaster("local[*]")
implicit val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val df = spark.read.option("header", "true").format("csv").load(csv_file)
df.createOrReplaceTempView("table")
val query = "Select cast(VAL as BigDecimal) as VAL, COL from table"
val result = spark.sql(query)
result.show()
result.coalesce(1).write.option("header", "true").mode("overwrite").csv(outputPath + table)