我在迭代时使用ForeachFunction
迭代Dataset<Row>
,我不知道如何将某些自定义列附加到该行中,然后将其附加到Spark Java中的另一个Dataset<Row>
代码:
groupedDataset.foreach((ForeachFunction<Row>) row -> {
double average = //some value
// the Row has four columns
// All I want is to have a new Dataset<Row> with specific columns
// from the Row i.e row(0),row(1),row(3) and average value
Dataset<Row> newDs = row.getString("ID"),row.getString("time"),row.getInt("value"),average;
});
我尝试了很多,但我无法解决。
谢谢!
行不应该直接修改(可能但不便利)。操纵数据框(行数据集)时,您应该使用SparkSQL API,其主要原因是:1。易于使用2.它允许Spark可以对您的请求进行大量优化。
现在,这个示例似乎看起来像您要实现的目标。基本上,我创建了一个带有三列的数据集,然后使用一个选择来平均两个结果,并丢弃最后一个。让我知道您是否需要更多详细信息。
SparkSession spark = SparkSession.builder().getOrCreate();
Dataset<Row> data = spark
.range(10)
.select(col("id").as("id"),
col("id").cast("string").as("str"),
col("id").plus(5).as("id5") );
data.show();
Dataset<Row> result = data
.select(col("id"), col("id5"),
col("id").plus(col("id5")).divide(2).as("avg"));
result.show();
产生:
+---+---+---+
| id|str|id5|
+---+---+---+
| 0| 0| 5|
| 1| 1| 6|
| 2| 2| 7|
+---+---+---+
+---+---+---+
| id|id5|avg|
+---+---+---+
| 0| 5|2.5|
| 1| 6|3.5|
| 2| 7|4.5|
+---+---+---+