Does the AsyncElasticsearch client open a new session for each async request?
AsyncElasticsearch (from elasticsearch-py) uses aiohttp under the hood. As far as I know, aiohttp recommends using a context manager for the aiohttp.ClientSession object so that a new session is not spawned for every request:
async with aiohttp.ClientSession() as session:
...
I am trying to speed up my bulk ingestion.
- How can I tell whether the AsyncElasticsearch client is reusing the same session or setting up multiple sessions?
- Do I need to use the async with ... statement above in the code snippet below (see the sketch after the snippet)?
import asyncio
import os

import numpy as np
import pandas as pd
from elasticsearch import AsyncElasticsearch, helpers
from tqdm import tqdm

# %%------------------------------------------------------------------------------------
# Create async elastic client
async_es = AsyncElasticsearch(
    hosts=[os.getenv("ELASTIC_URL")],
    verify_certs=False,
    http_auth=(os.getenv("ELASTIC_USERNAME"), os.getenv("ELASTIC_PW")),
    timeout=60 * 60,
    ssl_show_warn=False,
)

# %%------------------------------------------------------------------------------------
# Upload csv to elastic ("file" is the csv path, defined earlier)
# Chunk files to keep memory low; "username" must be among the columns read
with pd.read_csv(file, usecols=["username", "attributes"], chunksize=50_000) as reader:
    for df in reader:
        # Build bulk actions, using the username as the document id
        async def generate_actions(df_chunk):
            for index, record in df_chunk.iterrows():
                doc = record.replace({np.nan: None}).to_dict()
                doc.update({"_id": doc["username"], "_index": "users"})
                yield doc

        es_upl_chunk = 1000

        async def main():
            # One async_bulk coroutine per 1000-row slice of the chunk
            tasks = []
            for i in range(0, len(df), es_upl_chunk):
                tasks.append(
                    helpers.async_bulk(
                        client=async_es,
                        actions=generate_actions(df[i : i + es_upl_chunk]),
                        chunk_size=es_upl_chunk,
                    )
                )
            successes = 0
            errors = []
            print("Uploading to es...")
            progress = tqdm(unit=" docs", total=len(df))
            # async_bulk resolves to (number of successes, list of errors)
            for task in asyncio.as_completed(tasks):
                resp = await task
                successes += resp[0]
                errors.extend(resp[1])
                progress.update(es_upl_chunk)
            return successes, errors

        responses = asyncio.run(main())
        print(f"Uploaded {responses[0]} documents from {file}")
        if len(responses[1]) > 0:
            print(
                f"WARNING: Encountered the following errors: {','.join(responses[1])}"
            )
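For reference, the context-manager pattern I am asking about would look something like the sketch below. This is only an illustration: it assumes a recent elasticsearch-py in which the async client itself supports async with (which just guarantees close() is awaited on exit; the client keeps its own per-node session either way), and the host URL is a placeholder.

import asyncio
from elasticsearch import AsyncElasticsearch

async def main():
    # Entering the context does not change connection pooling; exiting
    # it awaits client.close(), releasing the underlying session(s).
    async with AsyncElasticsearch(hosts=["http://localhost:9200"]) as client:
        info = await client.info()
        print(info["version"]["number"])

asyncio.run(main())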
It turned out that AsyncElasticsearch was not the right client for speeding up bulk ingestion in this case. I switched to the helpers.parallel_bulk() function instead.
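For anyone landing here, below is a minimal sketch of the thread-based approach, not the exact code I ran. parallel_bulk() belongs to the synchronous client and returns a lazy generator of (ok, result) tuples, so it must be consumed for any indexing to happen. The connection settings mirror the snippet above; file and thread_count=4 are placeholders.

import os

import numpy as np
import pandas as pd
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(
    hosts=[os.getenv("ELASTIC_URL")],
    verify_certs=False,
    http_auth=(os.getenv("ELASTIC_USERNAME"), os.getenv("ELASTIC_PW")),
)

def generate_actions(df_chunk):
    # Same actions as in the question, but a plain (sync) generator
    for _, record in df_chunk.iterrows():
        doc = record.replace({np.nan: None}).to_dict()
        doc.update({"_id": doc["username"], "_index": "users"})
        yield doc

with pd.read_csv(file, usecols=["username", "attributes"], chunksize=50_000) as reader:
    for df in reader:
        # parallel_bulk is lazy: iterating over it drives the upload
        for ok, result in helpers.parallel_bulk(
            es, generate_actions(df), thread_count=4, chunk_size=1000
        ):
            if not ok:
                print(f"WARNING: {result}")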