如何防止来自不同实例的请求覆盖数据库(使用NDB的谷歌应用程序引擎)



我的谷歌应用引擎应用程序(Python3,标准环境(为用户提供请求:如果数据库中没有想要的记录,则创建它。

以下是关于数据库覆盖的问题:

当一个用户(通过浏览器(向数据库发送请求时,正在运行的GAE实例可能会暂时无法响应该请求,然后创建一个新的进程来响应该请求。结果是两个实例对同一请求作出响应。两个实例几乎同时对数据库进行查询,每个实例都发现没有所需的记录,从而创建一个新记录。结果是两次重复记录。

另一种情况是,由于某些原因,用户的浏览器发送了两次时间差小于0.01秒的请求,由服务器端的两个实例处理,从而创建了重复的记录。

我想知道如何通过一个实例临时锁定数据库,以防止另一个实例覆盖数据库。

我考虑过以下方案,但不知道它是否有效。

  1. 对于python 2,谷歌应用引擎提供了"memcache";,其可用于标记查询的状态,以实现数据库锁定的目的。但对于python3,似乎必须设置Redis服务器才能在不同实例之间快速交换数据库状态。那么,使用Redis锁定数据库的效率如何呢?

  2. Flask会话模块的用法。会话模块可以用于在不同的请求之间共享数据(在大多数情况下,是用户的登录状态(,从而共享不同的实例。我想知道在不同实例之间交换数据的速度。


附加信息(1(

我遵循了使用transaction的建议,但它不起作用。下面是我用来验证交易的代码。

失败的原因可能是事务仅适用于CURRENT客户端。对于同时有多个请求,GAE的服务器端将创建不同的进程或实例来响应请求,并且每个进程或实例都有自己的独立客户端。

@staticmethod
def get_test(test_key_id, unique_user_id, course_key_id, make_new=False):
client = ndb.Client()
with client.context():
from google.cloud import datastore
from datetime import datetime
client2 = datastore.Client()
print("transaction started at: ", datetime.utcnow())
with client2.transaction():
print("query started at: ", datetime.utcnow())
my_test = MyTest.query(MyTest.test_key_id==test_key_id, MyTest.unique_user_id==unique_user_id).get()
import time
time.sleep(5)
if make_new and not my_test:
print("data to create started at: ", datetime.utcnow())
my_test = MyTest(test_key_id=test_key_id, unique_user_id=unique_user_id, course_key_id=course_key_id, status="")
my_test.put()
print("data to created at: ", datetime.utcnow())
print("transaction ended at: ", datetime.utcnow())
return my_test

附加信息(2(

以下是有关memcache(Python3(用法的新信息我尝试过以下代码使用memcache锁定数据库,但仍然无法避免覆盖。

@user_student.route("/run_test/<test_key_id>/<user_key_id>/")
def run_test(test_key_id, user_key_id=0):
from google.appengine.api import memcache
import time
cache_key_id = test_key_id+"_"+user_key_id
print("cache_key_id", cache_key_id)
counter = 0
client = memcache.Client()
while True:  # Retry loop
result = client.gets(cache_key_id)
if result is None or result == "":
client.cas(cache_key_id, "LOCKED")
print("memcache added new value: counter = ", counter)
break
time.sleep(0.01)
counter+=1
if counter>500:
print("failed after 500 tries.")
break
my_test = MyTest.get_test(int(test_key_id), current_user.unique_user_id, current_user.course_key_id, make_new=True)
client.cas(cache_key_id, "")
memcache.delete(cache_key_id)

如果问题是重复而不是覆盖,也许您应该在创建新条目时指定数据id,但不要让GAE为您生成随机id。然后,应用程序将向同一条目写入两次,而不是创建两个条目。数据id可以是任何唯一的,例如会话id、时间戳等。

事务的问题是,它阻止您并行修改同一个条目,但不会阻止您并行创建两个新条目。

我以以下方式使用memcache(使用get/set(,并成功地锁定了数据库写入。

gets/cas似乎不太好用。在一次测试中,我通过cas((设置了valve,但后来它无法通过gets((读取值。

内存API:https://cloud.google.com/appengine/docs/standard/python3/reference/services/bundled/google/appengine/api/memcache

@user_student.route("/run_test/<test_key_id>/<user_key_id>/")
def run_test(test_key_id, user_key_id=0):
from google.appengine.api import memcache
import time
cache_key_id = test_key_id+"_"+user_key_id
print("cache_key_id", cache_key_id)
counter = 0
client = memcache.Client()
while True:  # Retry loop
result = client.get(cache_key_id)
if result is None or result == "":
client.set(cache_key_id, "LOCKED")
print("memcache added new value: counter = ", counter)
break
time.sleep(0.01)
counter+=1
if counter>500:
return "failed after 500 tries of memcache checking."
my_test = MyTest.get_test(int(test_key_id), current_user.unique_user_id, current_user.course_key_id, make_new=True)
client.delete(cache_key_id)
...

事务:https://developers.google.com/appengine/docs/python/datastore/transactions

当两个或更多事务同时尝试修改一个或多个公共实体组中的实体时,只有第一个提交其更改的事务才能成功;所有其他人都会失败。

您应该在事务中更新您的值。只要您的读写在一个事务中,应用程序引擎的事务将防止两个更新相互覆盖。一定要注意关于实体组的讨论。

你有两个选择:

  • 实现您自己的事务失败逻辑(重试等(
  • 与其直接写入数据存储,不如创建一个任务来修改一个实体。在任务中运行事务。如果失败,应用程序引擎将重试此任务,直到成功为止

最新更新