在Spark-2.0中,创建SPARK会话的最佳方法是什么。因为在Spark-2.0和Cassandra中,API都已重新设计,从本质上贬低了SqlContext(以及CassandrasqlContext)。因此,要执行SQL-我要么创建一个Cassandra Session (com.datastax.driver.core.Session) and use execute( " ")
。或者我必须创建一个SparkSession (org.apache.spark.sql.SparkSession) and execute sql(String sqlText)
方法。
我不知道这两者的SQL限制 - 有人可以解释。
另外,如果我必须创建Sparksession-我该怎么做 - 找不到任何合适的例子。随着API的重新设计,旧示例不起作用。我要通过此代码示例 - 数据范围 - 尚不清楚这里使用了哪种SQL上下文(是否正确的方法正在进行中。)(由于某种原因,弃用的API甚至都没有编译 - 需要检查我的Eclipse设置)
谢谢
您需要Cassandra会话来创建/删除Cassandra db的表和表。在Spark应用程序中,要创建Cassandra会话,您需要将SparkConf传递给Cassandraconnector。在Spark 2.0中,您可以像下面一样做。
SparkSession spark = SparkSession
.builder()
.appName("SparkCassandraApp")
.config("spark.cassandra.connection.host", "localhost")
.config("spark.cassandra.connection.port", "9042")
.master("local[2]")
.getOrCreate();
CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
Session session = connector.openSession();
session.execute("CREATE TABLE mykeyspace.mytable(id UUID PRIMARY KEY, username TEXT, email TEXT)");
如果您有现有的数据框,则可以使用DataFrameFunctions.createCassandraTable(Df)
在Cassandra中创建表。请参阅此处的API详细信息。
您可以使用Spark-Cassandra-Connector提供的API从Cassandra DB读取数据。
Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "mykeyspace");
put("table", "mytable");
}
}).load();
dataset.show();
您可以使用SparkSession.sql()方法在Spark Cassandra Connector返回的数据框架上运行查询。
dataset.createOrReplaceTempView("usertable");
Dataset<Row> dataset1 = spark.sql("select * from usertable where username = 'Mat'");
dataset1.show();