使用elasticsearch为python创建批量索引/文档



我正在使用python生成大量随机内容的elasticsearch文档,并使用elasticsearch-py对它们进行索引。

简化工作示例(只有一个字段的文档):

from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    es_client.index(index='my_index', document=document)

因为这会对每个文档发出一个请求,所以我试图通过使用_bulk API发送1000个文档块来加快速度。然而,到目前为止,我的尝试都没有成功。

我从文档的理解是,你可以传递一个可迭代到bulk(),所以我尝试:

from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
document_list = []
for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    document_list.append(document)
    if i % 1000 == 0:
        es_client.bulk(operations=document_list, index='my_index')
        document_list = []

,但是这会导致

elasticsearch。BadRequestError: BadRequestError(400, 'illegal_argument_exception', '错误的操作/元数据行[1],期望START_OBJECT或END_OBJECT,但发现[VALUE_STRING]')

好的,似乎我混淆了两个不同的功能:helpers.bulk()Elasticsearch.bulk()。两者都可以用来实现我想要做的事情,但它们的签名略有不同。

helpers.bulk()函数接受一个Elasticsearch()对象和一个包含文档作为参数的可迭代对象。可以指定为_op_type,也可以是indexcreatedeleteupdate中的一种。由于_op_type默认为index,我们可以省略它,并在本例中简单地传递文档列表:

from elasticsearch import Elasticsearch, helpers
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
document_list = []
for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    document_list.append(document)
    if i % 1000 == 0:
        helpers.bulk(es_client, document_list, index='my_index')
        document_list = []

也可以使用Elasticsearch.bulk()函数,但是这里的动作/操作是强制性的,作为可迭代对象的一部分,语法略有不同。这意味着,我们需要一个dict来指定每个文档的动作(在本例中为"index": {})和主体,而不仅仅是一个包含文档内容的dict。参见_bulk文档:

from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
actions_list = []
for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    actions_list.append({"index": {}, "doc": document})
    if i % 1000 == 0:
        es_client.bulk(operations=actions_list, index='my_index')
        actions_list = []

这个也可以。

我假设上面两个都在内部生成相同的_bulk REST API语句,所以它们最终应该是等效的。

最新更新