我正在使用python生成大量随机内容的elasticsearch文档,并使用elasticsearch-py对它们进行索引。
简化工作示例(只有一个字段的文档):
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
es_client.index(index='my_index', document=document)
因为这会对每个文档发出一个请求,所以我试图通过使用_bulk
API发送1000个文档块来加快速度。然而,到目前为止,我的尝试都没有成功。
我从文档的理解是,你可以传递一个可迭代到bulk()
,所以我尝试:
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
document_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
document_list.append(document)
if i % 1000 == 0:
es_client.bulk(operations=document_list, index='my_index')
document_list = []
,但是这会导致
elasticsearch。BadRequestError: BadRequestError(400, 'illegal_argument_exception', '错误的操作/元数据行[1],期望START_OBJECT或END_OBJECT,但发现[VALUE_STRING]')
好的,似乎我混淆了两个不同的功能:helpers.bulk()
和Elasticsearch.bulk()
。两者都可以用来实现我想要做的事情,但它们的签名略有不同。
helpers.bulk()
函数接受一个Elasticsearch()
对象和一个包含文档作为参数的可迭代对象。可以指定为_op_type
,也可以是index
、create
、delete
或update
中的一种。由于_op_type
默认为index
,我们可以省略它,并在本例中简单地传递文档列表:
from elasticsearch import Elasticsearch, helpers
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
document_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
document_list.append(document)
if i % 1000 == 0:
helpers.bulk(es_client, document_list, index='my_index')
document_list = []
也可以使用Elasticsearch.bulk()
函数,但是这里的动作/操作是强制性的,作为可迭代对象的一部分,语法略有不同。这意味着,我们需要一个dict
来指定每个文档的动作(在本例中为"index": {}
)和主体,而不仅仅是一个包含文档内容的dict
。参见_bulk
文档:
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
actions_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
actions_list.append({"index": {}, "doc": document})
if i % 1000 == 0:
es_client.bulk(operations=actions_list, index='my_index')
actions_list = []
这个也可以。
我假设上面两个都在内部生成相同的_bulk
REST API语句,所以它们最终应该是等效的。