Can someone provide examples of reading a DataFrame and a Dataset (in Spark 2.0) from Phoenix (a full table, and also via a query), and of writing a DataFrame and Dataset (in Spark 2.0) to Phoenix, using Apache Spark in Java? There are no documented Java examples for any of this.
Also, if possible, please cover the multiple ways of reading from Phoenix. One way is to use PhoenixConfigurationUtil to set the input class and input query, and then read a newAPIHadoopRDD from the SparkContext. Another is to use sqlContext.read().format("jdbc").options(passing a map with configuration keys like driver, url, dbtable).load().
A third way is sqlContext.read().format("org.apache.phoenix.spark").options(passing a map with configuration keys like url, table).load().
While searching, I found these methods in other questions about Spark 1.6 with DataFrames, but the examples were incomplete — the methods appeared only in bits and pieces, so I could not work out the complete steps. I could not find any Spark 2.0 examples.
Here is an example of how to read from and write to Phoenix:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

import com.google.common.collect.ImmutableMap;

import java.io.Serializable;

public class SparkConnection implements Serializable {

    public static void main(String args[]) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("spark-phoenix-df");
        sparkConf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(sc);

        // Read from Phoenix through the JDBC data source
        DataFrame fromPhx = sqlContext.read().format("jdbc")
                .options(ImmutableMap.of(
                        "driver", "org.apache.phoenix.jdbc.PhoenixDriver",
                        "url", "jdbc:phoenix:ZK_QUORUM:2181:/hbase-secure",
                        "dbtable", "TABLE1"))
                .load();

        // Write to Phoenix through the phoenix-spark data source
        fromPhx.write().format("org.apache.phoenix.spark")
                .mode(SaveMode.Overwrite)
                .options(ImmutableMap.of(
                        "zkUrl", "jdbc:phoenix:localhost:2181",
                        "table", "RESULT"))
                .save();
    }
}
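The example above uses the Spark 1.6 SQLContext/DataFrame API. Since the question targets Spark 2.0, a sketch of the equivalent read/write using SparkSession and Dataset&lt;Row&gt; may help (in Spark 2.0 a Java DataFrame is just Dataset&lt;Row&gt;; the table names and ZooKeeper quorum below are placeholders, and this has not been run against a live cluster):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SparkPhoenix20 {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("spark-phoenix-ds")
                .master("local[*]")
                .getOrCreate();

        // Read a Phoenix table as Dataset<Row> via the phoenix-spark data source
        Dataset<Row> fromPhx = spark.read()
                .format("org.apache.phoenix.spark")
                .option("table", "TABLE1")         // placeholder table name
                .option("zkUrl", "localhost:2181") // placeholder ZK quorum
                .load();

        // Write the Dataset back to another Phoenix table
        fromPhx.write()
                .format("org.apache.phoenix.spark")
                .mode(SaveMode.Overwrite)
                .option("table", "RESULT")         // placeholder table name
                .option("zkUrl", "localhost:2181")
                .save();

        spark.stop();
    }
}
```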
In Scala, this can be done as follows:
import org.apache.phoenix.spark._

val sqlContext = spark.sqlContext
val df1 = sqlContext.read.format("jdbc")
  .options(Map(
    "driver" -> "org.apache.phoenix.jdbc.PhoenixDriver",
    "url" -> "jdbc:phoenix:zk4-habsem.lzmf1fzmprtezol2fr25obrdth.jx.internal.cloudapp.net,zk5-habsem.lzmf1fzmprtezol2fr25obrdth.jx.internal.cloudapp.net,zk1-habsem.lzmf1fzmprtezol2fr25obrdth.jx.internal.cloudapp.net:2181:/hbase-unsecure",
    "dbtable" -> "table_name"))
  .load()
This page https://github.com/apache/phoenix/tree/master/phoenix-spark contains examples of loading Phoenix tables as RDDs or DataFrames, among others.
For example, loading a table as a DataFrame:
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._

val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)

val df = sqlContext.load(
  "org.apache.phoenix.spark",
  Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
)

df
  .filter(df("COL1") === "test_row_1" && df("ID") === 1L)
  .select(df("ID"))
  .show
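Since the question asks for Java, the same filter/select can be written against the Java Dataset API of Spark 2.0. This is a sketch: it assumes a Dataset&lt;Row&gt; already loaded from Phoenix as above, with columns COL1 and ID:

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class PhoenixFilterExample {

    // df is assumed to be a Dataset<Row> loaded from Phoenix as shown above
    static void filterAndShow(Dataset<Row> df) {
        df.filter(col("COL1").equalTo("test_row_1").and(col("ID").equalTo(1L)))
          .select(col("ID"))
          .show();
    }
}
```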
The gist below is a complete example using Java:
https://gist.github.com/mravi/444afe7f49821819c987
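For the PhoenixConfigurationUtil/newAPIHadoopRDD approach mentioned in the question, a rough sketch is below. MyRecordWritable is a hypothetical class you would write yourself (it must implement Hadoop's Writable and DBWritable to map the result columns); the quorum and query are placeholders, and the exact PhoenixConfigurationUtil method names can vary across Phoenix versions, so treat this as a sketch rather than a tested implementation:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.io.NullWritable;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PhoenixHadoopRddExample {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "phoenix-rdd");

        Configuration conf = new Configuration();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:2181"); // placeholder quorum
        // MyRecordWritable is hypothetical: implement Writable + DBWritable for your columns
        PhoenixConfigurationUtil.setInputClass(conf, MyRecordWritable.class);
        PhoenixConfigurationUtil.setInputTableName(conf, "TABLE1");
        PhoenixConfigurationUtil.setInputQuery(conf, "SELECT ID, COL1 FROM TABLE1");

        // Keys are NullWritable; values are the custom writable records
        JavaPairRDD<NullWritable, MyRecordWritable> rdd = sc.newAPIHadoopRDD(
                conf,
                PhoenixInputFormat.class,
                NullWritable.class,
                MyRecordWritable.class);

        System.out.println("rows: " + rdd.count());
        sc.stop();
    }
}
```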