在 AWS Lambda 函数中使用 Python 多处理队列



我有一些python可以创建多个进程以更快地完成任务。当我创建这些进程时,我会传入一个队列。在进程内部,我使用 queue.put(data(,因此我能够检索进程外部的数据。它在我的本地机器上运行得很好,但是当我将 zip 上传到 AWS Lambda 函数 (Python 3.8( 时,它说 Queue(( 函数尚未实现。当我简单地取出队列功能时,该项目在 AWS Lambda 中运行良好,因此我知道这是我目前唯一的挂断。

我确保通过使用"pip install multiprocess -t ./"以及"pip install boto3 -t ./"将多处理包直接安装到我的python项目中。

我是python和AWS的新手,但我最近遇到的研究可能将我们指向SQS。

阅读这些 SQS 文档时,我不确定这是否正是我正在寻找的。

这是我在 Lambda 中运行的代码,它在本地运行,但在 AWS 上不起作用。有关重要部分,请参阅*:

from multiprocessing import Process, Queue
from craigslist import CraigslistForSale
import time
import math
sitesHold = ["sfbay", "seattle", "newyork", "(many more)..." ]
results = []

def f(sites, category, search_keys, queue):
local_results = []
for site in sites:
cl_fs = CraigslistForSale(site=site, category=category, filters={'query': search_keys})
for result in cl_fs.get_results(sort_by='newest'):
local_results.append(result)
if len(local_results) > 0:
print(local_results)
queue.put(local_results) # Putting data *********************************

def scan_handler(event, context):
started_at = time.monotonic()
queue = Queue()
print("Running...")
amount_of_lists = int(event['amountOfLists'])
list_length = int(len(sitesHold) / amount_of_lists)
extra_lists = math.ceil((len(sitesHold) - (amount_of_lists * list_length)) / list_length)
site_list = []
list_creator_counter = 0
site_counter = 0
for i in range(amount_of_lists + extra_lists):
site_list.append(sitesHold[list_creator_counter:list_creator_counter + list_length])
list_creator_counter += list_length
processes = []
for i in range(len(site_list)):
site_counter = site_counter + len(site_list[i])
processes.append(Process(target=f, args=(site_list[i], event['category'], event['searchQuery'], queue,))) # Creating processes and creating queues ***************************
for process in processes:
process.start() # Starting processes ***********************
for process in processes:
listings = queue.get() # Getting from queue ****************************
if len(listings) > 0:
for listing in listings:
results.append(listing)
print(f"Results: {results}")
for process in processes:
process.join()
total_time_took = time.monotonic() - started_at
print(f"Sites processed: {site_counter}")
print(f'Took {total_time_took} seconds long')

这是 Lambda 函数给我的错误:

{
"errorMessage": "[Errno 38] Function not implemented",
"errorType": "OSError",
"stackTrace": [
"  File "/var/task/main.py", line 90, in scan_handlern    queue = Queue()n",
"  File "/var/lang/lib/python3.8/multiprocessing/context.py", line 103, in Queuen    return Queue(maxsize, ctx=self.get_context())n",
"  File "/var/lang/lib/python3.8/multiprocessing/queues.py", line 42, in __init__n    self._rlock = ctx.Lock()n",
"  File "/var/lang/lib/python3.8/multiprocessing/context.py", line 68, in Lockn    return Lock(ctx=self.get_context())n",
"  File "/var/lang/lib/python3.8/multiprocessing/synchronize.py", line 162, in __init__n    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)n",
"  File "/var/lang/lib/python3.8/multiprocessing/synchronize.py", line 57, in __init__n    sl = self._semlock = _multiprocessing.SemLock(n"
]
}

Queue(( 在 AWS Lambda 中工作吗?实现目标的最佳方式是什么?

看起来不受支持 -

https://blog.ruanbekker.com/blog/2019/02/19/parallel-processing-on-aws-lambda-with-python-using-multiprocessing/

来自 AWS 文档

如果你用Python开发一个Lambda函数,并行性就不会到来。 默认情况下。Lambda支持Python 2.7和Python 3.6,两者都支持 具有多处理和线程模块。

Python 附带的多处理模块允许您运行 多个进程并行。由于 Lambda 执行 没有/dev/shm(进程共享内存(支持的环境, 不能使用多处理。队列或多处理。池。

另一方面,您可以使用多处理。管道代替 多处理。排队完成您需要的内容,而无需获得任何内容 执行 Lambda 函数期间出错。

相关内容

  • 没有找到相关文章

最新更新