>我正在使用Spark SQL上下文读取带有CSV文件的文件。
法典:
m.put("path", CSV_DIRECTORY+file.getOriginalFilename());
m.put("inferSchema", "true"); // Automatically infer data types else string by default
m.put("header", "true"); // Use first line of all files as header
m.put("delimiter", ";");
DataFrame df = sqlContext.load("com.databricks.spark.csv",m);
df.printSchema();
使用df.printSchema()
获取列名和数据类型
操作 :
|--id : integer (nullable = true)
|-- ApplicationNo: string (nullable = true)
|-- Applidate: timestamp(nullable = true)
语句 printSchema 的返回类型是什么。如何将输出转换为JSON格式,如何将数据帧转换为JSON?
期望的 O/P:
{"column":"id","datatype":"integer"}
DataType 有一个 json() 方法和一个 fromJson() 方法,可用于序列化/反序列化模式。
val df = sqlContext.read().....load()
val jsonString:String = df.schema.json()
val schema:StructType = DataType.fromJson(jsonString).asInstanceOf[StructType]
Spark SQL 方式,
df.createOrReplaceTempView("<table_name>")
spark.sql("SELECT COLLECT_SET(STRUCT(<field_name>)) AS `` FROM <table_name> LIMIT 1").coalesce(1).write.format("org.apache.spark.sql.json").mode("overwrite").save(<Blob Path1/ ADLS Path1>)
输出将是这样的,
{"":[{<field_name>:<field_value1>},{<field_name>:<field_value2>}]}
这里可以通过以下 3 行来避免标题(假设数据中没有 Tilda),
val jsonToCsvDF=spark.read.format("com.databricks.spark.csv").option("delimiter", "~").load(<Blob Path1/ ADLS Path1>)
jsonToCsvDF.createOrReplaceTempView("json_to_csv")
spark.sql("SELECT SUBSTR(`_c0`,5,length(`_c0`)-5) FROM json_to_csv").coalesce(1).write.option("header",false).mode("overwrite").text(<Blob Path2/ ADLS Path2>)