AWS Lambda函数如何获得并行函数的结果



在我的AWS Lambda函数(Python 3.8运行时(中,我试图运行三个不同的不相关函数。这三个函数中的每一个都使用不同的数据类型返回不同的结果。我需要返回与主函数lambda_handler并行工作的这三个函数的结果。你认为最有效的方法是什么?

现在我正在考虑使用一个全局变量,在其中我将记录三个函数的结果。但在我看来,我无法在主函数lambda_handler中获得这些结果,因为它们在不同的过程中工作。不是吗?

我认为使用Pipe会更合适。我发现AWS Lambda不支持Queue。你不这么认为吗?

from multiprocessing import Process

results = dict()
def first_function(event):
# Do something
global results
results["first_function"] = True
def second_function():
# Do something
global results
results["second_function"] = 30

def third_function():
# Do something
global results
results["third_function"] = ["q", "w", "e", "r", "t"]

def execute_parallel_processes(*functions):
# Create an empty list to keep all parallel processes.
processes = list()
# Create a process per function.
for function in functions:
process = Process(target=function)
processes.append(process)
# Start all parallel processes.
for process in processes:
process.start()
# Wait until all parallel processes are finished.
for process in processes:
process.join()

def lambda_handler(event, context):
# Execute 3 different processes in parallel.
execute_parallel_processes(
first_function(event),
second_function,
third_function
)
print(results)
return None

我用Pipe解决了这个问题。下面是一个工作代码片段:

import json
from multiprocessing import Process, Pipe
from typing import *
from functools import wraps
logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)

def first_function(*args, **kwargs):
print("first_function")
pipe = kwargs["pipe"]
pipe.send({"first_function": True})
pipe.close()

def second_function(*args, **kwargs):
print("second_function")
pipe = kwargs["pipe"]
pipe.send({"second_function": 30})
pipe.close()

def third_function(*args, **kwargs):
print("third_function")
pipe = kwargs["pipe"]
pipe.send({"third_function": ["item1", "item2"]})
pipe.close()

def execute_parallel_processes(functions):
# Create an empty list to keep all parallel processes.
processes = []
# Create an empty list of pipes to keep all connections.
pipes = []
# Create a process per function.
for index, function in enumerate(functions):
# Check whether the input arguments have keys in their dictionaries.
try:
target = function["object"]
except KeyError as error:
logger.error(error)
raise Exception(error)
try:
kwargs = function["arguments"]
except KeyError as error:
logger.error(error)
raise Exception(error)
# Create the pipe for communication.
parent_pipe, child_pipe = Pipe()
pipes.append(parent_pipe)
kwargs["pipe"] = child_pipe
# Create the process.
process = Process(target=target, kwargs=kwargs)
processes.append(process)
# Start all parallel processes.
for process in processes:
process.start()
# Wait until all parallel processes are finished.
for process in processes:
process.join()
# Get the results of all the processes.
results = {}
for index, pipe in enumerate(pipes):
results = {**results, **pipe.recv()}
# Return the results of all processes.
return results

def lambda_handler(event, context):
functions = [
{
"object": first_function,
"arguments": {}
},
{
"object": second_function,
"arguments": {"event": event}
},
{
"object": third_function,
"arguments": {}
}
]
results = execute_parallel_processes(functions=functions)
print(results)
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}

最新更新