Google Cloud Dataflow unable to open flex template file



After running the deployment script to launch the Dataflow flex template job, I get

"无法读取作业文件:gs://dataflow staging-europe-west2/------/staining/template_elaunchs/{JOBNAME}/job_object,错误消息为:(7ea9e263ad5cddb5(:无法打开模板文件:gs:/dataflow staging-europe-west2-6473358574/staining/template_elaughs/{JOBNAME}/job_oobject。。

The console logs show "Template launched successfully", and there are no Python errors in the Cloud Build logs.

Here is the main structure of my Python pipeline code, which parses csv files from Cloud Storage, performs some transformations/calculations on the raw data, and then creates Datastore entities:

File structure:

├── pipeline
│   ├── runner.py
│   ├── setup.py
│   ├── ingestion
│   │   ├── transformer.py
│   │   ├── custom.py

(image of the files in the editor)
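setup.py is what packages the ingestion module for the Dataflow workers; the actual file is not shown here, so the following is only a minimal sketch (the package name, version, and dependency list are assumptions):

# setup.py - minimal sketch; name, version and install_requires are assumptions.
import setuptools

setuptools.setup(
    name='pipeline',
    version='0.0.1',
    packages=setuptools.find_packages(),   # picks up the ingestion package
    install_requires=[
        'apache-beam[gcp]==2.35.0',
        'google-cloud-logging',
    ],
)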

Dockerfile

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

RUN apt-get update
# Upgrade pip and install the requirements.
RUN pip3 install --no-cache-dir --upgrade pip
RUN pip3 install apache-beam==2.35.0
RUN pip3 install google-cloud-logging

WORKDIR /
RUN mkdir -p /dataflow/template
WORKDIR /dataflow/template
COPY ingestion ${WORKDIR}/ingestion
COPY setup.py ${WORKDIR}/setup.py
COPY runner.py ${WORKDIR}/runner.py

ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/runner.py"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"

# Since we already downloaded all the dependencies, there's no need to rebuild everything.
ENV PIP_NO_DEPS=True
runner.py

# std libs
import os
import logging
import datetime
# helper modules
from ingestion.all_settings import *
from ingestion.avg_helpers import *
from ingestion.transform import *
# Data-flow modules
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1new.datastoreio import WriteToDatastore

# MAIN function, to run dataflow pipeline module
def dataflow():
JOB_NAME = f"datastore-upload-{datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S')}"
#wildcard expression for file storage bucket containing the subject's data 
file_ex = [gs//bucket-example-csv-file]
#variable to store pipeline options to be passed into beam function later
pipeline_options = {
'runner': 'DirectRunner',
'project': PROJECT,
'region': 'europe-west-b',
'job_name': JOB_NAME,
'staging_location': TEST_BUCKET + '/staging',
'temp_location': TEST_BUCKET + '/temp',
'save_main_session': False,
'streaming': False,
'setup_file': '/dataflow/template/setup.py',
}
options = PipelineOptions.from_dictionary(pipeline_options)
with beam.Pipeline(options=options) as p:
for i,filename in enumerate(file_ex):
(p 
| 'Reading input files' >> beam.io.ReadFromText(filename, skip_header_lines = 1)
| 'Converting from csv to dict' >> beam.ParDo(ProcessCSV(), harvard_medical_headers)
| 'Create entities for minute averages' >> beam.ParDo(BuildMinuteEntities(),filename)
| 'Write entities into Datastore' >> WriteToDatastore(PROJECT)
)
p.run().wait_until_finish()

if __name__ == '__main__':
dataflow()

You may need to mention the setup file name in the beam options:

...
#variable to store pipeline options to be passed into beam function later
pipeline_options = {
    'runner': 'DirectRunner',
    'project': PROJECT,
    'region': 'europe-west-b',
    'job_name': JOB_NAME,
    'staging_location': TEST_BUCKET + '/staging',
    'temp_location': TEST_BUCKET + '/temp',
    'save_main_session': False,
    'streaming': False,
    'setup_file': '/dataflow/template/setup.py',
}
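The same option can also be supplied on the command line when launching the pipeline, as the --setup_file pipeline argument.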

In my case, it was caused by the pipeline option 'runner': 'DirectRunner'. If you want to launch the Dataflow job on the Dataflow engine, you should put 'runner': 'DataflowRunner' there.
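For reference, a minimal sketch of the options dictionary with the runner switched to DataflowRunner; PROJECT, TEST_BUCKET and JOB_NAME are the same placeholders used in runner.py, and the region is assumed to match the staging bucket (europe-west2):

# Same options as in runner.py, but targeting the Dataflow service.
# Assumption: the region matches the staging bucket (europe-west2).
pipeline_options = {
    'runner': 'DataflowRunner',   # run on the Dataflow engine instead of locally
    'project': PROJECT,
    'region': 'europe-west2',
    'job_name': JOB_NAME,
    'staging_location': TEST_BUCKET + '/staging',
    'temp_location': TEST_BUCKET + '/temp',
    'save_main_session': False,
    'streaming': False,
    'setup_file': '/dataflow/template/setup.py',
}
options = PipelineOptions.from_dictionary(pipeline_options)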
