Multiprocessing Dask Pymongo



I am trying to use dask to query a MongoDB database in parallel, but the multiprocessing does not seem to work.

I have the following delayed function:

    import dask
    import pandas as pd
    from pymongo import MongoClient

    @dask.delayed
    def _retrieve_one(query, settings, new_chunck):
        with MongoClient(settings.MONGODB, settings.MONGOPORT) as client:
            db = client[settings["MONGO_DATABASE"]]
            collection = db[settings["MONGO_COLLECTIONS"]]
            new_query = dict()
            new_query["_id"] = {"$in": new_chunck}
            iterator = collection.find(new_query)
            df_res = pd.concat([pd.DataFrame(x) for x in iterator], axis=0)
            df_res = df_res.reset_index()
            COL = ["ip", "host", "day", "http_method", "start_date"]
            to_concatenate = df_res.loc[:, ["_id"] + COL].drop_duplicates()
            df_res = df_res.drop(COL, axis=1)
            df_res = df_res.pivot(index="_id", columns="index", values="values")
            df_res = df_res.merge(to_concatenate, on="_id")
            return df_res

I launch it with this code:

    with MongoClient(self.settings.MONGODB, self.settings.MONGOPORT) as client:
        db = client[self.settings["MONGO_DATABASE"]]
        collection = db[self.settings["MONGO_COLLECTIONS"]]
        # retrieve all ids matching the query - for multiprocessing
        all_id = list(collection.find(query).distinct('_id'))
        logging.info("{} documents to retrieve".format(len(all_id)))
        id_chunck = [all_id[i:i+chunck_size] for i in range(0, len(all_id), chunck_size)]
        dd_df = dd.from_delayed(
            [_retrieve_one(query, self.settings, chunck) for chunck in id_chunck],
            meta=pd.DataFrame({"_id": ["a"],
                               "bytes": ["a"],
                               "bytes_in": ["a"],
                               "bytes_out": ["a"],
                               "n_queries": ["a"],
                               "ip": ["a"],
                               "host": ["a"],
                               "day": [datetime.strptime("2020-01-01", '%Y-%m-%d')],
                               "http_method": ["a"],
                               "start_date": [datetime.strptime("2020-01-01", "%Y-%m-%d")]
                               }))

It works (no errors), but when I change chunck_size so that there are several partitions, it is not any faster, and execution seems to run on only one core.
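
For reference, a minimal sketch of how the dataframe built above would be materialized and how the partition count can be inspected; the compute step is not shown in the original snippet, and the names `dd_df` and `id_chunck` are taken from it:

    # Sketch only, not from the original code.
    # dd.from_delayed turns each delayed chunk into one partition.
    print(dd_df.npartitions)   # should equal len(id_chunck)

    # Trigger the computation; by default dask.dataframe uses the
    # local threaded scheduler for this.
    result = dd_df.compute()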

I recommend profiling your computation to see what is slow.

My first guess is that you are bound by the network or by the Mongo database itself, but I don't know.

For more information on understanding performance when using Dask, see the following documentation: https://docs.dask.org/en/latest/understanding-performance.html
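
Following that advice, one way to see where the time goes with the local schedulers is Dask's built-in diagnostics. A minimal sketch, assuming the `dd_df` dataframe built above (psutil and bokeh are needed for the resource profile and the plot):

    # Sketch only: profile the computation with dask's local diagnostics.
    from dask.diagnostics import Profiler, ResourceProfiler, visualize

    with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof:
        result = dd_df.compute()

    # Plots per-task timings plus CPU/memory usage over time. If the tasks
    # spend most of their time waiting on Mongo or the network, the CPU
    # trace will stay low while individual tasks remain long.
    visualize([prof, rprof])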
