Provide schema while reading a csv file as a DataFrame in Scala Spark



I am trying to read a csv file into a DataFrame. I know what the schema of my DataFrame should be, since I know what is in my csv file. I am also using the spark-csv package to read the file. I tried specifying the schema like below.

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("schema","project: string ,article: string ,requests: integer ,bytes_served: long")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

But when I check the schema of the DataFrame I created, it seems to have taken its own schema. Am I doing anything wrong? How do I get Spark to pick up the schema I mentioned?

> pagecount.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)

Try the code below; you don't need to specify the schema. When you set inferSchema to true, it should pick it up from your csv file.

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

If you want to specify the schema manually, you can do it as below:

import org.apache.spark.sql.types._
val customSchema = StructType(Array(
  StructField("project", StringType, true),
  StructField("article", StringType, true),
  StructField("requests", IntegerType, true),
  StructField("bytes_served", DoubleType, true)
))
val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("header", "true")
  .schema(customSchema)
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

For those interested in doing this in Python, here is a working version.

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

customSchema = StructType([
    StructField("IDGC", StringType(), True),
    StructField("SEARCHNAME", StringType(), True),
    StructField("PRICE", DoubleType(), True)
])
productDF = spark.read.load('/home/ForTesting/testProduct.csv', format="csv", header="true", sep='|', schema=customSchema)
testProduct.csv
ID|SEARCHNAME|PRICE
6607|EFKTON75LIN|890.88
6612|EFKTON100HEN|55.66

I used the solution provided by Arunakiran Nulu in my analysis (see the code). Although it was able to assign the correct types to the columns, all the values returned were null. Previously, I had tried the option .option("inferSchema", "true") and it returned the correct values in the DataFrame (although with different types).

val customSchema = StructType(Array(
    StructField("numicu", StringType, true),
    StructField("fecha_solicitud", TimestampType, true),
    StructField("codtecnica", StringType, true),
    StructField("tecnica", StringType, true),
    StructField("finexploracion", TimestampType, true),
    StructField("ultimavalidacioninforme", TimestampType, true),
    StructField("validador", StringType, true)))
val df_explo = spark.read
        .format("csv")
        .option("header", "true")
        .option("delimiter", "t")
        .option("timestampFormat", "yyyy/MM/dd HH:mm:ss") 
        .schema(customSchema)
        .load(filename)
Result:

root
 |-- numicu: string (nullable = true)
 |-- fecha_solicitud: timestamp (nullable = true)
 |-- codtecnica: string (nullable = true)
 |-- tecnica: string (nullable = true)
 |-- finexploracion: timestamp (nullable = true)
 |-- ultimavalidacioninforme: timestamp (nullable = true)
 |-- validador: string (nullable = true)

and the table is:

|numicu|fecha_solicitud|codtecnica|tecnica|finexploracion|ultimavalidacioninforme|validador|
+------+---------------+----------+-------+--------------+-----------------------+---------+
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
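
For anyone hitting the same all-null result: with an explicit schema, Spark's default PERMISSIVE mode turns every row that cannot be parsed against that schema (wrong delimiter, mismatched timestampFormat, etc.) into a row of nulls. Below is a minimal sketch for inspecting the raw text of those rows, assuming the tab-delimited setup above; the _corrupt_record column is something you add to the schema yourself.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// same schema as above, plus a column that captures the raw text of unparsable lines
val debugSchema = customSchema.add(StructField("_corrupt_record", StringType, true))

val df_debug = spark.read
  .format("csv")
  .option("header", "true")
  .option("delimiter", "\t")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(debugSchema)
  .load(filename)
  .cache() // cache first: Spark 2.3+ refuses queries that reference only the corrupt-record column of an uncached CSV

// rows that failed to parse keep their original line here
df_debug.filter(col("_corrupt_record").isNotNull).show(5, false)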

The previous solutions have used a custom StructType.

With spark-sql 2.4.5 (Scala version 2.12.10) it is now possible to specify the schema as a string using the schema function:
import org.apache.spark.sql.SparkSession;

val sparkSession = SparkSession.builder()
            .appName("sample-app")
            .master("local[2]")
            .getOrCreate();
val pageCount = sparkSession.read
  .format("csv")
  .option("delimiter","|")
  .option("quote","")
  .schema("project string ,article string ,requests integer ,bytes_served long")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

Thanks to @Nulu's answer, it works on pyspark with minimal tweaking:

from pyspark.sql.types import StringType, IntegerType, DoubleType, StructField, StructType

customSchema = StructType([
    StructField("project", StringType(), True),
    StructField("article", StringType(), True),
    StructField("requests", IntegerType(), True),
    StructField("bytes_served", DoubleType(), True)])

pagecount = (spark.read.format("com.databricks.spark.csv")
    .option("delimiter", " ")
    .option("quote", "")
    .option("header", "false")
    .schema(customSchema)
    .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000"))

Schema defined as a simple string

Just in case someone is interested in defining the schema as a simple string with date and timestamp columns:

Create the data file from a terminal or shell:

echo " 
2019-07-02 22:11:11.000999, 01/01/2019, Suresh, abc  
2019-01-02 22:11:11.000001, 01/01/2020, Aadi, xyz 
" > data.csv

Define the schema as a String:

    user_schema = 'timesta TIMESTAMP,date DATE,first_name STRING , last_name STRING'

Read the data:

    df = spark.read.csv(path='data.csv', schema = user_schema, sep=',', dateFormat='MM/dd/yyyy',timestampFormat='yyyy-MM-dd HH:mm:ss.SSSSSS')
    df.show(10, False)
    +-----------------------+----------+----------+---------+
    |timesta                |date      |first_name|last_name|
    +-----------------------+----------+----------+---------+
    |2019-07-02 22:11:11.999|2019-01-01| Suresh   | abc     |
    |2019-01-02 22:11:11.001|2020-01-01| Aadi     | xyz     |
    +-----------------------+----------+----------+---------+

Please note that explicitly defining the schema instead of letting Spark infer it also improves read performance.
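
For comparison, here is a Scala sketch of the two approaches (same file and schema string as above): inferSchema makes Spark scan the data once just to guess the types before the actual load, while an explicit schema skips that extra pass entirely.

// an extra pass over data.csv happens here, only to infer the column types
val inferred = spark.read
  .option("inferSchema", "true")
  .csv("data.csv")

// no inference pass: the types are declared up front
val explicit = spark.read
  .schema("timesta TIMESTAMP, date DATE, first_name STRING, last_name STRING")
  .option("dateFormat", "MM/dd/yyyy")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
  .csv("data.csv")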

Here is a complete demo of using a custom schema:

Shell code:

echo "
Slingo, iOS 
Slingo, Android
" > game.csv
Scala code:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{asc, desc}

val customSchema = StructType(Array(
  StructField("game_id", StringType, true),
  StructField("os_id", StringType, true)
))
val csv_df = spark.read.format("csv").schema(customSchema).load("game.csv")
csv_df.show
csv_df.orderBy(asc("game_id"), desc("os_id")).show
csv_df.createOrReplaceTempView("game_view")
val sort_df = spark.sql("select * from game_view order by game_id, os_id desc")
sort_df.show

Another way is to parse the file with opencsv and map each row onto a case class:

// import libraries
import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader

// file name
val train_csv = "/Path/train.csv"
// read as a text file
val train_rdd = sc.textFile(train_csv)
// use CSVReader to parse each line into an Array[String]
val full_train_data = train_rdd.map { line =>
  val csvReader = new CSVReader(new StringReader(line))
  csvReader.readNext()
}
// type alias
type s = String
// case class describing the schema
case class trainSchema(Loan_ID: s, Gender: s, Married: s, Dependents: s, Education: s, Self_Employed: s,
    ApplicantIncome: s, CoapplicantIncome: s, LoanAmount: s, Loan_Amount_Term: s,
    Credit_History: s, Property_Area: s, Loan_Status: s)
// build a DataFrame with the custom schema, skipping the header row in the first partition
// (outside spark-shell you also need import spark.implicits._ for .toDF)
val full_train_data_with_schema = full_train_data.mapPartitionsWithIndex { (idx, itr) =>
    val rows = if (idx == 0) itr.drop(1) else itr
    rows.map(x => trainSchema(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9), x(10), x(11), x(12)))
  }.toDF

From pyspark 2.4 onwards, you can simply use the header parameter to get the correct column names:

data = spark.read.csv('data.csv', header=True)

Similarly, if you are using Scala, you can use the header option as well, as sketched below.
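
A minimal Scala sketch of the same thing:

// treat the first line of the file as the header row
val data = spark.read
  .option("header", "true")
  .csv("data.csv")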

You can also do it using SparkSession and implicits:

import sparkSession.implicits._
val pagecount:DataFrame = sparkSession.read
.option("delimiter"," ")
.option("quote","")
.option("inferSchema","true")
.csv("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
.toDF("project","article","requests","bytes_served")

If your Spark version is 3.0.1, you can use the following Scala script:

val df = spark.read.format("csv").option("delimiter",",").option("header",true).load("file:///LOCAL_CSV_FILE_PATH")

But this way, all the data types will be set to String.
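
If you also need typed columns, a minimal sketch (column names borrowed from the question above) is to either cast afterwards or hand the reader a schema instead of relying on the header alone:

import org.apache.spark.sql.functions.col

// cast selected string columns after loading ...
val typed = df
  .withColumn("requests", col("requests").cast("int"))
  .withColumn("bytes_served", col("bytes_served").cast("long"))

// ... or declare the types up front
val typed2 = spark.read.format("csv")
  .option("delimiter", ",")
  .option("header", true)
  .schema("project string, article string, requests int, bytes_served long")
  .load("file:///LOCAL_CSV_FILE_PATH")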

This is one of the options where we can pass the column names to the dataframe while loading the CSV (here using pandas):

import pandas
names = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class']
dataset = pandas.read_csv("C:/Users/NS00606317/Downloads/Iris.csv", names=names, header=0)
print(dataset.head(10))

Output:
    sepal-length  sepal-width  petal-length  petal-width        class
1            5.1          3.5           1.4          0.2  Iris-setosa
2            4.9          3.0           1.4          0.2  Iris-setosa
3            4.7          3.2           1.3          0.2  Iris-setosa
4            4.6          3.1           1.5          0.2  Iris-setosa
5            5.0          3.6           1.4          0.2  Iris-setosa
6            5.4          3.9           1.7          0.4  Iris-setosa
7            4.6          3.4           1.4          0.3  Iris-setosa
8            5.0          3.4           1.5          0.2  Iris-setosa
9            4.4          2.9           1.4          0.2  Iris-setosa
10           4.9          3.1           1.5          0.1  Iris-setosa

My solution is:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._

val spark = org.apache.spark.sql.SparkSession.builder.
  master("local[*]").
  appName("Spark CSV Reader").
  getOrCreate()

val movie_rating_schema = StructType(Array(
  StructField("UserID", IntegerType, true),
  StructField("MovieID", IntegerType, true),
  StructField("Rating", DoubleType, true),
  StructField("Timestamp", TimestampType, true)))

val df_ratings: DataFrame = spark.read.format("csv").
  option("header", "true").
  option("mode", "DROPMALFORMED").
  option("delimiter", ",").
  //option("inferSchema", "true").
  option("nullValue", "null").
  schema(movie_rating_schema).
  load(args(0)) //"file:///home/hadoop/spark-workspace/data/ml-20m/ratings.csv"

val movie_avg_scores = df_ratings.rdd.map(_.toString()).
  map(line => {
    // drop "[", "]" and then split the string
    val fields = line.substring(1, line.length() - 1).split(",")
    // extract (movie id, rating)
    (fields(1).toInt, fields(2).toDouble)
  }).
  groupByKey().
  map(data => {
    val avg: Double = data._2.sum / data._2.size
    (data._1, avg)
  })
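
The same per-movie average can also be computed without converting to an RDD of strings; here is a sketch using the DataFrame API directly:

import org.apache.spark.sql.functions.avg

// group by movie and average the ratings on the typed DataFrame
val movie_avg_scores_df = df_ratings
  .groupBy("MovieID")
  .agg(avg("Rating").alias("avg_rating"))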
