SPASSANDRA的Spark2会话,SQL查询



在Spark-2.0中,创建SPARK会话的最佳方法是什么。因为在Spark-2.0和Cassandra中,API都已重新设计,从本质上贬低了SqlContext(以及CassandrasqlContext)。因此,要执行SQL-我要么创建一个Cassandra Session (com.datastax.driver.core.Session) and use execute( " ")。或者我必须创建一个SparkSession (org.apache.spark.sql.SparkSession) and execute sql(String sqlText)方法。

我不知道这两者的SQL限制 - 有人可以解释。

另外,如果我必须创建Sparksession-我该怎么做 - 找不到任何合适的例子。随着API的重新设计,旧示例不起作用。我要通过此代码示例 - 数据范围 - 尚不清楚这里使用了哪种SQL上下文(是否正确的方法正在进行中。)(由于某种原因,弃用的API甚至都没有编译 - 需要检查我的Eclipse设置)

谢谢

您需要Cassandra会话来创建/删除Cassandra db的表和表。在Spark应用程序中,要创建Cassandra会话,您需要将SparkConf传递给Cassandraconnector。在Spark 2.0中,您可以像下面一样做。

 SparkSession spark = SparkSession
              .builder()
              .appName("SparkCassandraApp")
              .config("spark.cassandra.connection.host", "localhost")
              .config("spark.cassandra.connection.port", "9042")
              .master("local[2]")
              .getOrCreate();
CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
Session session = connector.openSession();
session.execute("CREATE TABLE mykeyspace.mytable(id UUID PRIMARY KEY, username TEXT, email TEXT)");

如果您有现有的数据框,则可以使用DataFrameFunctions.createCassandraTable(Df)在Cassandra中创建表。请参阅此处的API详细信息。

您可以使用Spark-Cassandra-Connector提供的API从Cassandra DB读取数据。

Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {
                {
                    put("keyspace", "mykeyspace");
                    put("table", "mytable");
                }
            }).load();
dataset.show(); 

您可以使用SparkSession.sql()方法在Spark Cassandra Connector返回的数据框架上运行查询。

dataset.createOrReplaceTempView("usertable");
Dataset<Row> dataset1 = spark.sql("select * from usertable where username = 'Mat'");
dataset1.show();

相关内容

  • 没有找到相关文章