我有两个云函数,当用户通过我的前端web应用程序进行身份验证时,它在用户对象上设置一个标志,这会触发第一个云函数执行:
exports.observeUserUpdate = functions.firestore
.document("/data/{uid}")
.onUpdate(async (snapshot) => {
const user = snapshot.after.data() as User;
if (user.scanRepositories) {
await pubSubClient
.topic("user-repositories-update")
.publish(Buffer.from(JSON.stringify(user)));
}
});
这个函数将把一个主题发送到一个队列,在这个队列中,我有一个订阅者,订阅者将接收该消息并执行一些后端操作,修改用户下嵌套的文档。
我有第二个云函数,它侦听用户文档下的更改,但只对我感兴趣的更改起作用:
exports.observeRepositoryUpdate = functions.firestore
.document("/data/{uid}/repositories/{repositoryId}")
.onUpdate(async (snapshot) => {
const repository = snapshot.after.data() as Repository;
if (repository.task == null || repository.task.length == 0) {
return;
}
if (repository.task.some((task: Task) => task.run)) {
await pubSubClient
.topic("repository-update")
.publish(Buffer.from(JSON.stringify(repository)));
}
});
当文档更改并满足特定条件时,它将触发另一个主题,供不同的后端系统处理。
这都是非常简单和有意义的,事情出错的地方是用户注销,当他们重新登录,并设置一个标志,正确的第一个函数触发和后端做它应该做的,然而,作为第二个函数的一部分发送的所有消息,都被"重播";给那个主题的消费者。
例如:用户登录,为repositories
创建tasks
。这些任务被正确执行,一切都在正常工作。然后用户注销,然后重新登录,这些相同的任务被第二个函数再次触发,当它们不应该被触发时。
看起来第二个函数正在触发整个变更集/历史。
是我的数据结构错了,还是我的文档路径不正确?希望得到一些帮助和道歉,这是非常冗长的。
edit:为了帮助说明,这里是服务器日志中发生的事情的顺序
// User logs in, login task fires as expected
2021-10-28 12:12:27.489 INFO 60338 --- [pool-1-thread-2] s.f.g.listeners.consumer.UserConsumer : User consumer triggered: User(uid=g1OQ0K1...)
2021-10-28 12:12:27.492 INFO 60338 --- [pool-1-thread-2] s.f.github.services.GitHubServiceImpl : Finding repositories
2021-10-28 12:12:31.954 INFO 60338 --- [pool-1-thread-2] s.f.g.r.FirestoreRepositoryImpl : Batch write for 131 repositories
2021-10-28 12:13:05.827 INFO 60338 --- [pool-1-thread-2] s.f.g.r.FirestoreRepositoryImpl : Batch write for 9 languages
2021-10-28 12:13:05.829 INFO 60338 --- [pool-1-thread-2] s.f.g.r.FirestoreRepositoryImpl : Batch write for 5 types
// Login task complete
// User runs task on repository
2021-10-28 12:13:50.392 INFO 60338 --- [pool-1-thread-2] s.f.g.l.consumer.RepositoryConsumer : Repository consumer triggered: RepositoryDto(id=39931...)
2021-10-28 12:13:51.237 INFO 60338 --- [pool-1-thread-2] s.f.g.s.PackageManagerServiceImpl : Detecting package manage for google-photo-frame with language Java
2021-10-28 12:13:52.319 INFO 60338 --- [pool-1-thread-2] s.f.g.r.FirestoreRepositoryImpl : Batch write for 1 package managers
// Task completes
// User logs out and back in, the RepositoryConsumer is triggered again when it shouldnt
2021-10-28 12:15:09.917 INFO 60338 --- [pool-1-thread-5] s.f.g.l.consumer.RepositoryConsumer : Repository consumer triggered: RepositoryDto(id=39931...)
2021-10-28 12:15:10.807 INFO 60338 --- [pool-1-thread-5] s.f.g.s.PackageManagerServiceImpl : Detecting package manage for google-photo-frame with language Java
2021-10-28 12:15:11.872 INFO 60338 --- [pool-1-thread-5] s.f.g.r.FirestoreRepositoryImpl : Batch write for 1 repositories
2021-10-28 12:15:12.262 INFO 60338 --- [pool-1-thread-5] s.f.g.listeners.consumer.UserConsumer : User consumer triggered: User(uid=g1OQ0K1...)
2021-10-28 12:15:12.262 INFO 60338 --- [pool-1-thread-5] s.f.github.services.GitHubServiceImpl : Finding repositories
2021-10-28 12:12:31.954 INFO 60338 --- [pool-1-thread-2] s.f.g.r.FirestoreRepositoryImpl : Batch write for 131 repositories
2021-10-28 12:13:05.827 INFO 60338 --- [pool-1-thread-2] s.f.g.r.FirestoreRepositoryImpl : Batch write for 9 languages
2021-10-28 12:13:05.829 INFO 60338 --- [pool-1-thread-2] s.f.g.r.FirestoreRepositoryImpl : Batch write for 5 types
如文档中所述:
默认情况下,订阅在消息被确认后立即丢弃消息。未确认的消息默认保留7天(可通过订阅的message_retention_duration属性配置)。
因此,如果消息已被第二个函数确认,则消息将被重放似乎很奇怪,但它也声明:
配置订阅以保留已确认的消息(通过
retain_acked_messages
属性)允许您重播发送到订阅的先前打包的消息。订阅中的消息最多可保留7天,无论已确认还是未确认。也就是说,订阅中最老的消息的年龄不会超过7天。
这可以解释当用户注销和再次登录时,任务再次被触发的行为。
在这个GitHub的repo中,它的行为取决于retain_acked_messages
标志:
# Whether to retain acknowledged messages. If true, acknowledged messages
# will not be expunged until they fall out of the RetentionDuration window.
subscription.retain_acked_messages: false
同样在寻求快照部分中声明:
一旦创建快照,它将保留:
- 在创建快照时源订阅中未确认的所有消息。
- 此后发布到主题的所有消息。
您可以通过使用快照查找任何主题的订阅来重播这些未确认的消息。
为了避免这种行为,我建议配置最适合您的retain_acked_messages
标志,并使用过滤器:
您可以使用过滤器重播订阅中的消息。如果您使用带有过滤器的订阅来查找时间戳,那么Pub/Sub服务只会重新发送与过滤器匹配的消息。带有过滤器的订阅快照包含以下消息:
- 所有比快照更新的消息,包括不匹配过滤器的消息。
- 未确认的消息比快照早。