ElasticSearch ConnectionPool using elasticsearch-py library



我是 ElasticSearch 的新手,并尝试使用 ElasticSearch ConnectionPool 的并发连接 [通过 Transport class] 将条目添加到 ElasticSearch 中的索引中。

这是我的代码:

import elasticsearch
from elasticsearch.transport import Transport
def init_connection():
    transport = Transport([{'host':SERVER_URL}], port=SERVER_PORT, randomize_hosts=False)
    transport.add_connection(host=SERVER_URL+SERVER_PORT)
    return transport
def add_entries_to_es(id, name):
    transport = init_connection()
    doc = {
           'name': name,
           'postDate': datetime.datetime.now(),
           'valid': "true",
           'suggest': {
               "input": name,
               'output': name,
               'payload': {'domain_id': id}
               }
           }
    conn = transport.getConnection()
    es = elasticsearch.Elasticsearch(connection_class=conn)
    res = es.index(index=ES_INDEX_NAME, doc_type=ES_DOC_TYPE, id=id, body=doc)
    ...

我收到以下错误:

File "/my_project/elastichelper.py", line 23, in init_connection
transport.add_connection(host=SERVER_URL+SERVER_PORT)
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 139, in add_connection
self.set_connections(self.hosts)
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 169, in set_connections
connections = map(_create_connection, hosts)
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 161, in _create_connection
kwargs.update(host)
ValueError: dictionary update sequence element #0 has length 1; 2 is required

我不确定Transport class是否是在 ElasticSearch 中实例化ConnectionPool的正确方法。 但是,我从文档中读到,传输类处理各个连接的实例化以及创建连接池来保存它们。

我没有获得实例化ConnectionPool并从池中有效使用连接的正确方法。阅读和谷歌搜索对我没有任何帮助。

我也知道 helpers.bulk() API,但我对使用它感到困惑,因为除了将条目添加到索引外,我还删除了无效条目。

我发现简单地使用 ElasticSearch 类实例,并为 index 方法设置适当的timeout值 [对我来说,timeout=30已经足够好了]。喜欢这个:

doc = {
       'name': name,
       'postDate': datetime.datetime.now(),
       'valid': "true",
       'suggest': {
           "input": name,
           'output': name,
           'payload': {'domain_id': id}
           }
       }
es = elasticsearch.Elasticsearch()
res = es.index(index=ES_INDEX_NAME, doc_type=ES_DOC_TYPE, id=id, body=doc, timeout=30)

我最初在简单ElasticSearch类实例中遇到了timeout问题,这些问题已通过上述更改修复。

我根本不需要显式使用TransportConnection类实例。

相关内容

  • 没有找到相关文章

最新更新