R - sparklyr exception when reading a CSV file with schema inference: Double



I am trying to read a CSV file into Spark with the spark_read_csv function. I get an exception while the schema is being inferred, that is, when infer_schema = TRUE.

spark_read_csv(sc,"myDf",DatasetUrl)

I get the following exception:

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 90.0 failed 1 times, most recent failure: Lost task 0.0 in stage 90.0 (TID 151, localhost): java.text.ParseException: Unparseable number: "cr1_fd_dttm" at java.text.NumberFormat.parse(NumberFormat.java:385) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$4.apply$mcD$sp(CSVInferSchema.scala:259)

However, when I set infer_schema = FALSE, everything is read as chr, as expected.

This is what the data in the cr1_fd_dttm column looks like:

      cr1_fd_dttm
            <chr>
1             0.0
2   1.45679112E12
3   1.45679166E12
4   1.45679154E12
5   1.45679274E12
6             0.0
7             0.0
8             0.0
9             0.0
10  1.45679118E12

Can anyone help me?

Thanks
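As a quick sanity check on the cr1_fd_dttm values shown above, the following base R sketch (it does not use Spark, and base R's parser is only a stand-in for Spark's) shows that the data values themselves are valid doubles, while the string quoted in the exception is the column name. That suggests, though it does not prove, that the header text reached the numeric cast.

```r
# Values copied from the cr1_fd_dttm column in the question.
values <- c("0.0", "1.45679112E12", "1.45679166E12")

# Scientific notation such as 1.45679112E12 converts cleanly to double.
parsed <- as.numeric(values)
print(parsed)

# The string named in the exception is the column name, not a number.
# In base R this conversion yields NA (with a warning, suppressed here).
header_as_number <- suppressWarnings(as.numeric("cr1_fd_dttm"))
print(is.na(header_as_number))
```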

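I simply read the file without caching it in memory right away, coerce the fields to numeric types, and then load the result into memory. So the key is to set memory to FALSE, set infer_schema to FALSE, pass the list of columns, and then force the result into Spark memory with compute(). Here is a lengthy but working example: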

mapped_flights <- spark_read_csv(sc, "mapped_flights", 
                      path =  "s3a://flights-data/full", 
                      memory = FALSE, 
                      infer_schema = FALSE,
                      columns = list(
                        Year = "character",
                        Month = "character",
                        DayofMonth = "character",
                        DayOfWeek = "character",
                        DepTime = "character",
                        CRSDepTime = "character",
                        ArrTime = "character",
                        CRSArrTime = "character",
                        UniqueCarrier = "character",
                        FlightNum = "character",
                        TailNum = "character",
                        ActualElapsedTime = "character",
                        CRSElapsedTime = "character",
                        AirTime = "character",
                        ArrDelay = "character",
                        DepDelay = "character",
                        Origin = "character",
                        Dest = "character",
                        Distance = "character",
                        TaxiIn = "character",
                        TaxiOut = "character",
                        Cancelled = "character",
                        CancellationCode = "character",
                        Diverted = "character",
                        CarrierDelay = "character",
                        WeatherDelay = "character",
                        NASDelay = "character",
                        SecurityDelay = "character",
                        LateAircraftDelay = "character")
                      )

flights <- mapped_flights %>%
  mutate(
    Year = as.integer(Year),
    Month = as.integer(Month),
    DayofMonth = as.integer(DayofMonth),
    DayOfWeek = as.integer(DayOfWeek),
    DepTime = as.integer(DepTime),
    CRSDepTime = as.integer(CRSDepTime),
    CRSArrTime = as.integer(CRSArrTime),
    ArrTime = as.integer(ArrTime),
    ActualElapsedTime = as.integer(ActualElapsedTime),
    CRSElapsedTime = as.integer(CRSElapsedTime),
    AirTime = as.integer(AirTime),
    ArrDelay = as.double(ArrDelay),
    DepDelay = as.double(DepDelay),
    Distance = as.integer(Distance),
    TaxiIn = as.integer(TaxiIn),
    TaxiOut = as.integer(TaxiOut),
    Cancelled = as.integer(Cancelled),
    Diverted = as.integer(Diverted),
    CarrierDelay = as.integer(CarrierDelay),
    WeatherDelay = as.integer(WeatherDelay),
    NASDelay = as.integer(NASDelay),
    SecurityDelay = as.integer(SecurityDelay),
    LateAircraftDelay = as.integer(LateAircraftDelay)
  ) %>%
  compute("flights")
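
Since every entry in the columns list above has the same type, the list could also be built from a vector of names instead of being typed out. This is a minimal base R sketch, not part of the original answer. The names col_names and columns are illustrative, and the spark_read_csv call is left as a comment because it needs a live connection sc.

```r
# Column names of the flights CSV, in file order (copied from the example above).
col_names <- c(
  "Year", "Month", "DayofMonth", "DayOfWeek", "DepTime", "CRSDepTime",
  "ArrTime", "CRSArrTime", "UniqueCarrier", "FlightNum", "TailNum",
  "ActualElapsedTime", "CRSElapsedTime", "AirTime", "ArrDelay", "DepDelay",
  "Origin", "Dest", "Distance", "TaxiIn", "TaxiOut", "Cancelled",
  "CancellationCode", "Diverted", "CarrierDelay", "WeatherDelay",
  "NASDelay", "SecurityDelay", "LateAircraftDelay"
)

# Named list that maps every column to "character".
columns <- setNames(as.list(rep("character", length(col_names))), col_names)
str(columns[1:3])

# Assumed usage, mirroring the call above (requires a Spark connection `sc`):
# mapped_flights <- spark_read_csv(sc, "mapped_flights",
#                                  path = "s3a://flights-data/full",
#                                  memory = FALSE,
#                                  infer_schema = FALSE,
#                                  columns = columns)
```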
