我正在尝试在 Cloud Dataflow 上运行 Apache Beam 管道。相关函数已部署为 Cloud Function，它应当创建一个 Dataflow 作业，读取文本文件并将数据插入 BigQuery。但该作业无法在 Dataflow 上运行。函数代码和错误信息如下。
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
import apache_beam as beam
class Split(beam.DoFn):
    """Parse one comma-separated line of text into a BigQuery row dict."""

    def process(self, element):
        """Split *element* into ``field_1``, ``field_2`` and ``field_3``.

        Args:
            element: One line of text, as emitted by ``ReadFromText``.

        Yields:
            A dict mapping ``field_1``, ``field_2`` and ``field_3`` to the
            first three fields of the line. Any further fields are ignored.

        Raises:
            IndexError: If the line has fewer than three fields.
        """
        # Local import so the DoFn does not rely on this module's top-level
        # globals being available when it is unpickled on the workers.
        import csv

        # csv.reader honours quoting, so a quoted field such as "a,b" stays a
        # single field instead of being cut at its embedded comma.
        fields = next(csv.reader([element]))
        yield {
            'field_1': fields[0],
            'field_2': fields[1],
            'field_3': fields[2],
        }
def main(data, context):
    """Submit a Dataflow job that loads a comma-separated text file into BigQuery.

    The signature matches a background Cloud Function, ``(data, context)``.
    Neither argument is read.

    ``job_name``, ``staging_location``, ``temp_location``,
    ``service_account_email``, ``bq_table`` and ``schema`` are free names in
    this function. They are presumably module-level settings defined elsewhere;
    confirm that they exist.

    NOTE(review): the worker error ``ModuleNotFoundError: No module named
    'main'`` means the pickled ``Split`` DoFn refers to a module called
    ``main`` (the Cloud Function's ``main.py``), and the Dataflow workers
    cannot import that module. The documented fix is to move ``Split`` into
    its own package and ship that package to the workers, e.g. via
    ``SetupOptions.setup_file``.
    """
    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)
    # The project ID must be a string literal: a bare `my-project` is parsed
    # as the subtraction `my - project`.
    google_cloud_options.project = 'my-project'
    google_cloud_options.job_name = job_name
    google_cloud_options.staging_location = staging_location
    google_cloud_options.temp_location = temp_location
    google_cloud_options.service_account_email = service_account_email
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    p = beam.Pipeline(options=options)
    # Build the graph without `with p:`. Leaving that block already calls
    # p.run() and waits, so combining it with the explicit run() below would
    # submit the Dataflow job twice.
    (
        p
        | 'ReadData' >> beam.io.ReadFromText('gs://source_file_location')
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
            table=bq_table,
            schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

    result = p.run()
    # NOTE(review): blocking here keeps the Cloud Function alive until the
    # Dataflow job finishes, which may exceed the function's timeout. Consider
    # returning right after run() if only job submission is needed.
    result.wait_until_finish()
if __name__ == '__main__':
    # Local entry point: main() reads neither argument, so placeholder
    # values are enough to launch the pipeline from the command line.
    main(data='data', context='context')
我在 Dataflow 上遇到的错误如下：
Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 286, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'main'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 648, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 176, in execute
    op.start()
  File "apache_beam/runners/worker/operations.py", line 649, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 651, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 652, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 261, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 597, in apache_beam.runners.worker.operations.DoOperation.setup
  File "apache_beam/runners/worker/operations.py", line 602, in apache_beam.runners.worker.operations.DoOperation.setup
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 290, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'main'
您可能需要将代码（包括 DoFn）放在一个单独的文件中，并作为依赖项打包；请参阅 https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
在这种情况下，听起来 Cloud Function 是从名为 main.py 的文件中执行您的代码的，因此 DoFn 被记录为属于 main 模块。而 Dataflow 工作器上并没有名为 main 的模块，于是就出现了这个错误。我建议将您的代码打包为依赖项，这样这里的代码只需写 from my_lib import main 即可。