使用Spark SQL,我有两个数据帧,它们是从一个数据帧创建的,例如:
df = sqlContext.createDataFrame(...);
df1 = df.filter("value = 'abc'"); //[path, value]
df2 = df.filter("value = 'qwe'"); //[path, value]
如果df1的"路径"的一部分是df2中的任何路径,我想过滤它。因此,如果df1中有一行的路径为"a/b/c/d/e",我会发现在df2中是否有一行路径为"a/b/c"。在SQL中,它应该像一样
SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2)
其中udf是用户定义的函数,它缩短了df1的原始路径。简单的解决方案是使用JOIN,然后过滤结果,但速度很慢,因为df1和df2的行数都超过10mil。
我也尝试了以下代码,但首先我必须从df2 创建广播变量
static Broadcast<DataFrame> bdf;
bdf = sc.broadcast(df2); //variable 'sc' is JavaSparkContext
sqlContext.createDataFrame(df1.javaRDD().filter(
new Function<Row, Boolean>(){
@Override
public Boolean call(Row row) throws Exception {
String foo = shortenPath(row.getString(0));
return bdf.value().filter("path = '"+foo+"'").count()>0;
}
}
), myClass.class)
我遇到的问题是Spark在评估返回/执行df2过滤时卡住了。
我想知道如何使用两个数据帧来做到这一点。我真的想避免加入。有什么想法吗?
编辑>>
在我的原始代码中,df1别名为"first",df2别名为"second"。此联接不是笛卡尔联接,也不使用广播。
df1 = df1.as("first");
df2 = df2.as("second");
df1.join(df2, df1.col("first.path").
lt(df2.col("second.path"))
, "left_outer").
filter("isPrefix(first.path, second.path)").
na().drop("any");
isPrefix是udf
UDF2 isPrefix = new UDF2<String, String, Boolean>() {
@Override
public Boolean call(String p, String s) throws Exception {
//return true if (p.length()+4==s.length()) and s.contains(p)
}};
shortenPath-它剪切路径中的最后两个字符
UDF1 shortenPath = new UDF1<String, String>() {
@Override
public String call(String s) throws Exception {
String[] foo = s.split("/");
String result = "";
for (int i = 0; i < foo.length-2; i++) {
result += foo[i];
if(i<foo.length-3) result+="/";
}
return result;
}
};
记录示例。路径是唯一的。
a/a/a/b/c abc
a/a/a qwe
a/b/c/d/e abc
a/b/c qwe
a/b/b/k foo
a/b/f/a bar
...
因此df1包含
a/a/a/b/c abc
a/b/c/d/e abc
...
df2由组成
a/a/a qwe
a/b/c qwe
...
您的代码至少有几个问题:
- 不能在另一个操作或转换中执行操作或转换。这意味着过滤广播的
DataFrame
根本无法工作,您应该得到一个异常 - 您使用的
join
是作为笛卡尔乘积执行的,后面跟着过滤器。由于Spark使用Hashing
进行连接,因此只有基于等式的连接才能在没有笛卡尔坐标的情况下有效执行。这与为什么在SQL查询中使用UDF会导致笛卡尔乘积略有关联 - 如果两个CCD_ 4都相对较大并且具有相似的大小,则广播不太可能有用。查看为什么我的BroadcastHashJoin比Spark中的ShuffledHashJoin慢
- 在性能方面并不重要,但
isPrefix
似乎错了。特别是它看起来可以匹配前缀和后缀 col("first.path").lt(col("second.path"))
条件看起来有问题。我想你想要df1
的a/a/a/b/c
与df2
的a/a/a
相匹配。如果是,则应该是gt
而不是lt
也许你能做的最好的事情就是类似的事情:
import org.apache.spark.sql.functions.{col, regexp_extract}
val df = sc.parallelize(Seq(
("a/a/a/b/c", "abc"), ("a/a/a","qwe"),
("a/b/c/d/e", "abc"), ("a/b/c", "qwe"),
("a/b/b/k", "foo"), ("a/b/f/a", "bar")
)).toDF("path", "value")
val df1 = df
.where(col("value") === "abc")
.withColumn("path_short", regexp_extract(col("path"), "^(.*)(/.){2}$", 1))
.as("df1")
val df2 = df.where(col("value") === "qwe").as("df2")
val joined = df1.join(df2, col("df1.path_short") === col("df2.path"))
您可以尝试这样广播其中一个表(仅Spark>=1.5.0):
import org.apache.spark.sql.functions.broadcast
df1.join(broadcast(df2), col("df1.path_short") === col("df2.path"))
并增加自动广播限制,但正如我上面提到的,它很可能比普通HashJoin
效率更低。
作为用子查询实现IN
的一种可能方式,可以使用LEFT SEMI JOIN
:
JavaSparkContext javaSparkContext = new JavaSparkContext("local", "testApp");
SQLContext sqlContext = new SQLContext(javaSparkContext);
StructType schema = DataTypes.createStructType(new StructField[]{
DataTypes.createStructField("path", DataTypes.StringType, false),
DataTypes.createStructField("value", DataTypes.StringType, false)
});
// Prepare First DataFrame
List<Row> dataForFirstDF = new ArrayList<>();
dataForFirstDF.add(RowFactory.create("a/a/a/b/c", "abc"));
dataForFirstDF.add(RowFactory.create("a/b/c/d/e", "abc"));
dataForFirstDF.add(RowFactory.create("x/y/z", "xyz"));
DataFrame df1 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForFirstDF), schema);
//
df1.show();
//
// +---------+-----+
// | path|value|
// +---------+-----+
// |a/a/a/b/c| abc|
// |a/b/c/d/e| abc|
// | x/y/z| xyz|
// +---------+-----+
// Prepare Second DataFrame
List<Row> dataForSecondDF = new ArrayList<>();
dataForSecondDF.add(RowFactory.create("a/a/a", "qwe"));
dataForSecondDF.add(RowFactory.create("a/b/c", "qwe"));
DataFrame df2 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForSecondDF), schema);
// Use left semi join to filter out df1 based on path in df2
Column pathContains = functions.column("firstDF.path").contains(functions.column("secondDF.path"));
DataFrame result = df1.as("firstDF").join(df2.as("secondDF"), pathContains, "leftsemi");
//
result.show();
//
// +---------+-----+
// | path|value|
// +---------+-----+
// |a/a/a/b/c| abc|
// |a/b/c/d/e| abc|
// +---------+-----+
此类查询的物理计划如下所示:
== Physical Plan ==
Limit 21
ConvertToSafe
LeftSemiJoinBNL Some(Contains(path#0, path#2))
ConvertToUnsafe
Scan PhysicalRDD[path#0,value#1]
TungstenProject [path#2]
Scan PhysicalRDD[path#2,value#3]
它将使用LeftSemiJoinBNL进行实际的联接操作,该操作应在内部广播值。更多细节请参阅Spark中的实际实现-LeftSemiJoinBNL.scala
附言:我不太理解删除最后两个字符的必要性,但如果需要的话,可以这样做,就像@zero323建议的那样(使用regexp_extract
)。