基于第二个DataFrame的DataFrame筛选



使用Spark SQL,我有两个数据帧,它们是从一个数据帧创建的,例如:

df = sqlContext.createDataFrame(...);
df1 = df.filter("value = 'abc'"); //[path, value]
df2 = df.filter("value = 'qwe'"); //[path, value]

如果df1的"路径"的一部分是df2中的任何路径,我想过滤它。因此,如果df1中有一行的路径为"a/b/c/d/e",我会发现在df2中是否有一行路径为"a/b/c"。在SQL中,它应该像一样

SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2)

其中udf是用户定义的函数,它缩短了df1的原始路径。简单的解决方案是使用JOIN,然后过滤结果,但速度很慢,因为df1和df2的行数都超过10mil。

我也尝试了以下代码,但首先我必须从df2 创建广播变量

static Broadcast<DataFrame> bdf;
bdf = sc.broadcast(df2); //variable 'sc' is JavaSparkContext 
sqlContext.createDataFrame(df1.javaRDD().filter(
         new Function<Row, Boolean>(){
             @Override
             public Boolean call(Row row) throws Exception {
                 String foo = shortenPath(row.getString(0));
                 return bdf.value().filter("path = '"+foo+"'").count()>0;
             }
          }
    ), myClass.class)

我遇到的问题是Spark在评估返回/执行df2过滤时卡住了。

我想知道如何使用两个数据帧来做到这一点。我真的想避免加入。有什么想法吗?


编辑>>

在我的原始代码中,df1别名为"first",df2别名为"second"。此联接不是笛卡尔联接,也不使用广播。

df1 = df1.as("first");
df2 = df2.as("second");
    df1.join(df2, df1.col("first.path").
                                lt(df2.col("second.path"))
                                      , "left_outer").
                    filter("isPrefix(first.path, second.path)").
                    na().drop("any");

isPrefix是udf

UDF2 isPrefix = new UDF2<String, String, Boolean>() {
        @Override
        public Boolean call(String p, String s) throws Exception {
            //return true if (p.length()+4==s.length()) and s.contains(p)
        }};

shortenPath-它剪切路径中的最后两个字符

UDF1 shortenPath = new UDF1<String, String>() {
        @Override
        public String call(String s) throws Exception {
            String[] foo = s.split("/");
            String result = "";
            for (int i = 0; i < foo.length-2; i++) {
                result += foo[i];
                if(i<foo.length-3) result+="/";
            }
            return result;
        }
    };

记录示例。路径是唯一的。

a/a/a/b/c abc
a/a/a     qwe
a/b/c/d/e abc
a/b/c     qwe
a/b/b/k   foo
a/b/f/a   bar
...

因此df1包含

a/a/a/b/c abc
a/b/c/d/e abc
...

df2由组成

a/a/a     qwe
a/b/c     qwe
...

您的代码至少有几个问题:

  • 不能在另一个操作或转换中执行操作或转换。这意味着过滤广播的DataFrame根本无法工作,您应该得到一个异常
  • 您使用的join是作为笛卡尔乘积执行的,后面跟着过滤器。由于Spark使用Hashing进行连接,因此只有基于等式的连接才能在没有笛卡尔坐标的情况下有效执行。这与为什么在SQL查询中使用UDF会导致笛卡尔乘积略有关联
  • 如果两个CCD_ 4都相对较大并且具有相似的大小,则广播不太可能有用。查看为什么我的BroadcastHashJoin比Spark中的ShuffledHashJoin慢
  • 在性能方面并不重要,但isPrefix似乎错了。特别是它看起来可以匹配前缀和后缀
  • col("first.path").lt(col("second.path"))条件看起来有问题。我想你想要df1a/a/a/b/cdf2a/a/a相匹配。如果是,则应该是gt而不是lt

也许你能做的最好的事情就是类似的事情:

import org.apache.spark.sql.functions.{col, regexp_extract}
val df = sc.parallelize(Seq(
    ("a/a/a/b/c", "abc"), ("a/a/a","qwe"),
    ("a/b/c/d/e", "abc"), ("a/b/c", "qwe"),
    ("a/b/b/k", "foo"), ("a/b/f/a", "bar")
)).toDF("path", "value")
val df1 = df
    .where(col("value") === "abc")    
    .withColumn("path_short", regexp_extract(col("path"), "^(.*)(/.){2}$", 1))
    .as("df1")
val df2 = df.where(col("value") === "qwe").as("df2")
val joined = df1.join(df2, col("df1.path_short") === col("df2.path"))

您可以尝试这样广播其中一个表(仅Spark>=1.5.0):

import org.apache.spark.sql.functions.broadcast
df1.join(broadcast(df2), col("df1.path_short") === col("df2.path"))

并增加自动广播限制,但正如我上面提到的,它很可能比普通HashJoin效率更低。

作为用子查询实现IN的一种可能方式,可以使用LEFT SEMI JOIN

    JavaSparkContext javaSparkContext = new JavaSparkContext("local", "testApp");
    SQLContext sqlContext = new SQLContext(javaSparkContext);
    StructType schema = DataTypes.createStructType(new StructField[]{
            DataTypes.createStructField("path", DataTypes.StringType, false),
            DataTypes.createStructField("value", DataTypes.StringType, false)
    });
    // Prepare First DataFrame
    List<Row> dataForFirstDF = new ArrayList<>();
    dataForFirstDF.add(RowFactory.create("a/a/a/b/c", "abc"));
    dataForFirstDF.add(RowFactory.create("a/b/c/d/e", "abc"));
    dataForFirstDF.add(RowFactory.create("x/y/z", "xyz"));
    DataFrame df1 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForFirstDF), schema);
    // 
    df1.show();
    //
    // +---------+-----+
    // |     path|value|
    // +---------+-----+
    // |a/a/a/b/c|  abc|
    // |a/b/c/d/e|  abc|
    // |    x/y/z|  xyz|
    // +---------+-----+
    // Prepare Second DataFrame
    List<Row> dataForSecondDF = new ArrayList<>();
    dataForSecondDF.add(RowFactory.create("a/a/a", "qwe"));
    dataForSecondDF.add(RowFactory.create("a/b/c", "qwe"));
    DataFrame df2 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForSecondDF), schema);
    // Use left semi join to filter out df1 based on path in df2
    Column pathContains = functions.column("firstDF.path").contains(functions.column("secondDF.path"));
    DataFrame result = df1.as("firstDF").join(df2.as("secondDF"), pathContains, "leftsemi");
    //
    result.show();
    //
    // +---------+-----+
    // |     path|value|
    // +---------+-----+
    // |a/a/a/b/c|  abc|
    // |a/b/c/d/e|  abc|
    // +---------+-----+

此类查询的物理计划如下所示:

== Physical Plan ==
Limit 21
 ConvertToSafe
  LeftSemiJoinBNL Some(Contains(path#0, path#2))
   ConvertToUnsafe
    Scan PhysicalRDD[path#0,value#1]
   TungstenProject [path#2]
    Scan PhysicalRDD[path#2,value#3]

它将使用LeftSemiJoinBNL进行实际的联接操作,该操作应在内部广播值。更多细节请参阅Spark中的实际实现-LeftSemiJoinBNL.scala

附言:我不太理解删除最后两个字符的必要性,但如果需要的话,可以这样做,就像@zero323建议的那样(使用regexp_extract)。

相关内容

  • 没有找到相关文章

最新更新