How can Python watch for changes to MongoDB's oplog?



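I have several Python scripts that write to MongoDB using pymongo. How can another Python script watch for changes to the results of a Mongo query and run some function when a change occurs? MongoDB is set up with the oplog enabled.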

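Some time ago I wrote an incremental backup tool for MongoDB in Python. The tool monitors data changes by tailing the oplog. Here is the relevant part of the code.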

Updated answer, MongoDB 3.6+

As datdinhquoc astutely points out in the comments below, MongoDB 3.6 and later provide Change Streams.
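A minimal sketch of what a Change Streams consumer might look like with pymongo 3.6 or later. Change streams need a replica set or sharded cluster. The database and collection names (mydb, mycollection) and the helper names describe_change and watch_collection are made up for illustration; only Collection.watch() and the operationType / documentKey fields of the change event come from pymongo and MongoDB.

```python
def describe_change(change):
    """Summarise a change event as (operation type, _id of the document)."""
    operation = change["operationType"]
    # Events such as 'drop' or 'invalidate' carry no documentKey.
    doc_id = change.get("documentKey", {}).get("_id")
    return operation, doc_id


def watch_collection(collection, handler, pipeline=None):
    """Call handler(change) for every event on the collection's change stream."""
    # watch() returns a ChangeStream, usable as a context manager and iterator.
    with collection.watch(pipeline or []) as stream:
        for change in stream:
            handler(change)


def main():
    # Imported here so the helpers above can be used without a live server.
    from pymongo import MongoClient

    # Hypothetical database and collection names.
    collection = MongoClient().mydb.mycollection
    # Optional aggregation stage: only report document-level writes.
    only_writes = [{"$match": {"operationType": {
        "$in": ["insert", "update", "replace", "delete"]}}}]
    watch_collection(collection,
                     lambda change: print(describe_change(change)),
                     only_writes)

# Call main() to run this against a live replica set.
```

The handler is passed in so that the script doing the watching can decide what "execute some function" means without touching the loop itself.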

Updated answer, pymongo 3

from time import sleep
from pymongo import MongoClient, ASCENDING
from pymongo.cursor import CursorType
from pymongo.errors import AutoReconnect
# Time to wait for data or connection.
_SLEEP = 1.0
if __name__ == '__main__':
    oplog = MongoClient().local.oplog.rs
    # Start from the oldest oplog entry; sort DESCENDING to start from the newest.
    stamp = oplog.find().sort('$natural', ASCENDING).limit(-1).next()['ts']
    while True:
        kw = {}
        kw['filter'] = {'ts': {'$gt': stamp}}
        kw['cursor_type'] = CursorType.TAILABLE_AWAIT
        kw['oplog_replay'] = True
        cursor = oplog.find(**kw)
        try:
            while cursor.alive:
                for doc in cursor:
                    stamp = doc['ts']
                    print(doc)  # Do something with doc.
                sleep(_SLEEP)
        except AutoReconnect:
            sleep(_SLEEP)

See also http://api.mongodb.com/python/current/examples/tailable.html.
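The "Do something with doc" step in the loop above could be filled in along these lines. This is only a sketch: make_dispatcher and the namespace mydb.mycollection are hypothetical, and it relies on the usual oplog entry fields, where op is 'i', 'u' or 'd' for insert, update and delete, ns is the "database.collection" namespace, and o holds the inserted document or the update specification.

```python
def make_dispatcher(namespace, on_insert=None, on_update=None, on_delete=None):
    """Build a function that routes oplog entries for one namespace to callbacks."""
    handlers = {'i': on_insert, 'u': on_update, 'd': on_delete}

    def dispatch(entry):
        # Ignore entries for other collections (and no-ops, commands, etc.).
        if entry.get('ns') != namespace:
            return False
        handler = handlers.get(entry.get('op'))
        if handler is None:
            return False
        handler(entry)
        return True

    return dispatch


# Hypothetical usage: react only to inserts into mydb.mycollection.
dispatch = make_dispatcher(
    'mydb.mycollection',
    on_insert=lambda entry: print('inserted', entry['o']),
)
```

Inside the tailing loop, dispatch(doc) would then take the place of print(doc).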

Original answer, pymongo 2

from time import sleep
from pymongo import MongoClient
from pymongo.cursor import _QUERY_OPTIONS
from pymongo.errors import AutoReconnect
from bson.timestamp import Timestamp
# Tailable cursor options.
_TAIL_OPTS = {'tailable': True, 'await_data': True}
# Time to wait for data or connection.
_SLEEP = 10
if __name__ == '__main__':
    db = MongoClient().local
    while True:
        query = {'ts': {'$gt': Timestamp(some_timestamp, 0)}}  # Replace with your query.
        cursor = db.oplog.rs.find(query, **_TAIL_OPTS)
        cursor.add_option(_QUERY_OPTIONS['oplog_replay'])
        try:
            while cursor.alive:
                try:
                    doc = next(cursor)
                    # Do something with doc.
                except (AutoReconnect, StopIteration):
                    sleep(_SLEEP)
        finally:
            cursor.close()

I ran into this problem today and could not find an up-to-date answer anywhere.

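The Cursor class changed in v3.0 and no longer accepts the tailable and await_data parameters. This example tails the oplog and prints each oplog record that is newer than the last record it has seen.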

# Adapted from the example here: https://jira.mongodb.org/browse/PYTHON-735
# to work with pymongo 3.0
import pymongo
from pymongo.cursor import CursorType
c = pymongo.MongoClient()
# Oplog collection for master/slave deployments.
oplog = c.local.oplog['$main']
# For replica sets, use this line instead.
#oplog = c.local.oplog.rs
first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
ts = first['ts']
while True:
    cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
    while cursor.alive:
        for doc in cursor:
            ts = doc['ts']
            print(doc)
            # Work with doc here

Query the oplog with a tailable cursor.

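This is actually interesting, because oplog monitoring is exactly what the tailable cursor feature was originally added for. I have found it very useful for other things as well (for example, implementing mongodb-based pubsub; see this post for an example), but that was the original purpose.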
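For the pubsub idea, a rough sketch under some assumptions: messages live in a capped collection, publishers insert documents, and subscribers tail the collection with a tailable cursor. The names publish, subscribe, pubsub_demo and messages are invented for this example. Note that this subscriber replays every matching message already in the collection, and that a tailable cursor opened on an empty capped collection dies immediately.

```python
import time


def publish(collection, channel, payload):
    """Publish by inserting a message document into the capped collection."""
    message = {'channel': channel, 'payload': payload, 'created': time.time()}
    return collection.insert_one(message)


def subscribe(collection, channel, handler, cursor_type):
    """Call handler(message) for each message on the channel until the cursor dies."""
    cursor = collection.find({'channel': channel}, cursor_type=cursor_type)
    while cursor.alive:
        # With a tailable-await cursor this loop ends when no data arrives
        # in time; the outer loop then resumes waiting on the same cursor.
        for message in cursor:
            handler(message)


def main():
    # Imported here so publish/subscribe can be reused without a live server.
    from pymongo import MongoClient
    from pymongo.cursor import CursorType
    from pymongo.errors import CollectionInvalid

    db = MongoClient().pubsub_demo  # hypothetical database name
    try:
        # Tailable cursors only work on capped collections.
        db.create_collection('messages', capped=True, size=1024 * 1024)
    except CollectionInvalid:
        pass  # the collection already exists
    publish(db.messages, 'news', {'text': 'hello'})
    subscribe(db.messages, 'news', print, CursorType.TAILABLE_AWAIT)

# Call main() to try this against a running MongoDB server.
```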

I had the same issue. I put together this rescommunes/oplog.py. Check the comments and see __main__ for an example of how you could use it with your script.
