这是我想要解决的问题:给定一个数据集作为输入,我想要生成一个数据集中的列表。输入数据集的数据集列表是使用某个属性的Min和Max值来定义的,该值将通过考虑Max&Min第二个数据集的属性值,这里有一个我想要的示例:如果我们将属性Flight和以下两个数据集作为属性:
1(
TicketId | Flight | time |
---------------------------------------|
10 | 123 | 2020-11-27 05:48:02|
---------------------------------------|
155 | 125 | 2020-11-27 05:49:02|
---------------------------------------|
12 | 133 | 2020-11-27 05:50:02|
---------------------------------------|
200 | 13 | 2020-11-27 06:49:02|
---------------------------------------|
123 | 22 | 2020-11-27 06:50:02|
---------------------------------------|
15 | 92 | 2020-11-27 05:51:02|
---------------------------------------|
21 | 41 | 2020-11-27 05:49:02|
---------------------------------------|
22 | 27 | 2020-11-27 05:50:02|
---------------------------------------|
422 | 35 | 2020-11-27 05:51:02|
---------------------------------------
第二个数据集如下:
2(
TicketId | Flight | time |
---------------------------------------|
103 | 156 | 2020-11-27 05:48:02|
---------------------------------------|
154 | 130 | 2020-11-27 05:49:02|
---------------------------------------|
123 | 151 | 2020-11-27 05:50:02|
---------------------------------------|
220 | 119 | 2020-11-27 06:49:02|
---------------------------------------|
143 | 111 | 2020-11-27 06:50:02|
---------------------------------------|
16 | 189 | 2020-11-27 05:51:02|
---------------------------------------|
22 | 152 | 2020-11-27 05:49:02|
---------------------------------------|
22 | 125 | 2020-11-27 05:50:02|
---------------------------------------|
134 | 187 | 2020-11-27 05:51:02|
---------------------------------------
然后,根据Flight属性,给定数据集2的Min值为111,则对数据集1进行分区后得到的数据集的恢复列表将为:
TicketId | Flight | time |
---------------------------------------|
10 | 123 | 2020-11-27 05:48:02|
---------------------------------------|
155 | 125 | 2020-11-27 05:49:02|
---------------------------------------|
12 | 133 | 2020-11-27 05:50:02|
---------------------------------------|
和
TicketId | Flight | time |
---------------------------------------|
200 | 13 | 2020-11-27 06:49:02|
---------------------------------------|
123 | 22 | 2020-11-27 06:50:02|
---------------------------------------|
15 | 92 | 2020-11-27 05:51:02|
---------------------------------------|
21 | 41 | 2020-11-27 05:49:02|
---------------------------------------|
22 | 27 | 2020-11-27 05:50:02|
---------------------------------------|
422 | 35 | 2020-11-27 05:51:02|
---------------------------------------
因为数据集2的值Min将相应地将数据集1划分为两个结果数据集。我的问题是如何在Spark/Java(甚至Scala(中实现这一点。注意:(属性Flight的(分区值可能是(数据集2的(属性的最大值
谢谢你的帮助。
首先,不可能从单个转换中产生多个RDD(由于DataFrames和Dataset是从RDD派生的,因此它也适用于它们(。这意味着我们不能在第一个数据集上使用where
/filter
方法的一行来分割它。相反,我们可以切入正题,使用基于两个数据集的Flight
的最小/最大值的条件来确定a(我们将基于分割的值,和b(拆分数据集的数量(只是因为您有一个条件,我们希望有3个而不是2个拆分数据集(。
由于我们知道存在3种重叠最小/最大值的有效情况:
- 按
min(df2)
拆分:
min(df1)------------------------------------max(df1)
min(df2)----------------------------------max(df2)
- 按
max(df2)
拆分:
min(df1)----------------------------------max(df1)
min(df2)------------------------------------max(df2)
- 按
min(df2)
和max(df2)
拆分:
min(df1)-----------------------------------------------max(df1)
min(df2)----------------max(df2)
剩下要做的就是:
- 从两个数据集中查找最大和最小CCD_ 8值
- 在一系列
if
/else if
语句中使用它们来确定我们从输入数据中得到的值重叠的情况,以及 - 通过一次使用简单的
where
方法过滤第一个数据集,创建2或3个新的数据集(可以在if
/else if
语句的范围内,也可以在它之外,具体取决于您想要做什么(
以下是用Scala编写的代码,以供Spark中更广泛的未来参考。(当然,您可以在Java中实现它,只需进行很小的更改,因为Spark语言端口之间的大多数命令都是可互换的(:
// store the min/max values of `Flight` as integers
val df1Max = df1.select(max("Flight")).head().getInt(0)
val df1Min = df1.select(min("Flight")).head().getInt(0)
val df2Max = df2.select(max("Flight")).head().getInt(0)
val df2Min = df2.select(min("Flight")).head().getInt(0)
if(df1Min < df2Min && df2Min < df1Max && df1Max < df2Max) // split by min(df2Min)
{
val firstDf = df1.where(col("Flight") <= df2Min)
val secondDf = df1.where(col("Flight") > df2Min)
firstDf.show()
secondDf.show()
// ... (store them in disk, process them, do whatever you want)
}
else if(df1Min > df2Min && df1Min < df2Max && df2Max < df1Max) // split by min(df2Max)
{
val firstDf = df1.where(col("Flight") <= df2Max)
val secondDf = df1.where(col("Flight") > df2Max)
firstDf.show()
secondDf.show()
// ... (store them in disk, process them, do whatever you want)
}
else if(df1Min > df2Min && df2Max < df1Max) // split by min(df2Min) and max(df2Max)
{
val firstDf = df1.where(col("Flight") <= df2Min)
val secondDf = df1.where(col("Flight") >= df2Min && col("Flight") <= df2Max)
val thirdDf = df1.where(col("Flight") > df2Max)
firstDf.show()
secondDf.show()
thirdDf.show()
// ... (store them in disk, process them, do whatever you want)
}