Joining Spark Datasets in Java



I have two datasets that I am trying to combine:

Dataset 1 (Machine):

String machineID;
List<Integer> machineCat; // e.g. (100, 200, 300)

Dataset 2 (Car):

String carID;
List<Integer> carCat; // e.g. (30, 200, 100, 300)

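Basically, I need to take each item of the machineCat list from Dataset 1 and check whether it is contained in the carCat list of Dataset 2. If it is, the two datasets should be combined as follows: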

Final dataset:

machineID, machineCat(100), carID, carCat(100)
machineID, machineCat(200), carID, carCat(200)
machineID, machineCat(300), carID, carCat(300)
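A minimal sketch of the matching rule described above, in plain Java with no Spark involved: each category in machineCat is kept only if carCat also contains it. The class name `CategoryMatch`, the `match` method and the comma-separated row format are hypothetical, chosen only so the expected result is easy to check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CategoryMatch {

    // For each category in machineCat, keep it only if carCat also contains it,
    // and format the combined row as "machineID,machineCat,carID,carCat".
    static List<String> match(String machineId, List<Integer> machineCat,
                              String carId, List<Integer> carCat) {
        List<String> rows = new ArrayList<>();
        for (Integer cat : machineCat) {
            // Same containment test that array_contains(carCat, cat) expresses
            if (carCat.contains(cat)) {
                rows.add(machineId + "," + cat + "," + carId + "," + cat);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Example data taken from the question
        List<String> rows = match("m1", Arrays.asList(100, 200, 300),
                                  "c1", Arrays.asList(30, 200, 100, 300));
        rows.forEach(System.out::println);
    }
}
```

With the question's data this yields three rows, one each for 100, 200 and 300, in machineCat order; 30 is dropped because it only appears on the car side.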

Any help on how to achieve this in Java using Dataset joins would be appreciated.

I have looked at the option of using array_contains, as shown below:

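// Runs on the driver: spark.sql() cannot be called from inside foreachPartition,
// which executes on the executors. Assumes machineDataset and carDataset are
// already registered as temp views (e.g. via createOrReplaceTempView).
// The subquery explodes machineCat into one row per category (m.cat), and the
// join keeps every car whose carCat array contains that category.
Dataset<Row> matched = spark.sql(
        "SELECT m.machineID, m.cat AS machineCat, c.carID, m.cat AS carCat"
        + " FROM (SELECT machineID, explode(machineCat) AS cat FROM machineDataset) m"
        + " JOIN carDataset c"
        + " ON array_contains(c.carCat, m.cat)");
matched.show();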
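import static org.apache.spark.sql.functions.*; // before the main class
import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Machine and Car are JavaBeans, each holding an ID and a List<Integer> of categories
Machine machine = new Machine("m1", Arrays.asList(100, 200, 300));
Car car = new Car("c1", Arrays.asList(30, 200, 100, 300));
Dataset<Row> mDF = spark.createDataFrame(Arrays.asList(machine), Machine.class);
mDF.show();
Dataset<Row> cDF = spark.createDataFrame(Arrays.asList(car), Car.class);
cDF.show();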

Output:

+---------------+---------+
|     machineCat|machineId|
+---------------+---------+
|[100, 200, 300]|       m1|
+---------------+---------+
+-------------------+-----+
|             carCat|catId|
+-------------------+-----+
|[30, 200, 100, 300]|   c1|
+-------------------+-----+
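The snippet above assumes `Machine` and `Car` are JavaBeans, but their definitions are not shown. A hypothetical `Machine` along these lines should work with `spark.createDataFrame(list, Machine.class)`, because Spark infers the columns from the bean's properties. `Car` would follow the same pattern with `catId` and `carCat`.

```java
import java.io.Serializable;
import java.util.List;

// Hypothetical JavaBean for the machine rows; the column names come from the
// getter/setter pairs (machineId, machineCat).
public class Machine implements Serializable {
    private String machineId;
    private List<Integer> machineCat;

    // No-arg constructor, required if the class is also used with Encoders.bean
    public Machine() { }

    public Machine(String machineId, List<Integer> machineCat) {
        this.machineId = machineId;
        this.machineCat = machineCat;
    }

    public String getMachineId() { return machineId; }
    public void setMachineId(String machineId) { this.machineId = machineId; }

    public List<Integer> getMachineCat() { return machineCat; }
    public void setMachineCat(List<Integer> machineCat) { this.machineCat = machineCat; }
}
```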

Then:

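// explode() turns each array into one row per element
Dataset<Row> mDF2 = mDF.select(col("machineId"), explode(col("machineCat")).as("machineCat"));
Dataset<Row> cDF2 = cDF.select(col("catId"), explode(col("carCat")).as("carCat"));
// Inner join on equal categories: one row per category that both arrays contain
Dataset<Row> joinedDF = mDF2.join(cDF2, mDF2.col("machineCat").equalTo(cDF2.col("carCat")));
// Wrap each matched category back into a single-element array, as in the requested output
Dataset<Row> finalDF = joinedDF.select(col("machineId"), array(col("machineCat")), col("catId"), array(col("carCat")));
finalDF.show();
finalDF.printSchema();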

Finally:

+---------+-----------------+-----+-------------+
|machineId|array(machineCat)|catId|array(carCat)|
+---------+-----------------+-----+-------------+
|       m1|            [100]|   c1|        [100]|
|       m1|            [200]|   c1|        [200]|
|       m1|            [300]|   c1|        [300]|
+---------+-----------------+-----+-------------+
root
|-- machineId: string (nullable = true)
|-- array(machineCat): array (nullable = false)
|    |-- element: integer (containsNull = true)
|-- catId: string (nullable = true)
|-- array(carCat): array (nullable = false)
|    |-- element: integer (containsNull = true)
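To see why explode followed by an equi-join yields exactly one row per shared category, here is a hypothetical plain-Java model of the same two steps, with no Spark involved. It explodes each ID's category list into (id, category) pairs, then joins the pairs on equal categories by indexing the car side in a hash map. It only illustrates the result and is not necessarily how Spark executes the join.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExplodeJoinModel {

    // Model of explode(): one (id, category) pair per element of each category list
    static List<Map.Entry<String, Integer>> explode(Map<String, List<Integer>> lists) {
        List<Map.Entry<String, Integer>> rows = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : lists.entrySet()) {
            for (Integer cat : e.getValue()) {
                rows.add(new AbstractMap.SimpleEntry<>(e.getKey(), cat));
            }
        }
        return rows;
    }

    // Model of the equi-join on category: index the exploded car rows by category,
    // then probe the index with every exploded machine row.
    static List<String> join(List<Map.Entry<String, Integer>> machines,
                             List<Map.Entry<String, Integer>> cars) {
        Map<Integer, List<String>> carsByCat = new HashMap<>();
        for (Map.Entry<String, Integer> c : cars) {
            carsByCat.computeIfAbsent(c.getValue(), k -> new ArrayList<>()).add(c.getKey());
        }
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, Integer> m : machines) {
            for (String carId : carsByCat.getOrDefault(m.getValue(), Collections.emptyList())) {
                // Row format: machineId,machineCat,carId,carCat
                joined.add(m.getKey() + "," + m.getValue() + "," + carId + "," + m.getValue());
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Same example data as mDF and cDF; LinkedHashMap keeps the output order stable
        Map<String, List<Integer>> machines = new LinkedHashMap<>();
        machines.put("m1", Arrays.asList(100, 200, 300));
        Map<String, List<Integer>> cars = new LinkedHashMap<>();
        cars.put("c1", Arrays.asList(30, 200, 100, 300));
        join(explode(machines), explode(cars)).forEach(System.out::println);
    }
}
```

One consequence of this design: if carCat held the same category twice, both the model and the Spark explode + join would emit that match twice, so a `distinct()` on the result may be needed in that case.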
