附加数据文件(.csv、.json)作为要在Dataflow上使用的安装包的一部分



我正在尝试使用数据流来完成一项需要使用.csv和.json文件的任务。据我所知,我应该能够创建一个包含这些文件的setup.py文件,并将它们分发给多个工作人员。

这就是我的文件的布局:

pipline.py
setup.py
utils /
-->__init__.py
-->**CSV.csv**
-->**JSON.json**

这是我的setup.py文件:

import setuptools
setuptools.setup(name='utils',
version='0.0.1',
description='utils',
packages=setuptools.find_packages(),
package_data={'utils': ['**CSV.csv**', '**JSON.json**']},
include_package_data=True)

这是我的豆子。DoFn函数:

class DoWork(beam.DoFn):
def process(self, element):
import pandas as pd
df_csv = pd.read_csv('**CSV.csv**')
df_json = pd.read_json('**JSON.json**')
Do other stuff with dataframes
yield [stuff]

我的管道是这样设置的:

dataflow_options = ['--job_name=pipline',
'--project=pipeline',
'--temp_location=gs://pipeline/temp',
'--staging_location=gs://pipeline/stage',
'--setup_file=./setup.py']
options = PipelineOptions(dataflow_options)
gcloud_options = options.view_as(GoogleCloudOptions)
options.view_as(StandardOptions).runner = 'DataflowRunner'
with beam.Pipeline(options=options) as p:
update = p | beam.Create(files) | beam.ParDo(DoWork())

基本上我一直得到一个:

IOError: File CSV.csv does not exist

它认为.json文件也不存在,只是在到达该步骤之前出错。这些文件可能没有进入数据流,或者我在DoFn中错误地引用了它们。我是否应该将文件放入setup函数的data_files参数中,而不是package_data?

您需要上传gs中的输入文件并给出gs位置,而不是CSV。我认为您在本地运行了代码,csv文件与代码位于同一目录中。但是使用DataflowRunner运行它需要gs.中的文件

最新更新