I have two input datasets. The first input dataset looks like this:
year,make,model,comment,blank
"2012","Tesla","S","No comment",
1997,Ford,E350,"Go get one now they are going fast",
2015,Chevy,Volt
The second input dataset:
TagId,condition
1997_cars,year = 1997 and model = 'E350'
2012_cars,year=2012 and model ='S'
2015_cars ,year=2015 and model = 'Volt'
My requirement is to read the first dataset and, based on the filter conditions in the second dataset, tag each row of the first dataset by introducing a new TagId column. The expected output should look like this:
year,make,model,comment,blank,TagId
"2012","Tesla","S","No comment",2012_cars
1997,Ford,E350,"Go get one now they are going fast",1997_cars
2015,Chevy,Volt, ,2015_cars
What I have tried:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

val sqlContext = new SQLContext(sc)

val carsSchema = StructType(Seq(
  StructField("year", IntegerType, true),
  StructField("make", StringType, true),
  StructField("model", StringType, true),
  StructField("comment", StringType, true),
  StructField("blank", StringType, true)))

val carTagsSchema = StructType(Seq(
  StructField("TagId", StringType, true),
  StructField("condition", StringType, true)))

val dfcars = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(carsSchema).load("/TestDivya/Spark/cars.csv")
val dftags = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(carTagsSchema).load("/TestDivya/Spark/CarTags.csv")
val Amendeddf = dfcars.withColumn("TagId", dfcars("blank"))
val cdtnval = dftags.select("condition")
val df2=dfcars.filter(cdtnval)
<console>:35: error: overloaded method value filter with alternatives:
(conditionExpr: String)org.apache.spark.sql.DataFrame <and>
(condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
cannot be applied to (org.apache.spark.sql.DataFrame)
val df2=dfcars.filter(cdtnval)
Another approach:
val col = dftags.col("TagId")
val finaldf = dfcars.withColumn("TagId", col)
org.apache.spark.sql.AnalysisException: resolved attribute(s) TagId#5 missing from comment#3,blank#4,model#2,make#1,year#0 in operator !Project [year#0,make#1,model#2,comment#3,blank#4,TagId#5 AS TagId#8];
finaldf.write.format("com.databricks.spark.csv").option("header", "true").save("/TestDivya/Spark/carswithtags.csv")
It would be much appreciated if someone could show me how to pass the filter condition to a DataFrame's filter function, or suggest another solution. Apologies for such a naive question; I am new to Scala and Spark.

Thanks
There is no simple solution to this. I think there are two broad directions you can follow:
- Collect the conditions (dftags) to a local list. Then go through it one by one, running each as a filter on the cars (dfcars), and use the results to build the desired output (see the first sketch below).
- Collect the conditions (dftags) to a local list. Implement the parsing and evaluation code for them yourself, then go over the cars (dfcars) once, evaluating the rule set for each row in a map (see the second sketch, after the next paragraph).
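A minimal sketch of the first direction, assuming Spark 1.x with spark-csv (as in the question) and that dftags is small enough to collect to the driver:

import org.apache.spark.sql.functions.lit

// Collect the (TagId, condition) pairs to the driver.
val rules = dftags.collect().map(r => (r.getString(0).trim, r.getString(1)))

// filter(String) parses the condition text as a SQL expression, which is why
// passing a whole DataFrame failed above. Run each rule as a filter over the
// cars, tag the matching rows, and union the partial results back together.
val taggedCars = rules
  .map { case (tagId, cond) => dfcars.filter(cond).withColumn("TagId", lit(tagId)) }
  .reduce(_ unionAll _)

Note that rows matching no rule are dropped here; to keep them you would need a default tag or an extra step to union in the unmatched rows.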
Things get really bad if you have lots of conditions (so you cannot collect them) and a huge number of cars: you would have to check every car against every condition, which is very inefficient. In that case you want to optimize the rule set first, so that it can be evaluated more efficiently. (A decision tree could be a good solution.)
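As an illustration of the second direction, here is a sketch that exploits the shape of the sample rules: every condition in CarTags.csv happens to be an equality test on year and model, so the rule set can be parsed into a local lookup map and evaluated in constant time per row with a UDF. The regex and the equality-only assumption are specific to this sample; arbitrary conditions would need a real expression parser:

import org.apache.spark.sql.functions.udf

// Assumption: every rule has the exact shape "year = <int> and model = '<string>'".
val Rule = """year\s*=\s*(\d+)\s+and\s+model\s*=\s*'([^']+)'""".r

// Build a local (year, model) -> TagId lookup table from the rules.
val ruleMap: Map[(Int, String), String] = dftags.collect().map { r =>
  val Rule(y, m) = r.getString(1).trim
  (y.toInt, m) -> r.getString(0).trim
}.toMap

// One pass over the cars, O(1) rule evaluation per row; unmatched rows get null.
// (Assumes year and model are non-null, as in the sample data.)
val tagUdf = udf((year: Int, model: String) => ruleMap.get((year, model)).orNull)
val taggedCars = dfcars.withColumn("TagId", tagUdf(dfcars("year"), dfcars("model")))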