How to filter a DataFrame using information from another DataFrame with the filter command



I have a large DataFrame holding a lot of information from different devices along with their IDs. What I want is to filter this DataFrame using the IDs contained in a second DataFrame. I know this is easy to do with the join command, but I would like to try it with the filter command.

I'm also trying this because I have read that filter is more efficient than a join. Could anyone shed some light on this?

Thanks!

This is what I have tried:

val DfFiltered = DF1.filter(col("Id").isin(DF2.rdd.map(r => r(0)).collect()))

But I get the following error:

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Unsupported component type class java.lang.Object in arrays;
=== Streaming Query ===
Identifier: [id = 0d89d684-d794-407d-a03c-feb3ad6a78c2, runId = b7b774c0-ce83-461e-ac26-7535d6d2b0ac]
Current Committed Offsets: {KafkaV2[Subscribe[MeterEEM]]: {"MeterEEM":{"0":270902}}}
Current Available Offsets: {KafkaV2[Subscribe[MeterEEM]]: {"MeterEEM":{"0":271296}}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
Project [value2#21.meterid AS meterid#23]
+- Project [jsontostructs(StructField(meterid,StringType,true), cast(value#8 as string), Some(Europe/Paris)) AS value2#21]
+- StreamingExecutionRelation KafkaV2[Subscribe[MeterEEM]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.sql.AnalysisException: Unsupported component type class java.lang.Object in arrays;
at org.apache.spark.sql.catalyst.expressions.Literal$.componentTypeToDataType(literals.scala:117)
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:70)
at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
at scala.util.Try.getOrElse(Try.scala:79)
at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:163)
at org.apache.spark.sql.functions$.typedLit(functions.scala:127)
at org.apache.spark.sql.functions$.lit(functions.scala:110)
at org.apache.spark.sql.Column$$anonfun$isin$1.apply(Column.scala:796)
at org.apache.spark.sql.Column$$anonfun$isin$1.apply(Column.scala:796)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.Column.isin(Column.scala:796)
at StreamingProcedure.MetersEEM$.meterEemCalcs(MetersEEM.scala:28)
at LaunchFunctions.LaunchMeterEEM$$anonfun$1.apply(LaunchMeterEEM.scala:23)
at LaunchFunctions.LaunchMeterEEM$$anonfun$1.apply(LaunchMeterEEM.scala:15)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:534)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:532)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:531)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more

I'm assuming here that the data in your Id column is of Integer type.

import spark.implicits._  // needed for .as[Int] and the $"..." column syntax

val list = DF2.select("Id").as[Int].collect()      // pull the Ids into a local Array[Int]
val DfFiltered = DF1.filter($"Id".isin(list: _*))  // keep the rows of DF1 whose Id is in that array
DfFiltered.collect()
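
Note that collect() pulls all the Ids down to the driver, so this pattern only makes sense when DF2 is small enough to fit in driver memory; for a large DF2, a join remains the safer option.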

The book High Performance Spark gives the following explanation:

Joining data is an important part of many of our pipelines, and both Spark Core and SQL support the same fundamental types of joins. While joins are very common and powerful, they warrant special performance consideration as they may require large network transfers or even create datasets beyond our capability to handle. In core Spark it can be more important to think about the ordering of operations, since the DAG optimizer, unlike the SQL optimizer, isn't able to re-order or push down filters.

So choosing filter over join seems to be a good choice.
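
For comparison, the join-based version of the same filtering would look something like the sketch below. A left-semi join keeps only the rows of DF1 whose Id appears in DF2 without adding any columns from DF2, and the broadcast hint is appropriate when DF2 is small; the names DF1, DF2 and Id are taken from the question.

import org.apache.spark.sql.functions.broadcast

// Left-semi join: returns only the rows of DF1 that have a matching Id in DF2;
// no columns from DF2 are added to the result.
// broadcast() hints Spark to ship the (small) DF2 to every executor instead of
// shuffling the large DF1.
val DfJoined = DF1.join(broadcast(DF2.select("Id")), Seq("Id"), "left_semi")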

You can simply add : _* to your code and it will work fine.

scala> df.filter(col("a").isin(df2.rdd.map(r => r(0)).collect(): _*)).show()
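
For context on why adding : _* fixes the error: isin is a varargs method, so it expects the values one at a time rather than as a single array. A minimal sketch of the difference, where values stands for the collected Array[Any] from the question:

// Column.isin is declared (roughly) as: def isin(list: Any*): Column
col("a").isin(values: _*)  // ": _*" expands the array into individual literals -- works
col("a").isin(values)      // the whole array becomes ONE literal with element type Object -- the AnalysisException above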
