火花与Cassandra Python设置



我正在尝试使用Spark在Cassandra桌上进行一些简单的计算,但我已经丢失了。

我正在尝试关注:https://github.com/datastax/spark-cassandra-connector/blob/master/master/doc/15_python.md

所以我正在运行pyspark shell:带有

./bin/pyspark 
  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3

,但我不确定如何从这里设置事情。我如何让Spark知道我的Cassandra簇在哪里?我已经看到CassandraSQLContext可以用于此方法,但我也读到这是不建议的。

我已经阅读了以下内容:如何使用Spark-Cassandra-Connector与Cassandra连接火花?

,但是如果我使用

import com.datastax.spark.connector._

python说它找不到模块。有人可以将我指向如何正确设置问题的正确方向吗?

  1. 复制Pyspark-Cassandra连接器Spark-Folder/Jars。
  2. 以下代码将连接到Cassandra。

    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SQLContext, SparkSession
    spark = SparkSession.builder 
      .appName('SparkCassandraApp') 
      .config('spark.cassandra.connection.host', 'localhost') 
      .config('spark.cassandra.connection.port', '9042') 
      .config('spark.cassandra.output.consistency.level','ONE') 
      .master('local[2]') 
      .getOrCreate()
    sqlContext = SQLContext(spark)
    ds = sqlContext 
      .read 
      .format('org.apache.spark.sql.cassandra') 
      .options(table='emp', keyspace='demo') 
      .load()
    ds.show(10) 
    

cassandra Connector不提供任何Python模块。所有功能都带有数据源API,只要存在必要的罐子,所有功能都应该从开箱即用。

我如何让Spark知道我的Cassandra簇在哪里?

使用spark.cassandra.connection.host属性。您可以将Exampel作为spark-submit/pyspark的参数传递给它:

pyspark ... --conf spark.cassandra.connection.host=x.y.z.v

或在您的配置中设置:

(SparkSession.builder
    .config("cassandra.connection.host", "x.y.z.v"))

诸如表名称或键空间之类的配置可以直接在读取器上设置:

(spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table="kv", keyspace="test", cluster="cluster")
    .load())

因此您可以关注DataFrames文档。

作为旁注

import com.datastax.spark.connector._

是Scala语法,仅在Python中被偶然地接受。

使用用户名和密码:

spark = SparkSession.builder 
  .appName('SparkCassandraApp') 
  .config('spark.cassandra.connection.host', 'localhost') 
  .config('spark.cassandra.connection.port', '9042') 
  .config("spark.cassandra.auth.username","cassandrauser")
  .config("spark.cassandra.auth.password","cassandrapwd")
  .master('local[2]') 
  .getOrCreate()
df = spark.read.format("org.apache.spark.sql.cassandra")
   .options(table="tablename", keyspace="keyspacename").load()
df.show()

相关内容

  • 没有找到相关文章

最新更新