Mapping from a String to a Tuple2<String, Long> in Spark + Java

I'm learning how to use Spark, coding in Java (no Scala code, please). I'm trying to implement Spark's very simple hello-world example: a word count.

I borrowed the code from the Quick Start in the Spark documentation:

/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
    public static void main(String[] args) {
        String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
        SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
        Dataset<String> logData = spark.read().textFile(logFile).cache();

        long numAs = logData.filter(s -> s.contains("a")).count();
        long numBs = logData.filter(s -> s.contains("b")).count();

        System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

        spark.stop();
    }
}

Everything works fine. Now I'd like to replace the filter with a flatMap followed by a map. So far I have the flatMap:

logData.flatMap((FlatMapFunction<String, String>) l -> {
    return Arrays.asList(l.split(" ")).iterator();
}, Encoders.STRING());

Now I'd like to map each word to a Tuple2 of (word, 1) and then group them by key. The problem is that I can't find how to go from String to (String, Long). Most of the literature talks about mapToPair, but Dataset has no such method!

Can someone help me map a String to a Tuple2<String, Long>? By the way, I'm not even sure whether Tuple2 is the class I'm looking for.

[Update]

Following the suggestion from @mangusta, I tried this:

logData.flatMap((FlatMapFunction<String, String>) l -> {
    return Arrays.asList(l.split(" ")).iterator();
}, Encoders.STRING())
.map(new Function<String, Tuple2<String, Long>>() {
    public Tuple2<String, Long> call(String str) {
        return new Tuple2<String, Long>(str, 1L);
    }
})
.count();

and got this compile error:

Error:(108, 17) java: no suitable method found for map(<anonymous org.apache.spark.api.java.function.Function<java.lang.String,scala.Tuple2<java.lang.String,java.lang.Long>>>)
method org.apache.spark.sql.Dataset.<U>map(scala.Function1<java.lang.String,U>,org.apache.spark.sql.Encoder<U>) is not applicable
(cannot infer type-variable(s) U
(actual and formal argument lists differ in length))
method org.apache.spark.sql.Dataset.<U>map(org.apache.spark.api.java.function.MapFunction<java.lang.String,U>,org.apache.spark.sql.Encoder<U>) is not applicable
(cannot infer type-variable(s) U
(actual and formal argument lists differ in length))

It seems that this map function takes two arguments. I'm not sure what I should pass as the second one.

I'm not sure about the cause of the error, but you can try this code:

final String sparkHome = "/usr/local/Cellar/apache-spark/2.3.2";

SparkConf conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("spark-example")
        .setSparkHome(sparkHome + "/libexec");
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

Dataset<Row> df = spark.read().textFile(sparkHome + "/README.md")
        .flatMap(line -> Arrays.asList(line.split(" ")).iterator(), Encoders.STRING())
        .filter(s -> !s.isEmpty())
        .map(word -> new Tuple2<>(word.toLowerCase(), 1L), Encoders.tuple(Encoders.STRING(), Encoders.LONG()))
        .toDF("word", "count")
        .groupBy("word")
        .sum("count")
        .orderBy(new Column("sum(count)").desc())
        .withColumnRenamed("sum(count)", "_cnt");

df.show(false);

You should expect output like this:

+-------------+----+
|word         |_cnt|
+-------------+----+
|the          |25  |
|to           |19  |
|spark        |16  |
|for          |15  |
|and          |10  |
|a            |9   |
|##           |9   |
|you          |8   |
|run          |7   |
|on           |7   |
|can          |7   |
|is           |6   |
|in           |6   |
|of           |5   |
|using        |5   |
|including    |4   |
|if           |4   |
|with         |4   |
|documentation|4   |
|an           |4   |
+-------------+----+
only showing top 20 rows
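
The key difference from the failing snippet in the question is the second argument: the Java-friendly overload of Dataset.map takes a MapFunction together with an Encoder, not an org.apache.spark.api.java.function.Function. A minimal sketch of just that step, assuming words is the Dataset<String> produced by the flatMap above (the explicit MapFunction cast is only needed if the compiler finds the bare lambda ambiguous):

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import scala.Tuple2;

// Sketch: turn each word into a (word, 1L) pair, with an explicit Encoder for the Tuple2.
Dataset<Tuple2<String, Long>> pairs = words.map(
        (MapFunction<String, Tuple2<String, Long>>) word -> new Tuple2<>(word, 1L),
        Encoders.tuple(Encoders.STRING(), Encoders.LONG()));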

If you need to use Tuple2, you should have the Scala library for Java, i.e. scala-library.jar, on your classpath.

To build the tuples from some JavaRDD<String> data, you can apply the following function to that RDD:

JavaRDD<Tuple2<String, Long>> tupleRDD = data.map(
    new Function<String, Tuple2<String, Long>>() {
        public Tuple2<String, Long> call(String str) {
            return new Tuple2<String, Long>(str, 1L);
        } // end call
    } // end Function
); // end map
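
If you stay with the RDD API, the grouping step the question asks about is usually done with mapToPair followed by reduceByKey rather than a plain map. A minimal sketch, assuming the same JavaRDD<String> named data:

import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Sketch: pair each word with 1, then sum the counts per word.
JavaPairRDD<String, Long> wordCounts = data
        .mapToPair(word -> new Tuple2<String, Long>(word, 1L))
        .reduceByKey(Long::sum);

// Inspect a few results on the driver.
wordCounts.take(10).forEach(t -> System.out.println(t._1() + " -> " + t._2()));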

Try this:

logData
    .flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator(),
             Encoders.STRING())
    .groupBy("value")
    .count()
    .show();
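
If you do want the results back as Tuple2 pairs rather than just a displayed DataFrame, one option is to collect the grouped counts and build the tuples on the driver. A sketch under those assumptions (the single column of a Dataset<String> is named value, and count() adds a count column):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Encoders;
import scala.Tuple2;

// Sketch: word counts collected to the driver as (word, count) tuples.
List<Tuple2<String, Long>> counts = logData
        .flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator(),
                 Encoders.STRING())
        .groupBy("value")
        .count()
        .collectAsList()
        .stream()
        .map(row -> new Tuple2<String, Long>(row.getString(0), row.getLong(1)))
        .collect(Collectors.toList());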
