I am building an automation tool for Elasticsearch cluster upgrades. For demonstration purposes (to show that my upgrade achieves zero downtime), I wrote a Python program that continuously streams data into the cluster during the upgrade:
# Get a Python Elasticsearch client
from elasticsearch import Elasticsearch

es = Elasticsearch(
    [HOST_NAME + ":" + str(HTTP_PORT)],
    retry_on_timeout=True,
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    sniffer_timeout=60,
)
In the snippet above, HOST_NAME and HTTP_PORT are the IP address and HTTP port of one node in the cluster (before the upgrade). However, I chose a rolling-replacement upgrade strategy, in which all of the old cluster nodes (running the lower Elasticsearch version) are eventually decommissioned (after all of their shards have been relocated to newly created nodes running the higher Elasticsearch version). When the old nodes are taken down, the Python client hits the following error:
Traceback (most recent call last):
File "main.py", line 51, in <module>
start()
File "main.py", line 48, in start
ingest_log_stream(INDEX_NAME, INPUT_DATA_FILE, GAP)
File "data_stream_ingestor.py", line 19, in ingest_log_stream
ingest_log_entry(indexName, logEntry)
File "data_ingestor.py", line 25, in ingest_log_entry
es = get_es_connection()
File "es_connector.py", line 19, in get_es_connection
], sniff_on_start=True, sniff_on_connection_fail=True, sniffer_timeout=60)
File "/home/.local/lib/python3.6/site-packages/elasticsearch/client/__init__.py", line 206, in __init__
self.transport = transport_class(_normalize_hosts(hosts), **kwargs)
File "/home/.local/lib/python3.6/site-packages/elasticsearch/transport.py", line 141, in __init__
self.sniff_hosts(True)
File "/home/.local/lib/python3.6/site-packages/elasticsearch/transport.py", line 261, in sniff_hosts
node_info = self._get_sniff_data(initial)
File "/home/.local/lib/python3.6/site-packages/elasticsearch/transport.py", line 230, in _get_sniff_data
raise TransportError("N/A", "Unable to sniff hosts.")
elasticsearch.exceptions.TransportError: TransportError(N/A, 'Unable to sniff hosts.')
The Elasticsearch Python client library documentation suggests:
If a connection to a node fails due to connection issues (raises ConnectionError) it is considered in faulty state. It will be placed on hold for dead_timeout seconds and the request will be retried on another node. If a connection fails multiple times in a row the timeout will get progressively larger to avoid hitting a node that’s, by all indication, down. If no live connection is available, the connection that has the smallest timeout will be used.
However, it seems to me that setting retry_on_timeout and the other sniffing options does not solve the problem. I would like to know the correct way to instantiate the Elasticsearch client so that, when the node it is connected to goes down, it automatically tries to connect to other nodes in the cluster. Thanks!
Perhaps you can use this pattern:
for attempt in range(5):  # xrange is Python 2 only; use range on Python 3
    try:
        your_connection()
    except Exception:  # should catch ONLY your connection-failure exception
        close_and_clean_things()
    else:
        # Reached when the try block completed without raising
        break
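As a minimal sketch, the same retry pattern can be wrapped in a helper that rebuilds the client on failure. `connect_with_retry` is an illustrative name, not part of the elasticsearch-py API; you would pass it a zero-argument callable wrapping the `Elasticsearch(...)` constructor and narrow the caught exception to the client's connection errors:

```python
import time


def connect_with_retry(factory, attempts=5, delay=1.0):
    """Call `factory` until it returns a value or `attempts` is exhausted.

    `factory` is any zero-argument callable that builds a connection,
    e.g. lambda: Elasticsearch([...], retry_on_timeout=True, ...).
    Retries with exponential back-off; re-raises the last error on failure.
    """
    last_error = None
    for attempt in range(attempts):
        try:
            return factory()
        except Exception as e:  # narrow this to your connection exception
            last_error = e
            time.sleep(delay * (2 ** attempt))  # exponential back-off
    raise last_error


# Hypothetical usage against a real cluster (not run here):
# es = connect_with_retry(
#     lambda: Elasticsearch(SEED_HOSTS, retry_on_timeout=True,
#                           sniff_on_start=True, sniff_on_connection_fail=True)
# )
```

This keeps the reconnect logic in one place, so when a seed node is decommissioned mid-upgrade the caller simply rebuilds the client against the remaining hosts instead of crashing on the first `TransportError`.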