Apache Beam pipeline using DataflowRunner fails in _dill.py with "ModuleNotFoundError: No module named 'main'" when deployed from a Cloud Function



I am trying to execute a Dataflow pipeline from a Cloud Function on GCP using the Python SDK. The code was tested on a notebook server, where the pipeline runs fine with the DataflowRunner. However, when the pipeline is invoked from the Cloud Function, I get the following:

Error


Traceback (most recent call last):
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", line 346, in run_http_function
    result = _function_handler.invoke_user_function(flask.request)
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", line 217, in invoke_user_function
    return call_user_function(request_or_event)
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", line 210, in call_user_function
    return self._user_function(request_or_event)
  File "/user_code/main.py", line 215, in run_main
    BUCKET=BUCKET)
  File "/user_code/main.py", line 143, in dataflow
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
  File "/env/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 481, in __exit__
    self.run().wait_until_finish()
  File "/env/local/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1449, in wait_until_finish
    (self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 286, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'main'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 648, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 176, in execute
    op.start()
  File "apache_beam/runners/worker/operations.py", line 649, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 651, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 652, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 261, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 597, in apache_beam.runners.worker.operations.DoOperation.setup
  File "apache_beam/runners/worker/operations.py", line 602, in apache_beam.runners.worker.operations.DoOperation.setup
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 290, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'main'

So it seems to me this problem only occurs when Dataflow is invoked serverlessly. I tried adding a setup file, as suggested here, so that the pipeline installs its dependencies at the correct versions, but that did not solve it. The problem looks similar to this question, but I doubt the only (unacceptable) answer there would work, since Cloud Function code always runs from main.py.
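For reference, the setup file I pass to the pipeline is along these lines (a minimal sketch; the package name is an arbitrary placeholder and only the two extra dependencies are pinned):

# /tmp/setup.py -- minimal sketch of the setup file staged for the workers
import setuptools

setuptools.setup(
    name='dummy-images-pipeline',  # placeholder name
    version='0.0.1',
    install_requires=[
        'pillow==6.2.1',
        'requests==2.23.0',
    ],
    packages=setuptools.find_packages(),
)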

Pipeline code

class getResponse(beam.DoFn):
    def process(self, element, urlfield, pfield):
        response = requests.get(element[urlfield])
        status_code = response.status_code
        if status_code >= 200 and status_code < 300:
            yield {'id': element[pfield], 'response': response, 'url': element[urlfield]}


class getImageData(beam.DoFn):
    def process(self, element, responsefield, urlfield, pfield):
        p = element[responsefield]
        img = Image.open(BytesIO(p.content)).resize((10, 10), Image.ANTIALIAS).convert("L")
        yield {'id': element[pfield], 'url': element[urlfield], 'image_data': list(img.getdata())}


class outputDummies(beam.DoFn):
    def process(self, element, dummy_data, image_datafield, urlfield, pfield):
        if element[image_datafield] == dummy_data:
            yield {'id': element[pfield], 'url': element[urlfield]}


def dataflow(in_test_mode=True,
             query=None,
             table_schema=None,
             table_spec=None,
             dummy_data=None,
             job_name=None,
             PROJECT=None,
             REGION=None,
             BUCKET=None):
    if in_test_mode:
        RUNNER = "DirectRunner"
        OUTPUT_DIR = "gs://{0}/dummy_images/".format(BUCKET)
    else:
        RUNNER = "DataflowRunner"
        OUTPUT_DIR = "gs://{0}/dummy_images/".format(BUCKET)

    options = {
        "job_name": job_name,
        "project": PROJECT,
        "region": REGION,
        "staging_location": os.path.join(OUTPUT_DIR, "tmp", "staging"),
        "temp_location": os.path.join(OUTPUT_DIR, "tmp"),
        "streaming": False
    }
    opts = beam.pipeline.PipelineOptions(**options)

    # Run Beam
    with beam.Pipeline(RUNNER,
                       options=opts,
                       argv=['--setup_file', '/tmp/setup.py']) as p:
        (p |
         "Read data" >> beam.io.Read(beam.io.BigQuerySource(query=query,
                                                            use_standard_sql=True)) |
         "Get responses" >> beam.ParDo(getResponse(),
                                       urlfield='url',
                                       pfield='id') |
         "Process images" >> beam.ParDo(getImageData(),
                                        responsefield='response',
                                        urlfield='url',
                                        pfield='id') |
         "Output dummy images" >> beam.ParDo(outputDummies(),
                                             dummy_data=dummy_data,
                                             image_datafield='image_data',
                                             urlfield='url',
                                             pfield='id') |
         "Write to BQ" >> beam.io.WriteToBigQuery(
             table_spec,
             schema=table_schema,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
         )
        )
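For completeness, the kind of restructuring that the linked answer implies would look roughly like the sketch below (the module name is hypothetical, and the module would have to be shipped to the workers via the setup file); my doubt is that the Cloud Function entry point must still be main.py:

# transforms.py -- hypothetical module holding the DoFns, shipped to the
# workers via setup.py so that dill no longer needs to import 'main' there
from io import BytesIO

import apache_beam as beam
import requests
from PIL import Image


class getResponse(beam.DoFn):
    ...  # body unchanged from above


class getImageData(beam.DoFn):
    ...  # body unchanged from above


class outputDummies(beam.DoFn):
    ...  # body unchanged from above


# main.py (the Cloud Function entry point) then imports the DoFns instead
# of defining them inline:
# from transforms import getResponse, getImageData, outputDummies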

requirements.txt

apache_beam[gcp]==2.19.0
pillow==6.2.1
requests==2.23.0

Does anyone have a workaround?

Thanks for all the suggestions. In the end I did not find a fix for the error itself, but I did find a solution for the workflow. As AMargheriti pointed out, there are always Dataflow templates: by creating a custom template of the code, I was able to trigger the flow with a Cloud Function. The useful documentation is the page on creating Dataflow templates, the running templates page, and finally this solution, because the API suggested on the running-templates page does not allow setting the region in which the template is run, whereas dataflow().projects().locations().templates().launch() does allow adding this option.
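A rough sketch of what the triggering Cloud Function looks like with that API (project, region, bucket, and job name are placeholders, and the template is assumed to have been staged to the given gcsPath beforehand):

# main.py of the Cloud Function -- sketch of launching a staged custom
# template via the regional templates API; all identifiers are placeholders
from googleapiclient.discovery import build


def trigger_dataflow(request):
    project = 'my-project'
    region = 'europe-west1'  # the option the non-regional API cannot set
    template_path = 'gs://my-bucket/templates/dummy_images'

    dataflow = build('dataflow', 'v1b3', cache_discovery=False)
    response = dataflow.projects().locations().templates().launch(
        projectId=project,
        location=region,
        gcsPath=template_path,
        body={
            'jobName': 'dummy-images',
            'environment': {
                'tempLocation': 'gs://my-bucket/dummy_images/tmp',
            },
            # 'parameters': {...}  # runtime ValueProvider parameters, if any
        },
    ).execute()
    return str(response)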
