i有一个我试图使用Spark CSV软件包加载的CSV文件,并且由于其中很少的字段在其中包含n
,因此它无法正确加载数据。以下两个行
"XYZ", "Test Data", "TestNewnline", "OtherData"
"XYZ", "Test Data", "blablablabla
nblablablablablalbal", "OtherData"
我正在使用以下代码,该代码很简单,我使用的是parserLib
作为univocity
,如Internet中所述,它解决了多个Newline问题,但对我来说似乎并非如此。
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.option("parserLib","univocity")
.load("data.csv");
如何在字段内替换新线,从报价开始。有什么简单的方法吗?
SPARK 2.2用户可以使用一个选项,以说明CSV文件中的换行符。它最初被讨论为称为wholeFile
,但在发布之前被更名为multiLine
。
以下是将CSV加载到具有该选项的DataFrame中的一个示例:
var webtrends_data = (sparkSession.read
.option("header", "true")
.option("inferSchema", "true")
.option("multiLine", true)
.option("delimiter", ",")
.format("csv")
.load("hdfs://hadoop-master:9000/datasource/myfile.csv"))
根据spark-14194(以重复的重复解决(字段不支持新线字符。
。我建议通过
wholeFile
选项解决此问题,并且似乎合并了。我正在解决此问题,因为那个具有Pr。
但是,这是Spark 2.0,您使用spark-csv
模块。
在引用的Spark-19610中,它是用拉动请求固定的:
hmm,我理解了这一点的动机,尽管我对CSV的理解通常避免在现场中具有新线,或者某些实现将需要使用Newline
围绕现场价值引号
换句话说,在Spark 2.x中使用wholeFile
选项(如CSVDatasource所示(。
关于Spark-CSV,此评论可能有帮助(突出显示我的(:
但是,有很多类似的jiras抱怨这一点,原始的CSV数据源试图支持这一点,尽管这是错误地实现的。这至少将其与JSON ONE匹配,最好提供一种处理此类CSV文件的方法。实际上,当前实现需要引号 :)。(告诉R实际上也支持R(。
在Spark-CSV的功能中,您可以找到以下内容:
该软件包还支持保存简单(非嵌套(数据框架。编写文件时,API接受多个选项:
QUOTE :默认情况下,引用字符为
编写的"
,但可以设置为任何字符。这是根据quoteMode
。quotemode :何时Quote字段(all,minimal(默认(,non_numeric,none(,请参见引用模式
升级到火花2.x。Newline实际上是ASCII 13和10代表的CRLF,但是Backslash和" N"是不同的ASCII,在编程上解释和编写。Spark 2.x将正确读取..我尝试了它..S.B。
val conf = new SparkConf().setAppName("HelloSpark").setMaster("local[2]")
val sc = SparkSession.builder().master("local").getOrCreate()
val df = sc.read.csv("src/main/resources/data.csv")
df.foreach(row => println(row.mkString(", ")))
如果您无法升级,请用Regex在RDD上清理 n。由于它是正则是$,因此不会删除线路的末端。S.B.
val conf = new SparkConf().setAppName("HelloSpark").setMaster("local")
val sc = new SparkContext(conf)
val rdd1 = sc.textFile("src/main/resources/data.csv")
val rdd2 = rdd1.map(row => row.replace("\n", ""))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = rdd2.toDF()
df.foreach(row => println(row.mkString(", ")))