我从云函数向pubsub发送消息,并使用PubsubIO从全局窗口在Java中接收它们。有时,从 Java 拉取时,很少有从云函数传递的消息没有显示(在我的情况下,200 条中有 4 条被遗漏了)。
我的云功能:
const PubSub = require('@google-cloud/pubsub');
const pubsub = PubSub();
exports.messagePublisher= function(event,callback) {
const file = event.data;
function publishMessage(){
console.log('Publishing message to Scheduler: '+file.name);
//Get Topic
const topic = pubsub.topic('projects/analytics-and-presentation/topics/newTestTopic');
var publisher = topic.publisher();
var message = "sampleText";
// Publishes a message
publisher.publish(new Buffer.from(message), (err) => {
if (err) {
console.log('Error occurred',err); publishMessge();
} else {
console.log('Message published');
}
});
}
if (file.resourceState === 'exists' && file.name) {
publishMessage();
}
callback();
};
我应该怎么做才能确保没有任何消息被丢弃?
消息很可能被丢弃,因为它们被声明为延迟数据。请参阅编程指南中的窗口化和触发部分,特别是水印和延迟数据、处理延迟数据和默认触发器(我强烈建议从头到尾完整地阅读这些部分,而不仅仅是我链接的子部分)
您可以将窗口/触发策略配置为更宽容地处理哪些数据算作延迟数据,或者根本不删除延迟数据。
您可能还需要配置 PubsubIO 分配事件时间时间戳的方式。参见 javadoc。