如何使用PySpark,SparkSQL和Cassandra



我对这个故事中的不同参与者有点困惑:PySpark,SparkSQL,Cassandra和pyspark-cassandra连接器。

据我了解,Spark发展了很多,SparkSQL现在是一个关键组件(带有"数据帧")。显然,没有SparkSQL绝对没有理由工作,尤其是在连接到Cassandra的情况下。

所以我的问题是:需要什么组件以及如何以最简单的方式将它们连接在一起?

有了 Scala 中的spark-shell,我可以简单地做

./bin/spark-shell --jars spark-cassandra-connector-java-assembly-1.6.0-M1-SNAPSHOT.jar

然后

import org.apache.spark.sql.cassandra.CassandraSQLContext
val cc = new CassandraSQLContext(sc)
cc.setKeyspace("mykeyspace")
val dataframe = cc.sql("SELECT count(*) FROM mytable group by beamstamp")

我怎样才能用pyspark做到这一点?

这里有几个子问题以及我收集的部分答案(如果我错了,请纠正)。

  • 是否需要 pyspark-casmandra(我不这么认为 - 我不明白一开始在做什么)

  • 我是否需要使用pyspark或者我可以使用常规jupyter notebook并自己导入必要的东西?

Pyspark 应该从 spark-cassandra-connector 包开始,如 Spark Cassandra Connector python 文档中所述。

./bin/pyspark 
  --packages com.datastax.spark:spark-cassandra-connector_$SPARK_SCALA_VERSION:$SPARK_VERSION

加载此内容后,您将能够在 C* 数据帧上使用 Spark 中已存在的任何数据帧操作。有关使用 C* 数据帧的选项的更多详细信息。

要将其设置为与jupyter notebook一起运行,只需使用以下属性设置 env。

export PYSPARK_DRIVER_PYTHON=ipython
export PYSPARK_DRIVER_PYTHON_OPTS=notebook

调用pyspark将启动正确配置的笔记本。

没有必要使用pyspark-cassandra,除非你在python中使用RDD时穿插在一起,这有一些性能缺陷。

In Python 连接器DataFrame API 公开。只要spark-cassandra-connector可用并且SparkConf包含所需的配置,就不需要其他软件包。您可以简单地指定格式和选项:

df = (sqlContext
    .read
    .format("org.apache.spark.sql.cassandra")
    .options(table="mytable", keyspace="mykeyspace")
    .load())

如果您想使用纯 SQL,您可以按如下方式注册DataFrame

df.registerTempTable("mytable")
## Optionally cache
sqlContext.cacheTable("mytable")
sqlContext.sql("SELECT count(*) FROM mytable group by beamstamp")

连接器的高级功能(如CassandraRDD)不会向 Python 公开,因此如果您需要超出DataFrame功能的东西,那么pyspark-cassandra可能会很有用。

相关内容

  • 没有找到相关文章

最新更新