我们有一个火花流应用程序,我们从kafka接收Dstream,需要存储到DynamoDB ....我正在尝试两种方法来执行此操作,如下面的代码中所述
requestSwithState是Dstream
代码段1 with foreachrdd:
requestsWithState.foreachRDD { rdd =>
println("Data being populated to Pulsar")
rdd.foreach { case (id, eventStream) =>
println("id is " + id + " Event is " + eventStream)
DBUtils.putItem(dynamoConnection, id, eventStream.toString())
}
}
代码代码段2与地图:
requestsWithState.map (rdd => { rdd match {
case (id, eventStream) => {
println("id is " + id + " Event is " + eventStream)
val dynamoConnection = setupDynamoClientConnection()
DBUtils.putItem(dynamoConnection, id, eventStream.toString())
}
}
})
requestsWithState.print(1)
代码snippet1工作的良好并填充数据库...第二代码段不起作用。...我们是新手的火花,很想知道它背后的原因以及使它起作用的方法? ........我们正在实验的原因(我们知道这是一种转换,而foreachrdd是一个动作),对于我们的用例,在集群上重负载的用例非常慢,我们发现该地图如果我们可以使它工作要快得多.....请帮助我们获得地图代码
映射是火花中的一种转换(懒惰变换),除非您在此之后调用Spark Action,否则不会执行。有关火花转换和动作,请参考以下链接http://spark.apache.org/docs/latest/programming-guide.html#transformations
dstream.map返回另一个流。您应该在该流上调用打印,而不是原始的打印。
so s scala:
val transformedStream = requestsWithState.map (rdd => { rdd match {
case (id, eventStream) => {
println("id is " + id + " Event is " + eventStream)
val dynamoConnection = setupDynamoClientConnection()
DBUtils.putItem(dynamoConnection, id, eventStream.toString())
}
}
})
transformedStream.print(1)
map
的版本没有任何操作, .map
不是动作,而是转换。
没有动作就不会执行转换。
参见例如http://training.databricks.com/visualapi.pdf或http://spark.apache.org/docs/latest/programpramming-guide.html#transformations