谷歌云数据存储:统计处于特定状态的所有实体



背景

我需要向大约100万台设备发送大量通知,我正在使用谷歌云功能进行构建。

在当前设置中,我将每个设备令牌作为PubSub消息排队,该消息为:

  • 在DataStore中存储一个挂起的通知,用于跟踪重试和成功状态
  • 尝试发送通知
  • 如果重试次数足够并且尚未通过,则将通知标记为成功或失败

这或多或少都很好,我从中获得了不错的性能,每秒处理1.5K个代币。

问题

我想随时了解整个工作的进展情况。既然我知道我希望处理多少通知,我就能够报告处理了x/1_000_000之类的事情,然后当失败+成功的总和与我想要处理的一样多时,就认为已经完成了。

DataStore文档建议不要对实体本身运行计数,因为它不会具有性能,我可以确认这一点。我按照他们的分片计数器示例文档实现了一个计数器,我在最后包括了这个文档。

我看到的问题是,它既很慢,也很容易返回409 Contention errors,这使得我的函数调用重试,这并不理想,因为计数本身对进程来说并不重要,而且每个通知的重试预算有限。在实践中,失败最多的是增加进程结束时发生的计数器,这将增加通知读取的负载,以在重试时检查其状态,这意味着我最终得到的计数器小于实际成功的通知。

我使用wrk运行了一个快速基准测试,似乎通过增加计数器获得了大约400 RPS,平均延迟为250ms。与通知逻辑本身相比,这相当慢,通知逻辑本身每个通知执行大约3个DataStore查询,并且可能比递增计数器更复杂。当添加到争用错误中时,我最终得到了一个我认为不稳定的实现。我知道数据存储通常会随着持续的大量使用而自动扩展,但使用此服务的模式非常罕见,而且对于整批令牌来说,因此不会有任何以前的流量来扩展。

问题

  • 计数器实现是否缺少一些可以改进以降低速度的东西
  • 为了得到我想要的东西,我应该考虑另一种方法吗

代码

与数据存储交互的代码

DATASTORE_READ_BATCH_SIZE = 100
class Counter():
kind = "counter"
shards = 2000
@staticmethod
def _key(namespace, shard):
return hashlib.sha1(":".join([str(namespace), str(shard)]).encode('utf-8')).hexdigest()
@staticmethod
def count(namespace):
keys = []
total = 0
for shard in range(Counter.shards):
if len(keys) == DATASTORE_READ_BATCH_SIZE:
counters = client.get_multi(keys)
total = total + sum([int(c["count"]) for c in counters])
keys = []
keys.append(client.key(Counter.kind, Counter._key(namespace, shard)))
if len(keys) != 0:
counters = client.get_multi(keys)
total = total + sum([int(c["count"]) for c in counters])
return total
@staticmethod
def increment(namespace):
key = client.key(Counter.kind, Counter._key(namespace, random.randint(0, Counter.shards - 1)))
with client.transaction():
entity = client.get(key)
if entity is None:
entity = datastore.Entity(key=key)
entity.update({
"count": 0,
})
entity.update({
"count": entity["count"] + 1,
})
client.put(entity)

这是从谷歌云功能调用的,就像一样

from flask import abort, jsonify, make_response
from src.notify import FCM, APNS
from src.lib.datastore import Counter
def counter(request):
args = request.args
if args.get("platform"):
Counter.increment(args["platform"])
return
return jsonify({
FCM: Counter.count(FCM),
APNS: Counter.count(APNS)
})

这既用于递增和读取计数,也按iOS和Android平台划分。

最后我放弃了计数器,并开始在BigQuery中保存通知的状态。定价仍然合理,因为它仍然是按次使用的,流媒体版本的数据插入似乎足够快,在实践中不会给我带来任何问题。

有了这个,我可以使用一个简单的sql查询来计算与一个批处理作业匹配的所有实体。最终,所有实体都需要大约3秒的时间,与替代方案相比,这对我来说是可以接受的性能,因为这只是内部使用。

相关内容

  • 没有找到相关文章

最新更新