'Data too large' transport error (429) when loading JSON docs into ElasticSearch



I have a process running in Python 3.7 that loads JSON files, gathers the file rows into chunks on an async queue, and incrementally posts the chunks to ElasticSearch for indexing.

The chunking is there to avoid overloading the ElasticSearch connection.

def load_files_to_queue(file_queue, incomplete_files, doc_queue, index):
    logger.info("Initializing load files to queue")
    while True:
        try:
            current_file = file_queue.get(False)
            logger.info("Loading {} into queue.".format(current_file))
            iteration_counter = 0
            with open(current_file) as loaded_file:
                iterator = json_iterator(loaded_file)
                current_type = "doctype"
                chunk = []
                for row in iterator:
                    # Every so often check the queue size
                    iteration_counter += 1
                    if iteration_counter > 5000:
                        # If it gets too big, pause until it has gone
                        # down a bunch.
                        if doc_queue.qsize() > 30:
                            logger.info(
                                "Doc queue at {}, pausing until smaller.".format(
                                    doc_queue.qsize()
                                )
                            )
                            while doc_queue.qsize() > 10:
                                time.sleep(0.5)
                        iteration_counter = 0
                    for transformed in transform_single_doc(current_type, row, index):
                        if transformed:
                            chunk.append(transformed)
                    # NOTE: Send messages in chunks instead of single rows so
                    # that the queue locks less frequently
                    if len(chunk) >= DOC_QUEUE_CHUNK_SIZE:
                        doc_queue.put(chunk)
                        chunk = []
                if chunk:
                    doc_queue.put(chunk)
            incomplete_files.remove(current_file)
            logger.info("Finished loading {} into queue.".format(current_file))
            logger.info("There are {} files left to load.".format(file_queue.qsize()))
        except Empty:
            break

def bulk_load_from_queue(file_queue, incomplete_files, doc_queue, chunk_size=500):
    """
    Represents a single worker thread loading docs into ES
    """
    logger.info("Initialize bulk doc loader {}".format(threading.current_thread()))
    conn = Elasticsearch(settings.ELASTICSEARCH, timeout=180)
    dequeue_results(
        streaming_bulk(
            conn,
            load_docs_from_queue(file_queue, incomplete_files, doc_queue),
            max_retries=2,
            initial_backoff=10,
            chunk_size=chunk_size,
            yield_ok=False,
            raise_on_exception=True,
            raise_on_error=True,
        )
    )
    logger.info("Shutting down doc loader {}".format(threading.current_thread()))

Occasionally an error like this comes up in bulk_load_from_queue, which I interpret as a chunk being too large:

TransportError(429, 'circuit_breaking_exception', '[parent] Data too large, data for [<http_request>] would be [1024404322/976.9mb], which is larger than the limit of [1011774259/964.9mb], real usage: [1013836880/966.8mb], new bytes reserved: [10567442/10mb], usages [request=32880/32.1kb, fielddata=7440/7.2kb, in_flight_requests=164031664/156.4mb, accounting=46679308/44.5mb]')
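
For what it's worth, the breaker numbers in that message can also be read live off the cluster with a plain nodes-stats call. A minimal read-only sketch using the elasticsearch-py client (settings.ELASTICSEARCH comes from the code above; the response layout is the standard breaker section of the nodes stats API):

from elasticsearch import Elasticsearch

conn = Elasticsearch(settings.ELASTICSEARCH)
stats = conn.nodes.stats(metric="breaker")
for node_id, node in stats["nodes"].items():
    parent = node["breakers"]["parent"]
    # estimated_size vs. limit_size is the comparison that trips the
    # circuit_breaking_exception quoted above.
    print(node_id, parent["estimated_size"], "of", parent["limit_size"])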
Re-running usually gets past it, but the errors have become too frequent. So I want to enforce a byte-size limit on the chunks in load_files_to_queue, like this:
for transformed in transform_single_doc(current_type, row, index):
    if transformed:
        chunk_size = chunk_size + sys.getsizeof(transformed)
        chunk.append(transformed)
# NOTE: Send messages in chunks instead of single rows so that
# the queue locks less frequently
if (
    chunk_size >= DOC_QUEUE_CHUNK_SIZE
    or len(chunk) >= DOC_QUEUE_CHUNK_LEN
):
    doc_queue.put(chunk)
    chunk = []
    chunk_size = 0

# ...and after the row loop, flush whatever is left:
if len(chunk) > 0:
    doc_queue.put(chunk)

This causes a handful of errors like this toward the end of processing:

ConnectionResetError
[Errno 104] Connection reset by peer

and then:

EOFError multiprocessing.connection in _recv
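
(The ConnectionResetError / EOFError pair usually just means a worker process died and the far end of the multiprocessing queue went away, rather than pointing at the root cause.) One thing worth flagging in the revised snippet above: sys.getsizeof reports the shallow, in-memory size of the Python object, not the bytes the document will occupy in the serialized bulk request, so the byte cap can be far off from what Elasticsearch actually receives. A minimal sketch of counting serialized bytes instead, assuming the docs are JSON-serializable dicts (json.dumps here only approximates what the bulk helper serializes):

import json

def serialized_size(doc):
    # Approximate the bytes this doc adds to the bulk request body,
    # instead of the Python object's memory footprint.
    return len(json.dumps(doc).encode("utf-8"))

# ...inside the row loop, replacing sys.getsizeof:
chunk_size = chunk_size + serialized_size(transformed)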

This basically means that your request to Elasticsearch was too large for it to handle, so you could try reducing the chunk size.

Alternatively, take a look at the _bulk api; there are helpers in the python client which should take most of the pain out of this.
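
Building on that: the streaming_bulk helper already used above accepts a max_chunk_bytes argument (it defaults to roughly 100MB in elasticsearch-py) that caps the request body size no matter how many docs are in a chunk. A sketch of wiring it in; the 10MB figure is only an assumption to tune against the breaker limit:

streaming_bulk(
    conn,
    load_docs_from_queue(file_queue, incomplete_files, doc_queue),
    chunk_size=chunk_size,             # cap on docs per request
    max_chunk_bytes=10 * 1024 * 1024,  # cap on bytes per request (assumed value)
    max_retries=2,
    initial_backoff=10,
    yield_ok=False,
    raise_on_exception=True,
    raise_on_error=True,
)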

We're running into this again in our QA environment, @brenden. Would you suggest reducing the chunk size further? It is currently being passed as 200.

for worker in range(doc_worker_count):
    job = doc_pool.apply_async(
        bulk_load_from_queue,
        args=(file_queue, incomplete_files, doc_queue, 200),
        error_callback=error_callback,
    )
    jobs.append(job)
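
One more detail from the original error message: in_flight_requests was already at 156.4mb, so concurrent bulk requests from multiple workers count against the parent breaker alongside each request's own size. Lowering doc_worker_count is therefore another knob besides chunk size; the numbers below are guesses to experiment with, not recommendations:

doc_worker_count = 2  # fewer simultaneous bulk requests in flight
for worker in range(doc_worker_count):
    job = doc_pool.apply_async(
        bulk_load_from_queue,
        args=(file_queue, incomplete_files, doc_queue, 100),  # smaller chunk_size
        error_callback=error_callback,
    )
    jobs.append(job)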
