如何在从数据集中选择列的不同值时保持数据顺序



我的数据集如下,

+------+------+---------------+
| col1 | col2 |  sum(costs)   |
+------+------+---------------+
|    1 | a    | 3555204326.27 |
|    4 | b    | 22273491.72   |
|    5 | c    | 219175.00     |
|    3 | a    | 219175.00     |
|    2 | c    | 75341433.37   |
+------+------+---------------+

我需要选择COL1的不同值,而最终的数据集应为1、4、5、3、2(初始数据集中可用的这些值可用的顺序)。但是订单被改组了。是否有任何方法可以保持与临界数据集相同的顺序。Spark/SQL中的任何建议都可以。

可以通过Spark的以下顺序获得此数据集。

df = sqlCtx.createDataFrame(
  [(1, a, 355.27), (4, b, 222.98), (5, c, 275.00), (3, a, 25.00),
   (2, c, 753.37)], ('Col1', 'col2', 'cost'));

您可以添加包含每一行索引的另一列,然后在"不同"之后在该列上排序。这是一个示例:

import org.apache.spark.sql.functions._
val df = Seq(1, 4, 4, 5, 2)
  .toDF("a")
  .withColumn("id", monotonically_increasing_id())
df.show()
// +---+---+
// |  a| id|
// +---+---+
// |  1|  0|
// |  4|  1|
// |  4|  2|
// |  5|  3|
// |  2|  4|
// +---+---+
df.dropDuplicates("a").sort("id").show()
// +---+---+
// |  a| id|
// +---+---+
// |  1|  0|
// |  4|  1|
// |  5|  3|
// |  2|  4|
// +---+---+

请注意,要在1个特定列上进行独特的操作,可以使用dropDuplicates,如果要控制要在重复的情况下要进行哪一行,然后使用groupBy

假设您试图远程远程col2中的重复项(因为col1中没有),因此最终结果将是:

+----+----+---------------+
|col1|col2|            sum|
+----+----+---------------+
|   1|   a|3.55520432627E9|
|   4|   b|  2.227349172E7|
|   5|   c|       219175.0|
+----+----+---------------+

您可以添加一个索引列,例如:

df = df.withColumn("__idx", monotonically_increasing_id());

然后进行所有您想要的转换,然后将其放置,例如:

df = df.dropDuplicates("col2").orderBy("__idx").drop("__idx");

这意味着要:

步骤1:加载数据和东西:

+----+----+---------------+
|col1|col2|            sum|
+----+----+---------------+
|   1|   a|3.55520432627E9|
|   4|   b|  2.227349172E7|
|   5|   c|       219175.0|
|   3|   a|       219175.0|
|   2|   c|  7.534143337E7|
+----+----+---------------+

步骤2:添加索引:

+----+----+---------------+-----+
|col1|col2|            sum|__idx|
+----+----+---------------+-----+
|   1|   a|3.55520432627E9|    0|
|   4|   b|  2.227349172E7|    1|
|   5|   c|       219175.0|    2|
|   3|   a|       219175.0|    3|
|   2|   c|  7.534143337E7|    4|
+----+----+---------------+-----+

步骤3:转换(在此处删除col2中的DUP),然后删除__idx列:

+----+----+---------------+
|col1|col2|            sum|
+----+----+---------------+
|   1|   a|3.55520432627E9|
|   4|   b|  2.227349172E7|
|   5|   c|       219175.0|
+----+----+---------------+

Java代码可能是:

package net.jgp.books.spark.ch12.lab990_others;
import static org.apache.spark.sql.functions.monotonically_increasing_id;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
 * Keeping the order of rows during transformations.
 * 
 * @author jgp
 */
public class KeepingOrderApp {
  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    KeepingOrderApp app = new KeepingOrderApp();
    app.start();
  }
  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Splitting a dataframe to collect it")
        .master("local")
        .getOrCreate();
    Dataset<Row> df = createDataframe(spark);
    df.show();
    df = df.withColumn("__idx", monotonically_increasing_id());
    df.show();
    df = df.dropDuplicates("col2").orderBy("__idx").drop("__idx");
    df.show();
  }
  private static Dataset<Row> createDataframe(SparkSession spark) {
    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "col1",
            DataTypes.IntegerType,
            false),
        DataTypes.createStructField(
            "col2",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "sum",
            DataTypes.DoubleType,
            false) });
    List<Row> rows = new ArrayList<>();
    rows.add(RowFactory.create(1, "a", 3555204326.27));
    rows.add(RowFactory.create(4, "b", 22273491.72));
    rows.add(RowFactory.create(5, "c", 219175.0));
    rows.add(RowFactory.create(3, "a", 219175.0));
    rows.add(RowFactory.create(2, "c", 75341433.37));
    return spark.createDataFrame(rows, schema);
  }
}

您可以在DB中添加索引列,然后在SQL请求中订购ID

我相信您需要重新格式化查询和使用群体而不是像这样的答案,这表明了SQL:如何将行订单保持不同?

相关内容

  • 没有找到相关文章

最新更新