Why does a select after a join throw an exception in a Java Spark DataFrame?



I have two DataFrames, left and right. They are identical, made up of the same three columns (src, relation, dest) and containing the same values.

1 - I am trying to join these DataFrames with the condition that the left side's dst equals the right side's src, but it does not work. Where is the error?

Dataset<Row> r = left
  .join(right, left.col("dst").equalTo(right.col("src")));

Result:

+---+---------+---+---+---------+---+
|src|predicate|dst|src|predicate|dst|
+---+---------+---+---+---------+---+
+---+---------+---+---+---------+---+
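One likely cause (not stated in the post, so treat it as an assumption): if left and right are both derived from the same DataFrame, this is a self-join, and the two Column references in the condition can resolve to the same underlying attributes, so the condition does not compare the two sides the way it reads. A minimal sketch of a workaround, assuming both sides come from a single input_df as in part 2, is to alias each side and reference the join columns through the aliases (col here is the static org.apache.spark.sql.functions.col):

// Hypothetical sketch: alias both sides of the self-join so the
// join columns resolve unambiguously.
Dataset<Row> l = input_df.alias("l");
Dataset<Row> rt = input_df.alias("r");
Dataset<Row> joined = l.join(rt, col("l.dst").equalTo(col("r.src")));
joined.show();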

2 - If I rename the left side's dst column to dst2 and the right side's src column to dst2 as well, then apply a join, it works. But if I try to select some columns from the joined DataFrame, it throws an exception. Where is my mistake?

 Dataset<Row> left = input_df.withColumnRenamed("dst", "dst2");
 Dataset<Row> right = input_df.withColumnRenamed("src", "dst2");  
 Dataset<Row> r = left.join(right, left.col("dst2").equalTo(right.col("dst2")));

Then:

left.show();

gives:

+---+---------+----+
|src|predicate|dst2|
+---+---------+----+
|  a|       r1| :b1|
|  a|       r2|   k|
|:b1|       r3| :b4|
|:b1|      r10|   d|
|:b4|       r4|   f|
|:b4|       r5| :b5|
|:b5|       r9|   t|
|:b5|      r10|   e|
+---+---------+----+

right.show();

gives:

+----+---------+---+
|dst2|predicate|dst|
+----+---------+---+
|   a|       r1|:b1|
|   a|       r2|  k|
| :b1|       r3|:b4|
| :b1|      r10|  d|
| :b4|       r4|  f|
| :b4|       r5|:b5|
| :b5|       r9|  t|
| :b5|      r10|  e|
+----+---------+---+

Result:

+---+---------+----+----+---------+---+
|src|predicate|dst2|dst2|predicate|dst|
+---+---------+----+----+---------+---+
|  a|       r1|  b1|  b1|      r10|  d|
|  a|       r1|  b1|  b1|       r3| b4|
| b1|       r3|  b4|  b4|       r5| b5|
| b1|       r3|  b4|  b4|       r4|  f|
+---+---------+----+----+---------+---+

Dataset<Row> r = left
  .join(right, left.col("dst2").equalTo(right.col("dst2")))
  .select(left.col("src"),right.col("dst"));

Result:

Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) dst#45 missing from dst2#177,src#43,predicate#197,predicate#44,dst2#182,dst#198 in operator !Project [src#43, dst#45];

3 - Assuming the select works, how can I add the obtained DataFrame to the left DataFrame?

I'm working in Java.

You are using:

r = r.select(left.col("src"), right.col("dst"));

It seems that Spark cannot find the lineage back to the right DataFrame, which is not too surprising, as it goes through a lot of optimization.

Assuming your desired output is:

+---+---+
|src|dst|
+---+---+
| b1|:b5|
| b1|  f|
|:b4|  e|
|:b4|  t|
+---+---+

You can use one of these three options:

Using the col() method

Dataset<Row> resultOption1Df = r.select(left.col("src"), r.col("dst"));
resultOption1Df.show();

Using the col() static function

Dataset<Row> resultOption2Df = r.select(col("src"), col("dst"));
resultOption2Df.show();

Using the column names

Dataset<Row> resultOption3Df = r.select("src", "dst");
resultOption3Df.show();

Here is the full source code:

package net.jgp.books.spark.ch12.lab990_others;
import static org.apache.spark.sql.functions.col;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
 * Self join.
 * 
 * @author jgp
 */
public class SelfJoinAndSelectApp {
  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    SelfJoinAndSelectApp app = new SelfJoinAndSelectApp();
    app.start();
  }
  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Self join")
        .master("local[*]")
        .getOrCreate();
    Dataset<Row> inputDf = createDataframe(spark);
    inputDf.show(false);
    Dataset<Row> left = inputDf.withColumnRenamed("dst", "dst2");
    left.show();
    Dataset<Row> right = inputDf.withColumnRenamed("src", "dst2");
    right.show();
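    // Self-join: left and right are both derived from inputDf, matched on the renamed dst2 column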
    Dataset<Row> r = left.join(
        right,
        left.col("dst2").equalTo(right.col("dst2")));
    r.show();
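    // Select through the joined Dataset r (or with the static col() / plain
    // column-name variants below); selecting via right.col("dst") fails to resolve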
    Dataset<Row> resultOption1Df = r.select(left.col("src"), r.col("dst"));
    resultOption1Df.show();
    Dataset<Row> resultOption2Df = r.select(col("src"), col("dst"));
    resultOption2Df.show();
    Dataset<Row> resultOption3Df = r.select("src", "dst");
    resultOption3Df.show();
  }
  private static Dataset<Row> createDataframe(SparkSession spark) {
    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "src",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "predicate",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "dst",
            DataTypes.StringType,
            false) });
    List<Row> rows = new ArrayList<>();
    rows.add(RowFactory.create("a", "r1", ":b1"));
    rows.add(RowFactory.create("a", "r2", "k"));
    rows.add(RowFactory.create("b1", "r3", ":b4"));
    rows.add(RowFactory.create("b1", "r10", "d"));
    rows.add(RowFactory.create(":b4", "r4", "f"));
    rows.add(RowFactory.create(":b4", "r5", ":b5"));
    rows.add(RowFactory.create(":b5", "r9", "t"));
    rows.add(RowFactory.create(":b5", "r10", "e"));
    return spark.createDataFrame(rows, schema);
  }
}
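
Regarding part 3 of the question (adding the obtained DataFrame back to the left DataFrame): a minimal sketch, assuming the goal is to append the selected rows to left and that a placeholder value is acceptable for the missing predicate column (the lit("derived") value and the renames below are illustrative assumptions, not part of the original post):

// Hypothetical sketch: append the selected rows back onto `left`.
// Requires: import static org.apache.spark.sql.functions.lit;
Dataset<Row> selected = r
    .select(col("src"), col("dst"))
    .withColumn("predicate", lit("derived"))    // left also has a predicate column; placeholder value
    .withColumnRenamed("dst", "dst2");          // align the name with left's dst2 column
Dataset<Row> appended = left.unionByName(selected); // matches columns by name
appended.show();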
