我是Apache Spark的新手,正在尝试将SchemaRDD与管道分隔的文本文件一起使用。我使用Scala 10在Mac上独立安装了Spark 1.5.2。我有一个CSV文件,其中包含以下代表性数据,我正试图根据记录的第一个值(列)将以下数据拆分为4个不同的文件。如果我能得到任何帮助,我将不胜感激。
1|1.8|20140801T081137|115810740
2|20140714T060000|335|22159892|3657|0.00|||181
2|20140714T061500|335|22159892|3657|0.00|||157
2|20140714T063000|335|22159892|3657|0.00|||156
2|20140714T064500|335|22159892|3657|0.00|||66
2|20140714T070000|335|22159892|3657|0.01|||633
2|20140714T071500|335|22159892|3657|0.01|||1087
3|34|Starz
3|35|VH1
3|36|CSPAN: Cable Satellite Public Affairs Network
3|37|Encore
3|278|CMT: Country Music Television
3|281|Telehit
4|625363|1852400|Matlock|9212|The Divorce
4|625719|1852400|Matlock|16|The Rat Pack
4|625849|1846952|Smallville|43|Calling
注意:您的csv文件每行中的字段数量不相同-这无法按原样解析为DataFrame。(SchemaRDD已重命名为DataFrame。)如果csv文件格式正确,您可以执行以下操作:
使用--packages.com.databricks:spark-csv2.10:1.3.0启动spark-shell或spark-submit,以便轻松解析csv文件(请参阅此处)。在Scala中,您的代码将是,假设您的csv文件有一个标题——如果是,那么引用列会更容易:
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", '|').load("/path/to/file.csv")
// assume 1st column has name col1
val df1 = df.filter( df("col1") === 1) // 1st DataFrame
val df2 = df.filter( df("col1") === 2) // 2nd DataFrame etc...
由于您的文件格式不好,您将不得不以不同的方式解析每一行,因此,例如,请执行以下操作:
val lines = sc.textFile("/path/to/file.csv")
case class RowRecord1( col1:Int, col2:Double, col3:String, col4:Int)
def parseRowRecord1( arr:Array[String]) = RowRecord1( arr(0).toInt, arr(1).toDouble, arr(2), arr(3).toInt)
case class RowRecord2( col1:Int, col2:String, col3:Int, col4:Int, col5:Int, col6:Double, col7:Int)
def parseRowRecord2( arr:Array[String]) = RowRecord2( arr(0).toInt, arr(1), arr(2).toInt, arr(3).toInt, arr(4).toInt, arr(5).toDouble, arr(8).toInt)
val df1 = lines.filter(_.startsWith("1")).map( _.split('|')).map( arr => parseRowRecord1( arr )).toDF
val df2 = lines.filter(_.startsWith("2")).map( _.split('|')).map( arr => parseRowRecord2( arr )).toDF
在PySpark中,命令是:
df = spark.read.csv("filepath", sep="|")