从Google Cloud Composer运行数据流时导入依赖项的问题



我正在从Google Cloud Composer运行数据流,数据流脚本包含一些非标准依赖项,例如Zeep,googleads。这些需要安装在数据流工作的节点上,因此我将它们包装为setup.py。当我尝试在DAG中运行此操作时,作曲家正在验证数据流文件并抱怨No module names Zeep , googleads。因此,我创建了Pythonvirtualenvoperator,并安装了所需的所有非标准依赖项,并尝试运行数据流量作业,但它仍然抱怨iNopt Zeep和googleads。

这是我的代码库:

PULL_DATA = PythonVirtualenvOperator(
    task_id=PROCESS_TASK_ID,
    python_callable=execute_dataflow,
    op_kwargs={
        'main': 'main.py',
        'project': PROJECT,
        'temp_location': 'gs://bucket/temp',
        'setup_file': 'setup.py',
        'max_num_workers': 2,
        'output': 'gs://bucket/output',
        'project_id': PROJECT_ID},
    requirements=['google-cloud-storage==1.10.0', 'zeep==3.2.0',
                  'argparse==1.4.0', 'google-cloud-kms==0.2.1',
                  'googleads==15.0.2', 'dill'],
    python_version='2.7',
    use_dill=True,
    system_site_packages=True,
    on_failure_callback=on_failure_handler,
    on_success_callback=on_success_handler,
    dag='my-dag')

和我的python可呼叫代码:

def execute_dataflow(**kwargs):
        import subprocess
        TEMPLATED_COMMAND = """
                          python main.py 
                                 --runner DataflowRunner 
                                 --project {project} 
                                 --region us-central1 
                                 --temp_location {temp_location} 
                                 --setup_file {setup_file} 
                                 --output {output} 
                                 --project_id {project_id} 
                          """.format(**kwargs)
        process = subprocess.Popen(['/bin/bash', '-c', TEMPLATED_COMMAND])
        process.wait()
        return process.returncode

我的main.py文件

import zeep
import googleads
{Apache-beam-code to construct dataflow pipeline}

有什么建议?

我的工作有一个requirements.txt。它不是像您那样使用--setup_file选项,而是指定以下内容:

--requirements_file prod_requirements.txt

这告诉DataFlow在运行作业之前,请在requirements.txt中安装库。

参考:https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

使用带有import googleads, zeep的示例数据流管线脚本,我设置了一个测试作曲家环境。DAG和您一样,我会遇到同样的错误。然后,我进行了几个更改,以确保可以在工作机器上找到依赖项。

在DAG中,我使用普通PythonOperator,而不是PythonVirtualenvOperator。我在Google云存储存储桶中有数据流管线和设置文件(main.pysetup.py),因此作曲家可以找到它们。设置文件具有我需要拥有的要求列表。Zeep和googleads。我从这里调整了一个示例设置文件,更改以下内容:

REQUIRED_PACKAGES = [
    'google-cloud-storage==1.10.0', 'zeep==3.2.0',
'argparse==1.4.0', 'google-cloud-kms==0.2.1',
'googleads==15.0.2', 'dill'
    ]
setuptools.setup(
    name='Imports test',
    version='1',
    description='Imports test workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        # Command class instantiated and run during pip install scenarios.
        'build': build,
        'CustomCommands': CustomCommands,
        }
    )

我的dag是

with models.DAG(  'composer_sample',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    PULL_DATA = PythonOperator(
    task_id='PULL_DATA',
    python_callable=execute_dataflow,
    op_kwargs={
        'main': '/home/airflow/gcs/data/main.py',
        'project': PROJECT,
        'temp_location': 'gs://dataflow-imports-test/temp',
        'setup_file': '/home/airflow/gcs/data/setup.py',
        'max_num_workers': 2,
        'output': 'gs://dataflow-imports-test/output',
        'project_id': PROJECT_ID})
    PULL_DATA

没有更改Python可呼叫。但是,使用此配置,我仍然会收到错误。

下一步,在Google云平台(GCP)控制台中,我转到"作曲家"通过导航菜单,然后单击环境的名称。在" pypi软件包"中,我添加Zeep和googleads,然后单击"提交"。更新环境需要一段时间,但起作用。

在此步骤之后,我的管道能够导入依赖项并成功运行。我还尝试使用GCP控制台上指示的依赖项运行DAG,但在setup.py的要求中不使用。工作流程再次破裂,但在不同的地方。因此,请务必在两个地方指示它们。


您需要在云作曲家环境中安装库(查看此链接)。有一种方法可以在控制台内进行操作,但我发现这些步骤更容易:

  1. 打开您的环境页
  2. 选择作曲家正在运行的实际环境
  3. 导航到 pypi软件包 tab
  4. 单击编辑
  5. 手动添加requirements.txt的每一行
  6. 保存

如果您为库提供的版本太旧了,您可能会遇到错误,因此请检查日志并根据需要更新数字。

最新更新