避免在具有模式的 Spark SQL 中进行双重转换



>我有一个简单的JSON,如下所示,值节点有时会有STRING,有时会有DOUBLE。我想将值视为字符串。但是当 Spark 看到标签是它的两倍,它用 E 转换为不同的格式

输入数据接口

{"key" : "k1", "value": "86093351508521808.0"}
{"key" : "k2", "value": 86093351508521808.0}

火花输出 CSV

k1,86093351508521808.0
k2,8.6093351508521808E16

预期产出

k1,86093351508521808.0
k2,86093351508521808.0

请告知如何实现预期产出。我们永远不会读取标签中的值,因此我们永远不会知道精度和其他细节。

下面是示例代码

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession
        .builder()
        .appName(TestSpark.class.getName())
        .master("local[*]").getOrCreate();
    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();
    System.out.println("Spark context established");
    List<StructField> kvFields = new ArrayList<>();
    kvFields.add(DataTypes.createStructField("key", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("value", DataTypes.StringType, true));
    StructType employeeSchema = DataTypes.createStructType(kvFields);
    Dataset<Row> dataset = sparkSession.read()
        .option("inferSchema", false)
        .format("json")
        .schema(employeeSchema)
        .load("D:\dev\workspace\java\simple-kafka\key_value.json");
    dataset.createOrReplaceTempView("sourceView");
    sqlCtx.sql("select * from sourceView  ")
        .write()
        .format("csv")
        .save("D:\dev\workspace\java\simple-kafka\output\" + UUID.randomUUID().toString());
    sparkSession.close();

}

我们可以将该列转换为 DecimalType,如下所示:

scala> import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DecimalType
scala> spark.read.json(sc.parallelize(Seq("""{"key" : "k1", "value": "86093351508521808.0"}""","""{"key" : "k2", "value": 86093351508521808.0}"""))).select(col("value").cast(DecimalType(28, 1))).show
+-------------------+
|              value|
+-------------------+
|86093351508521808.0|
|86093351508521808.0|
+-------------------+

相关内容

  • 没有找到相关文章

最新更新