调整弹性写入性能



我正试图在Elasticsearch集群中保存(并索引(170GB文件(约9.15亿行和25列(。我在一个5节点的弹性搜索集群上得到了糟糕的性能。这项任务大约需要5小时。Spark集群有150个核心10x(15个CPU,64个RAM(。

这是我目前的工作流程:

  • 从S3的多个镶木地板文件构建Spark数据框架
  • 然后使用"将该数据帧保存到ElasticSearch索引org.elasticsearch.spark.sql"源于Spark。(我尝试了许多分片复制配置组合,但性能没有提高(

这是集群节点的特征

  • 每个节点5个(16个CPU、64个RAM、700GB磁盘(
  • HEAP_SIZE大约是可用RAM的50%,意味着每个节点上有32GB。在/etc/elasticsearch/jvm.options中配置

这是将数据帧写入ElasticSearch(用scala编写(的代码

writeDFToEs(whole_df, "main-index")

writeDFToEs函数:

def writeDFToEs(df: DataFrame, index: String) = {
df.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes", "192.168.1.xxx")
.option("es.http.timeout", 600000)
.option("es.http.max_content_length", "2000mb")
.option("es.port", 9200)
.mode("overwrite")
.save(s"$index")
}

你能帮我找出我做得不好的地方吗?如何解决?

提前谢谢。

回答我自己的问题

正如@warkolm所建议的那样,我专注于_bulk

我使用es hadoop连接器,所以我必须调整es.batch.size.entries参数。

在运行了一系列测试(测试了各种值(后,我终于得到了更好的结果(尽管仍然不是最优的(,es.batch.size.entries设置为1000,并在ES索引模板中使用以下值。

{
"index": {
"number_of_shards": "10",
"number_of_replicas": "0",
"refresh_interval": "60s"
}
}

最后,我的df.write看起来是这样的:

df.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes", es_nodes)
.option("es.port", es_port)
.option("es.http.timeout", 600000)
.option("es.batch.size.entries", 10000)
.option("es.http.max_content_length", "2000mb")
.mode("overwrite")
.save(s"$writeTo")

现在这个过程需要约3小时(2小时55分钟(,而不是5小时

我仍在改进配置和代码。如果我有更好的表现,我会更新。

最新更新