Spark map vs foreachRdd



我们有一个火花流应用程序,我们从kafka接收Dstream,需要存储到DynamoDB ....我正在尝试两种方法来执行此操作,如下面的代码中所述

requestSwithState是Dstream

代码段1 with foreachrdd:

requestsWithState.foreachRDD { rdd =>
   println("Data being populated to Pulsar")
   rdd.foreach { case (id, eventStream) =>
     println("id is " + id + " Event is " + eventStream)
     DBUtils.putItem(dynamoConnection, id, eventStream.toString())
   }
}

代码代码段2与地图:

   requestsWithState.map (rdd => { rdd match {
         case (id, eventStream) => {
           println("id is " + id + " Event is " + eventStream)
           val dynamoConnection = setupDynamoClientConnection()
           DBUtils.putItem(dynamoConnection, id, eventStream.toString())
         }
       }
     })
 requestsWithState.print(1)

代码snippet1工作的良好并填充数据库...第二代码段不起作用。...我们是新手的火花,很想知道它背后的原因以及使它起作用的方法? ........我们正在实验的原因(我们知道这是一种转换,而foreachrdd是一个动作),对于我们的用例,在集群上重负载的用例非常慢,我们发现该地图如果我们可以使它工作要快得多.....请帮助我们获得地图代码

映射是火花中的一种转换(懒惰变换),除非您在此之后调用Spark Action,否则不会执行。有关火花转换和动作,请参考以下链接http://spark.apache.org/docs/latest/programming-guide.html#transformations

dstream.map返回另一个流。您应该在该流上调用打印,而不是原始的打印。

so s scala:

val transformedStream = requestsWithState.map (rdd => { rdd match {
         case (id, eventStream) => {
           println("id is " + id + " Event is " + eventStream)
           val dynamoConnection = setupDynamoClientConnection()
           DBUtils.putItem(dynamoConnection, id, eventStream.toString())
         }
       }
     })
transformedStream.print(1)

map的版本没有任何操作, .map不是动作,而是转换。

没有动作就不会执行转换。

参见例如http://training.databricks.com/visualapi.pdf或http://spark.apache.org/docs/latest/programpramming-guide.html#transformations

最新更新