我正在尝试从saveastextfile保存的CSV数据中剥离包装类或数组文本,而无需执行非SPARK后处理步骤。
我在大文件中有一些TSV数据,我将其馈送到RDD。
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('t')).map(x => Test(x(0),x(1)))
testRdd.saveAsTextFile("test")
这保存了由类名称包裹的数据:
head -n 1 part-00000
Test("1969720fb3100608b38297aad8b3be93","active")
我还尝试将其摄入未命名的类(?)而不是案例类。
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('t')).map(x => (x(0),x(1)))
testrdd.saveastextfile(" test2")
这产生
("1969720fb3100608b38297aad8b3be93","active")
仍然需要后处理才能删除包装。
为了剥离包装字符,我尝试了FlatMap(),但RDD显然不是正确的类型:
testRdd.flatMap(identity).saveAsTextFile("test3")
<console>:17: error: type mismatch;
found : ((String, String)) => (String, String)
required: ((String, String)) => TraversableOnce[?]
testRdd.flatMap(identity).saveAsTextFile("test3")
所以...我需要将RDD转换为其他类型的RDD,还是其他方法可以将RDD保存为CSV,以使包装文本被剥离?
谢谢!
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('t')).map(x => x(0)+","+x(1))
这会将输出写为CSV
您可以尝试以下内容:
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id"))
.map(x => x.toLowerCase.split('t'))
.map(x => x(0)+","+x(1))
我们所听到的是在过滤您的标题后,您可以在同一地图中的平台节省一些不必要的额外映射。
。这将创建一个可以保存为CSV格式的RDD [String]。
PS:
保存的RDD输出的扩展不是CSV,而是格式是!
这不是最佳的解决方案,但它将为您完成工作!
您可能会查看Spark CSV库。
val logfile ="/input.csv"
val conf = new sparkconf()。set(" spark.driver.allowmultiplecontexts"," true")
val sc = new SparkContext(master =" local",appName =" mi app",conf)
val logdata = sc.textfile(logfile,2).cache()
val lower = logdata.map(line => line.tolowercase)