关注MongoDB更改流



我们希望我们的 Go 应用程序侦听集合上的数据更改。因此,在谷歌搜索解决方案时,我们遇到了MongoDB的Change Streams。该链接还展示了一些语言的实现片段,如Python,Java,Nodejs等。然而,Go 没有一段代码。

我们使用 Mgo 作为驱动程序,但找不到有关更改流的显式语句。

有没有人知道如何使用该 Mgo 或任何其他 Go 的 Mongo 驱动程序在 Change Streams 上观看?

由古斯塔沃·尼迈耶开发的流行的mgo驱动程序(github.com/go-mgo/mgo(已经变暗(未维护(。而且它不支持更改流。

社区支持的分叉github.com/globalsign/mgo状况要好得多,并且已经添加了对变更流的支持(请参阅此处的详细信息(。

要监视集合的变化,只需使用 Collection.Watch() 方法,该方法返回值 mgo.ChangeStream 。下面是使用它的简单示例:

coll := ... // Obtain collection
pipeline := []bson.M{}
changeStream := coll.Watch(pipeline, mgo.ChangeStreamOptions{})
var changeDoc bson.M
for changeStream.Next(&changeDoc) {
    fmt.Printf("Change: %vn", changeDoc)
}
if err := changeStream.Close(); err != nil {
    return err
}

另请注意,有一个官方的MongoDB Go驱动程序正在开发中,它在这里宣布:考虑引入官方MongoDB Go驱动程序的社区影响

它目前处于 alpha (!!( 阶段,因此请考虑这一点。可在此处获得: github.com/mongodb/mongo-go-driver .它还已经支持更改流,类似地通过 Collection.Watch() 方法(这是一种不同的mongo.Collection类型,与mgo.Collection无关(。它返回一个mongo.Cursor,您可以像这样使用它:

var coll mongo.Collection = ... // Obtain collection
ctx := context.Background()
var pipeline interface{} // set up pipeline
cur, err := coll.Watch(ctx, pipeline)
if err != nil {
    // Handle err
    return
}
defer cur.Close(ctx)
for cur.Next(ctx) {
    elem := bson.NewDocument()
    if err := cur.Decode(elem); err != nil {
        log.Fatal(err)
    }
    // do something with elem....
}
if err := cur.Err(); err != nil {
    log.Fatal(err)
}

此示例使用 MongoDB 支持的带有流管道的 Go 驱动程序(仅过滤具有 field1=1 和 field2=false 的文档(:

    ctx := context.TODO()
    clientOptions := options.Client().ApplyURI(mongoURI)
    client, err := mongo.Connect(ctx, clientOptions)
    if err != nil {
        log.Fatal(err)
    }
    err = client.Ping(ctx, nil)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("Connected!")
    collection := client.Database("test").Collection("test")
    pipeline := mongo.Pipeline{bson.D{
        {"$match",
            bson.D{
                {"fullDocument.field1", 1},
                {"fullDocument.field2", false},
            },
        },
    }}
    streamOptions := options.ChangeStream().SetFullDocument(options.UpdateLookup)
    stream, err := collection.Watch(ctx, pipeline, streamOptions)
    if err != nil {
        log.Fatal(err)
    }
    log.Print("waiting for changes")
    var changeDoc map[string]interface{}
    for stream.Next(ctx) {
        if e := stream.Decode(&changeDoc); e != nil {
            log.Printf("error decoding: %s", e)
        }
        log.Printf("change: %+v", changeDoc)
    }

相关内容

  • 没有找到相关文章

最新更新