Firestore回复整个文档变更集/历史



我有两个云函数,当用户通过我的前端web应用程序进行身份验证时,它在用户对象上设置一个标志,这会触发第一个云函数执行:

exports.observeUserUpdate = functions.firestore
.document("/data/{uid}")
.onUpdate(async (snapshot) => {
const user = snapshot.after.data() as User;
if (user.scanRepositories) {
await pubSubClient
.topic("user-repositories-update")
.publish(Buffer.from(JSON.stringify(user)));
}
});

这个函数将把一个主题发送到一个队列,在这个队列中,我有一个订阅者,订阅者将接收该消息并执行一些后端操作,修改用户下嵌套的文档。

我有第二个云函数,它侦听用户文档下的更改,但只对我感兴趣的更改起作用:

exports.observeRepositoryUpdate = functions.firestore
.document("/data/{uid}/repositories/{repositoryId}")
.onUpdate(async (snapshot) => {
const repository = snapshot.after.data() as Repository;
if (repository.task == null || repository.task.length == 0) {
return;
}
if (repository.task.some((task: Task) => task.run)) {
await pubSubClient
.topic("repository-update")
.publish(Buffer.from(JSON.stringify(repository)));
}
});

当文档更改并满足特定条件时,它将触发另一个主题,供不同的后端系统处理。

这都是非常简单和有意义的,事情出错的地方是用户注销,当他们重新登录,并设置一个标志,正确的第一个函数触发和后端做它应该做的,然而,作为第二个函数的一部分发送的所有消息,都被"重播";给那个主题的消费者。

例如:用户登录,为repositories创建tasks。这些任务被正确执行,一切都在正常工作。然后用户注销,然后重新登录,这些相同的任务被第二个函数再次触发,当它们不应该被触发时。

看起来第二个函数正在触发整个变更集/历史。

是我的数据结构错了,还是我的文档路径不正确?希望得到一些帮助和道歉,这是非常冗长的。

edit:为了帮助说明,这里是服务器日志中发生的事情的顺序

// User logs in, login task fires as expected
2021-10-28 12:12:27.489  INFO 60338 --- [pool-1-thread-2] s.f.g.listeners.consumer.UserConsumer    : User consumer triggered: User(uid=g1OQ0K1...)
2021-10-28 12:12:27.492  INFO 60338 --- [pool-1-thread-2] s.f.github.services.GitHubServiceImpl    : Finding repositories
2021-10-28 12:12:31.954  INFO 60338 --- [pool-1-thread-2] s.f.g.r.FirestoreRepositoryImpl          : Batch write for 131 repositories
2021-10-28 12:13:05.827  INFO 60338 --- [pool-1-thread-2] s.f.g.r.FirestoreRepositoryImpl          : Batch write for 9 languages
2021-10-28 12:13:05.829  INFO 60338 --- [pool-1-thread-2] s.f.g.r.FirestoreRepositoryImpl          : Batch write for 5 types
// Login task complete 
// User runs task on repository
2021-10-28 12:13:50.392  INFO 60338 --- [pool-1-thread-2] s.f.g.l.consumer.RepositoryConsumer      : Repository consumer triggered: RepositoryDto(id=39931...)
2021-10-28 12:13:51.237  INFO 60338 --- [pool-1-thread-2] s.f.g.s.PackageManagerServiceImpl        : Detecting package manage for google-photo-frame with language Java
2021-10-28 12:13:52.319  INFO 60338 --- [pool-1-thread-2] s.f.g.r.FirestoreRepositoryImpl          : Batch write for 1 package managers
// Task completes
// User logs out and back in, the RepositoryConsumer is triggered again when it shouldnt
2021-10-28 12:15:09.917  INFO 60338 --- [pool-1-thread-5] s.f.g.l.consumer.RepositoryConsumer      : Repository consumer triggered: RepositoryDto(id=39931...)
2021-10-28 12:15:10.807  INFO 60338 --- [pool-1-thread-5] s.f.g.s.PackageManagerServiceImpl        : Detecting package manage for google-photo-frame with language Java
2021-10-28 12:15:11.872  INFO 60338 --- [pool-1-thread-5] s.f.g.r.FirestoreRepositoryImpl          : Batch write for 1 repositories
2021-10-28 12:15:12.262  INFO 60338 --- [pool-1-thread-5] s.f.g.listeners.consumer.UserConsumer    : User consumer triggered: User(uid=g1OQ0K1...)
2021-10-28 12:15:12.262  INFO 60338 --- [pool-1-thread-5] s.f.github.services.GitHubServiceImpl    : Finding repositories
2021-10-28 12:12:31.954  INFO 60338 --- [pool-1-thread-2] s.f.g.r.FirestoreRepositoryImpl          : Batch write for 131 repositories
2021-10-28 12:13:05.827  INFO 60338 --- [pool-1-thread-2] s.f.g.r.FirestoreRepositoryImpl          : Batch write for 9 languages
2021-10-28 12:13:05.829  INFO 60338 --- [pool-1-thread-2] s.f.g.r.FirestoreRepositoryImpl          : Batch write for 5 types

如文档中所述:

默认情况下,订阅在消息被确认后立即丢弃消息。未确认的消息默认保留7天(可通过订阅的message_retention_duration属性配置)。

因此,如果消息已被第二个函数确认,则消息将被重放似乎很奇怪,但它也声明:

配置订阅以保留已确认的消息(通过retain_acked_messages属性)允许您重播发送到订阅的先前打包的消息。订阅中的消息最多可保留7天,无论已确认还是未确认。也就是说,订阅中最老的消息的年龄不会超过7天。

这可以解释当用户注销和再次登录时,任务再次被触发的行为。

在这个GitHub的repo中,它的行为取决于retain_acked_messages标志:

# Whether to retain acknowledged messages. If true, acknowledged messages
# will not be expunged until they fall out of the RetentionDuration window.
subscription.retain_acked_messages: false

同样在寻求快照部分中声明:

一旦创建快照,它将保留:

  • 在创建快照时源订阅中未确认的所有消息。
  • 此后发布到主题的所有消息。

您可以通过使用快照查找任何主题的订阅来重播这些未确认的消息。

为了避免这种行为,我建议配置最适合您的retain_acked_messages标志,并使用过滤器:

您可以使用过滤器重播订阅中的消息。如果您使用带有过滤器的订阅来查找时间戳,那么Pub/Sub服务只会重新发送与过滤器匹配的消息。带有过滤器的订阅快照包含以下消息:

  • 所有比快照更新的消息,包括不匹配过滤器的消息。
  • 未确认的消息比快照早。

相关内容

  • 没有找到相关文章

最新更新