如何在Spark Java中遍历/迭代数据集



我试图遍历数据集,以执行一些字符串相似性计算,例如Jaro Winkler或Cosine相似性。我将数据集转换为行列表,然后随着语句的遍历,这不是有效的火花方法。因此,我期待在Spark中采取更好的方法。

public class sample {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Example").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);
        SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate();
        List<Row> data = Arrays.asList(RowFactory.create("Mysore","Mysuru"),
                RowFactory.create("Name","FirstName"));
        StructType schema = new StructType(
                new StructField[] { new StructField("Word1", DataTypes.StringType, true, Metadata.empty()),
                        new StructField("Word2", DataTypes.StringType, true, Metadata.empty()) });
        Dataset<Row> oldDF = spark.createDataFrame(data, schema);
        oldDF.show();
        List<Row> rowslist = oldDF.collectAsList(); 
    }
}

我找到了许多我尚不清楚的Javardd示例。数据集的一个示例将对我有很大帮助。

您可以使用org.apache.spark.api.java.function.ForeachFunction如下。

oldDF.foreach((ForeachFunction<Row>) row -> System.out.println(row));

对于不支持lambda表达式的旧Java JDK,您可以在导入之后使用以下内容:

导入org.apache.spark.api.java.function.voidfunction;

yourDataSet.toJavaRDD().foreach(new VoidFunction<Row>() {
        public void call(Row r) throws Exception {
            System.out.println(r.getAs("your column name here"));
        }
    });

最新更新