根据另一个数据集的最小值和最大值属性对数据集进行分区



这是我想要解决的问题:给定一个数据集作为输入,我想要生成一个数据集中的列表。输入数据集的数据集列表是使用某个属性的MinMax值来定义的,该值将通过考虑Max&Min第二个数据集的属性值,这里有一个我想要的示例:如果我们将属性Flight和以下两个数据集作为属性:

1(

TicketId | Flight |           time       |
---------------------------------------|
10   |    123 |   2020-11-27 05:48:02|
---------------------------------------|
155  |    125 |   2020-11-27 05:49:02|
---------------------------------------|
12   |    133 |   2020-11-27 05:50:02|
---------------------------------------|
200  |    13  |   2020-11-27 06:49:02|
---------------------------------------|
123  |    22  |   2020-11-27 06:50:02|
---------------------------------------|
15   |    92  |   2020-11-27 05:51:02|
---------------------------------------|
21   |    41  |   2020-11-27 05:49:02|
---------------------------------------|
22   |    27  |   2020-11-27 05:50:02|
---------------------------------------|
422   |    35 |   2020-11-27 05:51:02|
---------------------------------------

第二个数据集如下:

2(

TicketId | Flight |           time       |
---------------------------------------|
103  |    156 |   2020-11-27 05:48:02|
---------------------------------------|
154  |    130 |   2020-11-27 05:49:02|
---------------------------------------|
123  |    151 |   2020-11-27 05:50:02|
---------------------------------------|
220  |    119 |   2020-11-27 06:49:02|
---------------------------------------|
143  |    111 |   2020-11-27 06:50:02|
---------------------------------------|
16   |    189 |   2020-11-27 05:51:02|
---------------------------------------|
22   |    152 |   2020-11-27 05:49:02|
---------------------------------------|
22   |    125 |   2020-11-27 05:50:02|
---------------------------------------|
134  |    187 |   2020-11-27 05:51:02|
---------------------------------------

然后,根据Flight属性,给定数据集2的Min值为111,则对数据集1进行分区后得到的数据集的恢复列表将为:

TicketId | Flight |           time       |
---------------------------------------|
10   |    123 |   2020-11-27 05:48:02|
---------------------------------------|
155  |    125 |   2020-11-27 05:49:02|
---------------------------------------|
12   |    133 |   2020-11-27 05:50:02|
---------------------------------------|

TicketId | Flight |           time       |
---------------------------------------|
200  |    13  |   2020-11-27 06:49:02|
---------------------------------------|
123  |    22  |   2020-11-27 06:50:02|
---------------------------------------|
15   |    92  |   2020-11-27 05:51:02|
---------------------------------------|
21   |    41  |   2020-11-27 05:49:02|
---------------------------------------|
22   |    27  |   2020-11-27 05:50:02|
---------------------------------------|
422   |    35 |   2020-11-27 05:51:02|
---------------------------------------

因为数据集2的值Min将相应地将数据集1划分为两个结果数据集。我的问题是如何在Spark/Java(甚至Scala(中实现这一点。注意:(属性Flight的(分区值可能是(数据集2的(属性的最大值

谢谢你的帮助。

首先,不可能从单个转换中产生多个RDD(由于DataFrames和Dataset是从RDD派生的,因此它也适用于它们(。这意味着我们不能在第一个数据集上使用where/filter方法的一行来分割它。相反,我们可以切入正题,使用基于两个数据集的Flight的最小/最大值的条件来确定a(我们将基于分割的值,和b(拆分数据集的数量(只是因为您有一个条件,我们希望有3个而不是2个拆分数据集(

由于我们知道存在3种重叠最小/最大值的有效情况:

  • min(df2)拆分:
min(df1)------------------------------------max(df1)
min(df2)----------------------------------max(df2)
  • max(df2)拆分:
min(df1)----------------------------------max(df1)
min(df2)------------------------------------max(df2)
  • min(df2)max(df2)拆分:
min(df1)-----------------------------------------------max(df1)
min(df2)----------------max(df2)

剩下要做的就是:

  1. 从两个数据集中查找最大和最小CCD_ 8值
  2. 在一系列if/else if语句中使用它们来确定我们从输入数据中得到的值重叠的情况,以及
  3. 通过一次使用简单的where方法过滤第一个数据集,创建2或3个新的数据集(可以在if/else if语句的范围内,也可以在它之外,具体取决于您想要做什么(

以下是用Scala编写的代码,以供Spark中更广泛的未来参考。(当然,您可以在Java中实现它,只需进行很小的更改,因为Spark语言端口之间的大多数命令都是可互换的(:

// store the min/max values of `Flight` as integers
val df1Max = df1.select(max("Flight")).head().getInt(0)
val df1Min = df1.select(min("Flight")).head().getInt(0)
val df2Max = df2.select(max("Flight")).head().getInt(0)
val df2Min = df2.select(min("Flight")).head().getInt(0)
if(df1Min < df2Min && df2Min < df1Max && df1Max < df2Max) // split by min(df2Min)
{
val firstDf = df1.where(col("Flight") <= df2Min)
val secondDf = df1.where(col("Flight") > df2Min)
firstDf.show()
secondDf.show()
// ... (store them in disk, process them, do whatever you want)
}
else if(df1Min > df2Min && df1Min < df2Max && df2Max < df1Max) // split by min(df2Max)
{
val firstDf = df1.where(col("Flight") <= df2Max)
val secondDf = df1.where(col("Flight") > df2Max)
firstDf.show()
secondDf.show()
// ... (store them in disk, process them, do whatever you want)
}
else if(df1Min > df2Min && df2Max < df1Max) // split by min(df2Min) and max(df2Max)
{
val firstDf = df1.where(col("Flight") <= df2Min)
val secondDf = df1.where(col("Flight") >= df2Min && col("Flight") <= df2Max)
val thirdDf = df1.where(col("Flight") > df2Max)
firstDf.show()
secondDf.show()
thirdDf.show()
// ... (store them in disk, process them, do whatever you want)
}

最新更新