如何使用 Spark 在 elasticsearch 中实现索引更新功能?



我是ElasticSearch的新手。我有大量的数据可以使用Elasticsearch进行索引。

我正在使用Apache Spark使用Elasticsearch索引hive表中的数据。

作为此功能的一部分,我编写了简单的Spark Script。

object PushToES {
def main(args: Array[String]) {
val Array(inputQuery, index, host) = args
val sparkConf = new SparkConf().setMaster("local[1]").setAppName("PushToES")
sparkConf.set("....",Host)
sparkConf.set("....","9200")
val sc = new SparkContext(sparkConf)
val ht = new org.apache.spark.sql.hive.HiveContext(sc)
val ps = hhiveSqlContext.sql(inputQuery)
ps.toJSON.saveJsonToEs(index)
}
}

之后,我生成jar并使用火花提交提交作业

spark-submit --jars ~/*.jar --master local[*] --class com.PushToES *.jar "select * from gtest where day=20170711" gest3 localhost

然后我正在执行以下命令

curl -XGET 'localhost:9200/test/test_test/_count?pretty'

第一次正确显示

{
"count" : 10,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
}
}

如果我第二次执行相同的 curl 命令,它会给出像 bleow 这样的结果

{
"count" : 20,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
}
}

如果我第三次执行相同的命令,我得到的命令

{
"count" : 30,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
}
}

但我不明白为什么每次它都会向现有索引值(即计数(添加计数值

请让我知道如何解决此问题,即,如果我执行任意次数,我也必须获得相同的值(正确的计数值,即 10(

I am expecting below result for this case because correct count value is 10.(我在 hive 表上执行了计数查询,每次计数 (*( 为 10(

{
"count" : 10,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
}
}

提前谢谢.

如果要在每次运行时"替换"数据,而不是"追加"数据,则必须在 Spark Elasticsearch 属性中针对此类方案进行配置。

您需要做的第一件事是在文档中有一个 ID,并告诉 elastisearch 您的 id "列"(如果您来自数据帧(或键(以 json 术语表示(是什么。

这记录在这里: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html

对于需要指定文档的 id(或其他元数据字段,如 ttl 或时间戳(的情况,可以通过设置适当的映射(即 es.mapping.id(来实现。按照前面的示例,要指示 Elasticsearch 使用字段 id 作为文档 ID,请更新 RDD 配置(也可以在 SparkConf 上设置属性,但由于其全局影响,不鼓励这样做(:

EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id"))

第二个配置键可用于控制 elasticsearch 在写入数据时尝试执行的作业类型,但默认值对于您的用户案例是正确的:

es.write.operation (default index(

写入操作 elasticsearch-hadoop 应该采用 - 可以是以下任何一种:

索引(默认( 添加新数据,同时替换(基于其 ID(现有数据(重新编制索引(。

创造 添加新数据 - 如果数据已存在(基于其 ID(,则会引发异常。

更新 更新现有数据(基于其 ID(。如果未找到数据,则会引发异常。

更新插入 如果数据不存在,则称为合并或插入,如果数据存在,则更新(基于其 ID(。

最新更新