格式(删除类/帕伦斯)火花CSV SaveAsTextFile输出



我正在尝试从saveastextfile保存的CSV数据中剥离包装类或数组文本,而无需执行非SPARK后处理步骤。

我在大文件中有一些TSV数据,我将其馈送到RDD。

 val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('t')).map(x => Test(x(0),x(1)))
testRdd.saveAsTextFile("test")

这保存了由类名称包裹的数据:

head -n 1 part-00000
Test("1969720fb3100608b38297aad8b3be93","active")

我还尝试将其摄入未命名的类(?)而不是案例类。

val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('t')).map(x => (x(0),x(1)))

testrdd.saveastextfile(" test2")

这产生

("1969720fb3100608b38297aad8b3be93","active")

仍然需要后处理才能删除包装。

为了剥离包装字符,我尝试了FlatMap(),但RDD显然不是正确的类型:

testRdd.flatMap(identity).saveAsTextFile("test3")
<console>:17: error: type mismatch;
 found   : ((String, String)) => (String, String)
 required: ((String, String)) => TraversableOnce[?]
              testRdd.flatMap(identity).saveAsTextFile("test3")

所以...我需要将RDD转换为其他类型的RDD,还是其他方法可以将RDD保存为CSV,以使包装文本被剥离?

谢谢!

val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('t')).map(x => x(0)+","+x(1))

这会将输出写为CSV

您可以尝试以下内容:

val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id"))
                                 .map(x => x.toLowerCase.split('t'))
                                 .map(x => x(0)+","+x(1))

我们所听到的是在过滤您的标题后,您可以在同一地图中的平台节省一些不必要的额外映射。

这将创建一个可以保存为CSV格式的RDD [String]。

PS:

  • 保存的RDD输出的扩展不是CSV,而是格式是!

  • 这不是最佳的解决方案,但它将为您完成工作!

您可能会查看Spark CSV库。

val logfile ="/input.csv"

val conf = new sparkconf()。set(" spark.driver.allowmultiplecontexts"," true")

val sc = new SparkContext(master =" local",appName =" mi app",conf)

val logdata = sc.textfile(logfile,2).cache()

val lower = logdata.map(line => line.tolowercase)

最新更新