在集合 Mongo DB 中的每个文档上调用自定义 python 函数



我想对整个集合中每个文档的某个现有属性调用自定义python函数,并将结果作为新的键值对存储在该(相同)文档中。我可以知道是否有任何方法可以做到这一点(因为每个调用都独立于其他调用)?

我注意到cursor.forEach但不能只有效地使用 python 来完成吗?

一个简单的例子是将字符串拆分为text并将单词的编号存储为新属性。

def split_count(text):
    # some complex preprocessing...
    return len(text.split())
# Need something like this...
db.collection.update_many({}, {'$set': {"split": split_count('$text') }}, upsert=True)
但是,似乎无法以

这种方式根据同一文档中另一个属性的值在文档中设置新属性。这篇文章很旧,但问题似乎仍然悬而未决。

我找到了一种方法,可以使用 PyMongo 中的 parallel_scan 在集合上调用任何自定义 python 函数。

def process_text(cursor):
    for row in cursor.batch_size(200):
        # Any complex preprocessing here...
        split_text = row['text'].split()
        db.collection.update_one({'_id': row['_id']}, 
                                 {'$set': {'split_text': split_text, 
                                           'num_words': len(split_text) }},
                                 upsert=True)

def preprocess(num_threads=4):
    # Get up to max 'num_threads' cursors.
    cursors = db.collection.parallel_scan(num_threads)
    threads = [threading.Thread(target=process_text, args=(cursor,)) for cursor in cursors]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

这并不比cursor.forEach快(但也不是那么慢),但它可以帮助我执行任何任意复杂的 python 代码并从 Python 本身中保存结果。

此外,如果我在其中一个属性中有一个ints数组,执行cursor.forEach将它们转换为我不想要的floats。所以我更喜欢这种方式。

但我很高兴知道是否有比这更好的方法:)

在python中做这种事情不太可能是有效的。这是因为文档必须往返并遍历客户端计算机上的 python 函数。

在您的示例代码中,您将函数的结果传递给 mongodb update查询,这将不起作用。您不能在数据库服务器上的mongodb查询中运行任何python代码。

正如您链接问题的答案所暗示的那样,这种类型的操作必须在 mongo shell 中执行。 例如:

db.collection.find().snapshot().forEach(
    function (elem) {
        splitLength = elem.text.split(" ").length
        db.collection.update(
            {
                _id: elem._id
            },
            {
                $set: {
                    split: splitLength 
                }
            }
        );
    }
);  

最新更新