Question
I'm writing a GCP Cloud Function that takes an input id from a Pub/Sub message, processes it, and outputs a table to BigQuery.
The code is as follows:
from __future__ import absolute_import
import base64
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from scrapinghub import ScrapinghubClient
import os
def processing_data_function(element):
    # do stuff and return desired data
    pass

def create_data_from_id(job_id):
    # take scrapinghub's job id and extract the data through api
    pass

def run(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
        event (dict): Event payload.
        context (google.cloud.functions.Context): Metadata for the event.
    """
    # Take pubsub message and also Scrapinghub job's input id
    pubsub_message = base64.b64decode(event['data']).decode('utf-8')
    argv = ['--project=project-name',
            '--region=us-central1',
            '--runner=DataflowRunner',
            '--temp_location=gs://temp/location/',
            '--staging_location=gs://staging/location/']
    p = beam.Pipeline(options=PipelineOptions(argv))
    (p
     | 'Read from Scrapinghub' >> beam.Create(create_data_from_id(pubsub_message))
     | 'Trim b string' >> beam.FlatMap(processing_data_function)
     | 'Write Projects to BigQuery' >> beam.io.WriteToBigQuery(
           'table_name',
           schema=schema,  # 'schema' is defined elsewhere in the real code
           # Creates the table in BigQuery if it does not yet exist.
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()

if __name__ == '__main__':
    run()
Note that the two functions create_data_from_id and processing_data_function handle data from Scrapinghub (a scraping platform for Scrapy); they are quite long, so I'd rather not include them here. They're also unrelated to the error, since this code works if I run it from Cloud Shell and pass the arguments with argparse.ArgumentParser().
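For reference, the Cloud Shell variant looks roughly like this (a minimal sketch, not my exact code; --job_id is a made-up flag name, and the helper functions from the snippet above are assumed to be in scope):

import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def main(argv=None):
    parser = argparse.ArgumentParser()
    # --job_id is a made-up flag standing in for the real input
    parser.add_argument('--job_id', help='Scrapinghub job id')
    # anything the parser does not recognise (--project, --runner, ...)
    # is handed to Beam as pipeline options
    known_args, pipeline_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (p
     | 'Read from Scrapinghub' >> beam.Create(create_data_from_id(known_args.job_id))
     | 'Trim b string' >> beam.FlatMap(processing_data_function))
    p.run()

if __name__ == '__main__':
    main()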
As for the error I ran into: although the code deploys without problems and the Pub/Sub message triggers the function successfully, the Dataflow job fails and reports this error:
"Error message from worker: Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 279, in loads
return dill.loads(s)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 826, in _import_module
return __import__(import_name)
ModuleNotFoundError: No module named 'main'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "apache_beam/runners/worker/operations.py", line 662, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 664, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 665, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 284, in apache_beam.runners.worker.operations.Operation.start
File "apache_beam/runners/worker/operations.py", line 290, in apache_beam.runners.worker.operations.Operation.start
File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/worker/operations.py", line 616, in apache_beam.runners.worker.operations.DoOperation.setup
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 283, in loads
return dill.loads(s)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 826, in _import_module
return __import__(import_name)
ModuleNotFoundError: No module named 'main'
What I've tried
Given that I can run the same pipeline from Cloud Shell, just using an argument parser instead of specifying the options directly, I suspected the way the options are stated was the problem. So I tried different combinations of options, with or without --save_main_session, --staging_location, --requirements_file=requirements.txt, --setup_file=setup.py ... They all reported more or less the same problem: dill doesn't know which module to pick up. With save_main_session specified, the main session couldn't be picked up. With requirements_file and setup_file specified, the job wasn't even created successfully, so I'll spare you the trouble of looking at those errors. My main question is that I have no idea where this problem comes from, since I've never used dill before, and why running the pipeline from the shell and from a Cloud Function is so different. Does anyone have a clue?
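For what it's worth, this is roughly how I applied save_main_session in code when I tried it (a minimal sketch):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

options = PipelineOptions(argv)  # the same option list as in the snippet above
# ask Beam to pickle the __main__ namespace and restore it on the workers
options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=options)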
Thanks
You could also try changing the last part to the following and testing whether it works:
if __name__ == "__main__":
...
Also, make sure you execute the script from the correct folder, as this could be related to how the files are named or where they are located in the directory.
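For example, you could exercise the entry point locally with a fake Pub/Sub event before deploying (a sketch; the job id payload is made up):

import base64
from main import run

# '123456/7/8' is just a made-up Scrapinghub job id
fake_event = {'data': base64.b64encode(b'123456/7/8').decode('utf-8')}
run(fake_event, context=None)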
Consider the following sources, which you may find helpful: Source 1, Source 2.
I hope this information helps.
You might be launching the application on Cloud Run with gunicorn (as is standard practice), e.g.:
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
I ran into the same problem and found a workaround that starts the application without gunicorn:
CMD exec python3 main.py
It's probably because gunicorn skips the main context and directly launches the main:app object. I don't know how to fix it while keeping gunicorn.
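To illustrate what I mean (a minimal sketch): gunicorn imports the module and serves the app object, so the __main__ block never executes:

from flask import Flask

app = Flask(__name__)  # gunicorn serves this object directly as main:app

if __name__ == '__main__':
    # this block runs under `CMD exec python3 main.py`,
    # but gunicorn never enters it
    app.run(host='0.0.0.0', port=8080)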
=== Additional note ===
I found a way to make it work with gunicorn.
- Move the function that launches the pipeline into a separate module, e.g. df_pipeline/pipe.py:
.
├── df_pipeline
│ ├── __init__.py
│ └── pipe.py
├── Dockerfile
├── main.py
├── requirements.txt
└── setup.py
# in main.py
import df_pipeline as pipe
result = pipe.preprocess(....)
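In my case main.py ends up roughly like the following (a sketch; the route and the Pub/Sub push envelope handling are assumptions, not my exact code):

# main.py
import base64
from flask import Flask, request
import df_pipeline as pipe

app = Flask(__name__)  # the object gunicorn serves as main:app

@app.route('/', methods=['POST'])
def index():
    envelope = request.get_json()
    # unwrap the Pub/Sub push envelope and hand the id to the pipeline module
    job_id = base64.b64decode(envelope['message']['data']).decode('utf-8')
    pipe.preprocess(job_id)
    return ('', 204)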
- Create setup.py in the same directory as main.py:
# setup.py
import setuptools
setuptools.setup(
name='df_pipeline',
install_requires=[],
packages=setuptools.find_packages(include=['df_pipeline']),
)
- In df_pipeline/pipe.py, set the pipeline option setup_file to ./setup.py (see the sketch below).
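Here is roughly what df_pipeline/pipe.py looks like (a sketch; preprocess and its job_id argument are placeholder names consistent with main.py above):

# df_pipeline/pipe.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

def preprocess(job_id):
    options = PipelineOptions([
        '--project=project-name',
        '--region=us-central1',
        '--runner=DataflowRunner',
        '--temp_location=gs://temp/location/',
        '--staging_location=gs://staging/location/',
    ])
    # ship the df_pipeline package to the workers, so dill imports the
    # pipeline code from df_pipeline instead of a module named 'main'
    options.view_as(SetupOptions).setup_file = './setup.py'
    p = beam.Pipeline(options=options)
    # ... build the same transforms as in the question ...
    return p.run()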