Scala:使用spark从"锡拉"获取数据



scala/spark新手。我继承了一个旧代码,我已经对它进行了重构,并试图使用它来从"锡拉"中检索数据。代码看起来像:

val TEST_QUERY = s"SELECT user_id FROM test_table WHERE name = ? AND id_type = 'test_type';"
var selectData = List[Row]()
dataRdd.foreachPartition {
iter => {
// Build up a cluster that we can connect to
// Start a session with the cluster by connecting to it.
val cluster = ScyllaConnector.getCluster(clusterIpString, scyllaPreferredDc, scyllaUsername, scyllaPassword)
var batchCounter = 0
val session = cluster.connect(tableConfig.keySpace)
val preparedStatement: PreparedStatement = session.prepare(TEST_QUERY)
iter.foreach {
case (test_name: String) => {
// Get results
val testResults = session.execute(preparedStatement.bind(test_name))
if (testResults != null){
val testResult = testResults.one()
if(testResult != null){
val user_id = testResult.getString("user_id")
selectData ::= Row(user_id, test_name)
}
}
}
}
session.close()
cluster.close()
}
}
println("Head is =======> ")
println(selectData.head)

上面的操作不返回任何数据,并且由于空指针异常而失败,因为selectedData列表是空的,尽管其中确实有与select语句匹配的数据。我觉得我的做法不正确,但不知道需要改变什么才能解决这个问题,所以非常感谢任何帮助。

PS:我使用列表来保存结果的整个想法是,我可以使用该列表来创建数据帧。如果你能给我指明正确的方向,我将不胜感激。

如果你查看foreachPartition函数的定义,你会发现它根据定义不能返回任何东西,因为它的返回类型是void

无论如何,这是一种非常糟糕的方式来查询来自Cassandra/来自Spark的Scylla的数据。因为存在Spark Cassandra连接器,由于协议兼容性,该连接器也应该能够与"锡拉"一起工作。

要从Cassandra读取数据帧,只需执行以下操作:

spark.read
.format("cassandra")
.option("keyspace", "ksname")
.option("table", "tab")
.load()

文档非常详细,所以只需阅读即可。

最新更新