使用java在Apache Spark中多行输入



我已经在这个网站上看过其他类似的问题,但没有得到满意的答案。

我是一个完全的新手Apache spark和hadoop。我的问题是,我有一个输入文件(35GB),其中包含在线购物网站的商品的多行评论。文件中给出的信息如下所示:

productId: C58500585F
product:  Nun Toy
product/price: 5.99
userId: A3NM6WTIAE
profileName: Heather
helpfulness: 0/1
score: 2.0
time: 1624609
summary: not very much fun
text: Bought it for a relative. Was not impressive.

这是一个块的复习。有成千上万个这样的块由空行分隔。这里我需要的是productId、userId和score,所以我已经过滤了JavaRDD,只剩下我需要的行。所以它看起来像这样:

productId: C58500585F
userId: A3NM6WTIAE
score: 2.0
代码:

SparkConf conf = new SparkConf().setAppName("org.spark.program").setMaster("local");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> input = context.textFile("path");
JavaRDD<String> requiredLines = input.filter(new Function<String, Boolean>() {
public Boolean call(String s) throws Exception {
if(s.contains("productId") ||  s.contains("UserId") || s.contains("score") ||  s.isEmpty() ) {
        return false;
    }
    return true;
}
});

现在,我需要读取这三行作为一个(键,值)对的一部分,我不知道如何。在两个评论块之间只有一个空行

我已经看了几个网站,但没有找到解决我的问题。有谁能帮我一下吗?非常感谢!如果你需要更多的信息,请告诉我。

继续我之前的评论,这里可以使用textinputformat.record.delimiter。如果唯一的分隔符是空行,则该值应设置为"nn"

考虑以下测试数据:

productId: C58500585F
product:  Nun Toy
product/price: 5.99
userId: A3NM6WTIAE
profileName: Heather
helpfulness: 0/1
score: 2.0
time: 1624609
summary: not very much fun
text: Bought it for a relative. Was not impressive.
productId: ABCDEDFG
product:  Teddy Bear
product/price: 6.50
userId: A3NM6WTIAE
profileName: Heather
helpfulness: 0/1
score: 2.0
time: 1624609
summary: not very much fun
text: Second comment.
productId: 12345689
product:  Hot Wheels
product/price: 12.00
userId: JJ
profileName: JJ
helpfulness: 1/1
score: 4.0
time: 1624609
summary: Summarized
text: Some text

那么代码(在Scala中)看起来像:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val conf = new Configuration
conf.set("textinputformat.record.delimiter", "nn")
val raw = sc.newAPIHadoopFile("test.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
val data = raw.map(e => {
  val m = e._2.toString
    .split("n")
    .map(_.split(":", 2))
    .filter(_.size == 2)
    .map(e => (e(0), e(1).trim))
    .toMap
  (m("productId"), m("userId"), m("score").toDouble)
})

输出是:

data.foreach(println)
(C58500585F,A3NM6WTIAE,2.0)
(ABCDEDFG,A3NM6WTIAE,2.0)
(12345689,JJ,4.0)

不确定你到底想要输出什么,所以我把它变成了一个3元素的元组。此外,如果您需要的话,解析逻辑肯定可以更有效,但这应该会给您一些工作。

相关内容

  • 没有找到相关文章

最新更新