如何在Spark中使用Elasticsearch -hadoop从一个Elasticsearch集群重新索引数据到另一个



我有两个分开的Elasticsearch集群,我想将数据从第一个集群重新索引到第二个集群,但是我发现我只能在SparkContext配置中设置一个Elasticsearch集群,例如:

var sparkConf : SparkConf = new SparkConf()
                     .setAppName("EsReIndex")
sparkConf.set("es.nodes", "node1.cluster1:9200")

那么我如何在同一个应用程序内部的Spark中使用elastic search-hadoop在两个Elasticsearch集群之间移动数据?

您不需要在SparkConf中配置节点地址。

当你使用elasticsearch格式的DataFrameWriter时,你可以像下面这样传递节点地址作为一个选项:

val df = sqlContext.read
                  .format("elasticsearch")
                  .option("es.nodes", "node1.cluster1:9200")
                  .load("your_index/your_type")
df.write
    .option("es.nodes", "node2.cluster2:9200")
    .save("your_new_index/your_new_type")

这应该在spark 1.6中工作。X和相应的elasticsearch-hadoop连接器。

最新更新