在Python中安排Google Cloud Dataflow作业



目前,我知道以下是安排Dataflow作业执行的选项:

  • 使用应用引擎Cron服务或云功能。

    • 这个例子是用Java做的,有没有Python这么简单的官方例子
    • 这个例子是用Python编写的,但我不确定当前是否仍然是一个好的选项,或者是否"已弃用">
  • 从计算引擎中的cron作业

    • 有这方面的教程吗
  • 在流式传输管道中使用窗口

    • 我认为这是最简单的,但从总成本来看,这是最好的想法吗
  • Cloud Scheduler

    • 这是一个有效的方法吗

我使用App Engine Flex作为数据流启动器。这个微服务有端点可以按需启动数据流作业,cron也可以做到这一点。

这是我的项目结构:

df_tasks/
- __init__.py
- datastore_to_csv.py
- ...other_piplines
__init__.py
dflaunch.yaml
main.py
setup.py <-- used by pipelines

对我来说,这样做的诀窍是正确设置我的管道依赖关系。也就是说,对管道依赖项使用setup.py。像这样设置帮助最大:https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/juliaset

setup.py:

import setuptools
setuptools.setup(
name='dataflow_python_pipeline',
version='1.0.0',
description='DataFlow Python Pipeline',
packages=setuptools.find_packages(),
)

我在df_tasks中的pipline配置如下:

pipeline_options = PipelineOptions.from_dictionary({
'project': project,
'runner': 'DataflowRunner',
'staging_location': bucket_path+'/staging',
'temp_location': bucket_path+'/temp',
'setup_file': './setup.py'
})

然后在main.py:

from df_tasks import datastore_to_csv
project_id = os.environ['GCLOUD_PROJECT']
@app.route('/datastore-to-csv', methods=['POST'])
def df_day_summary():
# Extract Payload
payload = request.get_json()
model = payload['model']
for_date = datetime.datetime.strptime(payload['for_date'], '%Y/%m/%d')
except Exception as e:
print traceback.format_exc()
return traceback.format_exc()
# launch the job
try:
job_id, job_name = datastore_to_csv.run(
project=project_id,
model=model,
for_date=for_date,
)
# return the job id
return jsonify({'jobId': job_id, 'jobName': job_name})
except Exception as e:
print traceback.format_exc()
return traceback.format_exc()

有多种方法,但我认为对您来说非常方便的一种方法是使用Apache Airflow的DataflowPythonOperator。

GCP以Cloud Composer的形式为Apache Airflow提供托管服务,您可以使用它来安排数据流管道或其他GCP操作。

最新更新