在Java中读写Apache Phoenix的Apache Spark方法



谁能给我提供一些例子,从phoenix(完整的表,也使用查询)中读取DataFrame和Dataset(在Spark 2.0中),并将DataFrame和Dataset(在Spark 2.0中)写入phoenix,在java中的Apache Spark中。在java中没有任何关于这些的文档示例。

还提供多种方法,如果可能的话,喜欢从phoenix中读取,一种方法是我们可以使用PhoenixConfigurationUtil设置输入类和输入查询,然后从sparkContext中读取newAPIHadoopRDD,另一种方法是使用sqlContext.read().foramt("jdbc").options(pass a map with configuration keys like driver,url,dbtable).load(),还有一种方法是使用sqlContext.read().format("org.apache.phoenix.spark").option(pass a map with configuration keys like url,table).load()读取。

在搜索时,我在Spark 1.6的其他带有dataframe的问题中发现了这些方法,但示例不完整,这些方法只是零零碎碎地出现,所以我无法找出完整的步骤。我找不到任何Spark 2.0的例子

这是如何从phoenix中读取/写入的示例

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
public class SparkConnection implements Serializable {
    public static void main(String args[]) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("spark-phoenix-df");
        sparkConf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
        DataFrame fromPhx = sqlContext.read().format("jdbc")
                .options(ImmutableMap.of("driver", "org.apache.phoenix.jdbc.PhoenixDriver", "url",
                        "jdbc:phoenix:ZK_QUORUM:2181:/hbase-secure", "dbtable", "TABLE1"))
                .load();
        fromPhx.write().format("org.apache.phoenix.spark").mode(SaveMode.Overwrite)
        .options(ImmutableMap.of("driver", "org.apache.phoenix.jdbc.PhoenixDriver","zkUrl",
                "jdbc:phoenix:localhost:2181","table","RESULT"))
        .save();
    }
}

在scala中,可以这样做:

import org.apache.phoenix.spark._
val sqlContext = spark.sqlContext
val df1 =    sqlContext.read.format("jdbc").options(Map("driver" -> "org.apache.phoenix.jdbc.PhoenixDriver","url" -> "jdbc:phoenix:zk4-habsem.lzmf1fzmprtezol2fr25obrdth.jx.internal.cloudapp.net,zk5-habsem.lzmf1fzmprtezol2fr25obrdth.jx.internal.cloudapp.net,zk1-habsem.lzmf1fzmprtezol2fr25obrdth.jx.internal.cloudapp.net:2181:/hbase-unsecure", "dbtable" -> "table_name")).load();

此页面https://github.com/apache/phoenix/tree/master/phoenix-spark包含,如何将Phoenix表加载为RDD或DataFrame等示例。

例如,加载一个表为DataFrame:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._
val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)
val df = sqlContext.load(
  "org.apache.phoenix.spark", 
  Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
)
df
  .filter(df("COL1") === "test_row_1" && df("ID") === 1L)
  .select(df("ID"))
  .show
下面的gist url是使用Java的完整示例。 https://gist.github.com/mravi/444afe7f49821819c987

相关内容

  • 没有找到相关文章

最新更新