Queueing a large number of tasks in Google Cloud Functions



I'm trying to use Cloud Functions to update data by calling an external API once a day.

So far I have:

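  • Cloud Scheduler, set up to call Function 1

  • Function 1: loops over the items and creates a task for each one

  • Task: calls Function 2 with the data provided by Function 1

  • Function 2: calls the external API to get the data and updates our database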

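The problem is that there are about 2k items to update each day, and the cloud function timed out before it finished updating them, so I moved the work onto a queue. However, even just adding the items to the queue takes the cloud function so long that it times out before all of them have been added.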

Is there a simple way to add multiple tasks to a queue in bulk, in one go?

If not, is there a better solution?

Everything is written in Python.

Function 1 code:

from google.cloud import tasks_v2

def refresh(request):
    # `items` is loaded elsewhere (not shown in the question).
    for i in items:
        # Create a client.
        client = tasks_v2.CloudTasksClient()
        # TODO(developer): Replace these values with your own.
        project = 'my-project'
        queue = 'refresh-queue'
        location = 'europe-west2'
        name = i['name'].replace(' ', '')
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?name={name}"
        # Construct the fully qualified queue name.
        parent = client.queue_path(project, location, queue)
        # Construct the request body.
        task = {
            "http_request": {  # Specify the type of request.
                "http_method": tasks_v2.HttpMethod.GET,
                "url": url,  # The full URL path that the task will be sent to.
            }
        }

        # Use the client to build and send the task.
        response = client.create_task(request={"parent": parent, "task": task})
    return "OK"  # An HTTP function must return a response.

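According to the public documentation, the best answer to your question "Is there a simple way to add multiple tasks to a queue in bulk, in one go?" is to implement the double-injection pattern.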

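To do this, you create a new queue and add to it a single task that carries the data for many tasks of the original queue. On the receiving end of that new queue, a service reads the task's data and creates one task per entry on the second (original) queue.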
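A minimal sketch of the double-injection idea, assuming a hypothetical fan-out queue and two hypothetical HTTP endpoints (an "injector" and a "worker"). Stage 1 puts one task per batch of names on the fan-out queue; stage 2, running inside the injector service, turns each batch back into one task per name on the work queue. The client argument is anything with a create_task(request=...) method, and the JSON POST body is an assumption (the question's Function 2 reads a GET query parameter instead).

```python
import json
from typing import Any, List


def chunk(names: List[str], size: int) -> List[List[str]]:
    """Split a list into consecutive batches of at most `size` items."""
    return [names[i:i + size] for i in range(0, len(names), size)]


def make_task(url: str, payload: dict, http_method: Any) -> dict:
    """Build a Cloud Tasks HTTP task dict that carries a JSON body."""
    return {
        "http_request": {
            "http_method": http_method,
            "url": url,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(payload).encode(),
        }
    }


def inject_batches(client, fanout_parent: str, injector_url: str,
                   names: List[str], batch_size: int, http_method: Any) -> int:
    """Stage 1: create one task per batch on the fan-out queue."""
    batches = chunk(names, batch_size)
    for batch in batches:
        task = make_task(injector_url, {"names": batch}, http_method)
        client.create_task(request={"parent": fanout_parent, "task": task})
    return len(batches)


def fan_out(client, work_parent: str, worker_url: str,
            body: bytes, http_method: Any) -> int:
    """Stage 2 (injector service): create one task per name on the work queue."""
    names = json.loads(body)["names"]
    for name in names:
        task = make_task(worker_url, {"name": name}, http_method)
        client.create_task(request={"parent": work_parent, "task": task})
    return len(names)
```

With the real library you would pass tasks_v2.CloudTasksClient(), parents built with client.queue_path(...), and tasks_v2.HttpMethod.POST; the injector's HTTP handler would call fan_out with request.get_data(). Each invocation then only creates a bounded number of tasks, which is what keeps it inside its timeout.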

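In addition, I recommend following the 500/50/5 pattern for cold queues: start at no more than 500 operations per second, then increase traffic by at most 50% every 5 minutes. This lets both the task queue and the Cloud Functions service scale up at a safe rate.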
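The 500/50/5 rule is just a step function, so a small helper makes the numbers concrete. This is a sketch of the ramp as described above (base 500 per second, +50% per 5-minute step); the function name and defaults are illustrative, not part of any Google library.

```python
def max_rate(elapsed_minutes: float, base: float = 500.0,
             growth: float = 0.5, step_minutes: float = 5.0) -> float:
    """Upper bound on operations per second under the 500/50/5 ramp-up rule."""
    # Number of completed 5-minute steps since traffic started.
    steps = int(elapsed_minutes // step_minutes)
    return base * (1 + growth) ** steps


for minute in (0, 5, 10, 15):
    print(minute, max_rate(minute))
```

For about 2k tasks a day, the starting rate of 500 per second already covers the whole batch in roughly four seconds, so the ramp mostly matters if the volume grows.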

Chris32's answer is correct, but one thing I noticed in your code snippet is that you should create the client outside the for loop.

from google.cloud import tasks_v2

def refresh(request):
    # Create the client once, outside the loop.
    client = tasks_v2.CloudTasksClient()
    # TODO(developer): Replace these values with your own.
    project = 'my-project'
    queue = 'refresh-queue'
    location = 'europe-west2'
    for i in items:
        name = i['name'].replace(' ', '')
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?name={name}"
        # Construct the fully qualified queue name.
        parent = client.queue_path(project, location, queue)
        # Construct the request body.
        task = {
            "http_request": {  # Specify the type of request.
                "http_method": tasks_v2.HttpMethod.GET,
                "url": url,  # The full URL path that the task will be sent to.
            }
        }

        # Use the client to build and send the task.
        response = client.create_task(request={"parent": parent, "task": task})
    return "OK"  # An HTTP function must return a response.

On App Engine I would create client = tasks_v2.CloudTasksClient() at file level, outside def refresh, but I don't know whether that matters for Cloud Functions.
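For what it's worth, Google's Cloud Functions tips recommend keeping reusable objects in global scope, because a warm instance may serve several invocations (reuse is likely but not guaranteed). A minimal sketch of the lazy-global variant, where get_client is a hypothetical helper and the factory would be tasks_v2.CloudTasksClient in real code:

```python
from typing import Any, Callable, Optional

# Module-level cache: it survives between invocations that land on
# the same warm instance, and is rebuilt on a cold start.
_client: Optional[Any] = None


def get_client(factory: Callable[[], Any]) -> Any:
    """Create the client on first use, then return the cached instance."""
    global _client
    if _client is None:
        _client = factory()
    return _client
```

Inside refresh you would then write client = get_client(tasks_v2.CloudTasksClient). Creating it lazily, rather than at import time, also keeps a failed client construction from breaking the module load.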

Second,

modify "Function 2" to accept multiple names instead of just one. Then "Function 1" can send 10 names to "Function 2" at a time.
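On the Function 2 side, accepting a batch mostly means splitting the names query parameter and looping. A sketch, where update_one stands in for a hypothetical function that calls the external API and writes to the database:

```python
from typing import Callable, List


def parse_names(raw: str) -> List[str]:
    """Split a comma-separated names value, dropping empty entries."""
    return [n for n in (part.strip() for part in raw.split(",")) if n]


def handle_batch(raw_names: str, update_one: Callable[[str], None]) -> int:
    """Process every name in the batch and return how many were handled."""
    names = parse_names(raw_names)
    for name in names:
        update_one(name)  # hypothetical: external API call + DB update
    return len(names)
```

In the HTTP function this would be called as handle_batch(request.args.get("names", ""), update_item). Since Cloud Tasks retries a task whose handler returns a non-2xx status, one failing name causes the whole batch to be retried, so the per-name update should be safe to repeat.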

BATCH_SIZE = 10  # send 10 names to Function 2

def refresh(request):
    # Create a client.
    client = tasks_v2.CloudTasksClient()
    # ...
    for i in range(0, len(items), BATCH_SIZE):
        items_batch = items[i:i + BATCH_SIZE]
        names = ','.join([item['name'].replace(' ', '') for item in items_batch])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?names={names}"
        # Construct the fully qualified queue name.
        # ...
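One caveat with putting the joined names straight into an f-string URL is that characters such as &, # or ? in a name would break the query string. A sketch using the standard library's urlencode to build one URL per batch (batch_urls is a hypothetical helper; the base URL is whatever your endpoint is):

```python
from typing import Dict, List
from urllib.parse import urlencode


def batch_urls(base_url: str, items: List[Dict[str, str]], batch_size: int) -> List[str]:
    """One URL per batch, with the comma-joined names percent-encoded."""
    urls = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        names = ",".join(item["name"].replace(" ", "") for item in batch)
        urls.append(f"{base_url}?{urlencode({'names': names})}")
    return urls
```

The comma itself is encoded as %2C, and the framework decodes it back, so request.args.get("names") in Function 2 still sees the plain comma-separated string.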

If these two quick fixes don't work, you'll have to split "Function 1" into "Function 1A" and "Function 1B".

Function 1A:

from google.cloud import tasks_v2

BATCH_SIZE = 100  # send 100 names to Function 1B

def refresh(request):
    client = tasks_v2.CloudTasksClient()
    for i in range(0, len(items), BATCH_SIZE):
        items_batch = items[i:i + BATCH_SIZE]
        names = ','.join([item['name'].replace(' ', '') for item in items_batch])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint-for-function-1b?names={names}"
        # Send the task.
        response = client.create_task(request={
            "parent": client.queue_path('my-project', 'europe-west2', 'refresh-queue'),
            "task": {
                "http_request": {"http_method": tasks_v2.HttpMethod.GET, "url": url}
            }})
    return "OK"

Function 1B:

from google.cloud import tasks_v2

BATCH_SIZE = 10  # send 10 names to Function 2

def refresh(request):
    # Read the comma-separated `names` query parameter into a list.
    names = request.args.get('names', '').split(',')
    client = tasks_v2.CloudTasksClient()
    for i in range(0, len(names), BATCH_SIZE):
        names_batch = ','.join(names[i:i + BATCH_SIZE])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint-for-function-2?names={names_batch}"
        # Send the task.
        response = client.create_task(request={
            "parent": client.queue_path('my-project', 'europe-west2', 'refresh-queue'),
            "task": {
                "http_request": {"http_method": tasks_v2.HttpMethod.GET, "url": url}
            }})
    return "OK"
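A quick back-of-the-envelope helper for the 1A/1B split, using the roughly 2k items from the question and the batch sizes above (100, then 10). It only counts tasks; fan_out_plan is an illustrative name.

```python
import math


def fan_out_plan(total_items: int, outer_batch: int, inner_batch: int) -> dict:
    """Count the tasks created at each stage of the 1A -> 1B -> 2 layout."""
    starts = range(0, total_items, outer_batch)
    # Function 1A creates one task per batch of `outer_batch` names.
    tasks_for_1b = len(starts)
    # Each Function 1B call splits its batch into chunks of `inner_batch`.
    tasks_for_2 = sum(
        math.ceil(min(outer_batch, total_items - start) / inner_batch)
        for start in starts
    )
    return {
        "function_1a_creates": tasks_for_1b,
        "max_per_1b_call": math.ceil(outer_batch / inner_batch),
        "function_2_calls": tasks_for_2,
    }


print(fan_out_plan(2000, 100, 10))
# {'function_1a_creates': 20, 'max_per_1b_call': 10, 'function_2_calls': 200}
```

So Function 1A makes 20 create_task calls instead of 2,000, and no single 1B invocation makes more than 10, which is what keeps each function well inside its timeout.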
