在 for 循环中构造 Spark sql 数据集



TestDF是一个数据框。 可以在 for 循环内编辑/更改 10 次吗?

Spark 没有用于编辑和保存回同一数据集的选项。

Java也没有动态变量赋值。

需要做类似这样的事情"数据集<Row> testDF+(i+1) = testDF+(i)"(动态变量)或"数据集<Row> testDF = testDF"(在同一数据集中)在 for 循环中。

有没有办法在循环内部循环火花DF?

String[] arraytest = schemaString.split(";");
for (int i=0;i < arraytest .length;i++) {
    String fieldName = arraytest[i];
    Dataset<Row> testDF+(i+1) = testDF+(i)
       .withColumn(fieldName, 
           functions.when(functions.col(fieldName).equalTo(""),"-99")
           .otherwise(functions.col(fieldName)));
    }

为此使用累加器

例如,您可以使用这样的东西

CollectionAccumulator<String> queryAccumulator = sparkSession.sparkContext().collectionAccumulator();

Dataset<Row> table= sparkSession.sql("select * from table");
    table.foreachPartition(partition->{
        while(partition.hasNext()){
            Row row = partition.next();
        String sql="select * from table where cond1= '"+row.getAs("cond1")+"' and cond2= '"+row.getAs("cond2")+"' order " +
                "by starttime desc limit 24";
            queryAccumulator.add(sql);}
    });
    logger.info("Queries to be executed are {}",queryAccumulator.value());
    Dataset<Row> limitedDataDf= queryAccumulator.value().stream().map(query-> sparkSession.sql(query)).reduce(Dataset::union).get();
    limitedDataDf.createOrReplaceTempView("SPARKJOBINFORMATION_LIMIT");

相关内容

  • 没有找到相关文章

最新更新