在GCP控制台创建数据流作业期间提供参数时出错



自2021年10月05/06以来,我的GCP Dataflow模板文件正在获取模板创建期间提供的参数值(当我在本地机器中运行.py文件以便在GCP存储上创建模板文件时),并且没有获得基于同一模板文件的作业创建期间提供的参数。如果我在模板创建期间没有提供任何值,它们会假设一个RuntimeValueProvider(当不使用args的默认值时),而不是在作业创建期间提供的值。

在作业创建期间提供的参数存储在Dataflow作业会话中。如果我打开作业,转到右侧栏并打开"管道选项",那么在创建作业时提供的正确值就在那里,但它们没有到达代码。

我在GCP控制台以经典的方式从模板运行我的代码:

gcloud dataflow jobs run JOB_NAME --gcs-location gs://LOCATION/TEMPLATE/FILE --region REGION --project PROJ_NAME --worker-machine-type MACHINE_TYPE --parameters PARAM_1=PARAM_1_VALUE,PARAM_2=PARAM_2_VALUE

我使用SDK 2.32.0,在代码中我使用"parser.add_value_provider_argument"而不是"parser.add_argument"但我使用"parser。add_argument"但我没有成功。对于这两种情况,我的代码都假定运行.py文件时的参数值。

示例1

import apache_beam.io.gcp.gcsfilesystem as gcs
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--PARAM_1', 
type=str)
parser.add_value_provider_argument('--PARAM_2', 
type=str)
beam_options = PipelineOptions()
args = beam_options.view_as(MyOptions)
# Some business operations with args that are always assuming the  values provided during template creation
options = {'project': PROJECT,
'runner': 'DataflowRunner',
'region': REGION,
'staging_location': 'gs://{}/temp'.format(BUCKET),
'temp_location': 'gs://{}/temp'.format(BUCKET),
'template_location': 'gs://{}/template/batch_poc'.format(BUCKET)}
pipeline_options = PipelineOptions.from_dictionary(options)
with beam.Pipeline(options = pipeline_options) as p:
lines = (p
| beam...
)

示例2(与示例1相同,但使用默认值)

# ... same as example 1
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--PARAM_1',
default="test1",
type=str)
parser.add_value_provider_argument('--PARAM_2', 
default="test2",
type=str)
# ... same as example 1

在所有情况下,我在创建作业时提供的参数将被忽略。

案例1:当在本地机器上运行没有参数的示例1(如下面的python命令),并在GCP控制台上运行其模板时,使用两种情况:args和没有参数(如下面的第二个命令)。PARAM_1_VALUE和PARAM_2_VALUE中的值一致:RuntimeValueProvider(…)

LOCALHOST> python3 code.py
GCP> gcloud dataflow jobs run ...
OR
GCP> gcloud dataflow jobs run ... --parameters PARAM_1=another_test_1,PARAM_2=another_test_2

案例2:在本地机器上使用args运行示例1(如下面的python命令),并在GCP控制台上使用两种情况运行其模板:args和不使用args(如下面的第二个命令)。PARAM_1_VALUE和PARAM_2_VALUE中的值与模板创建时传递的值相同:another_test_{value}而不是another_another_test_{value}

LOCALHOST> python3 code.py --PARAM_1 another_test_1 --PARAM_2 another_test_2
GCP> gcloud dataflow jobs run ...
OR
GCP> gcloud dataflow jobs run ... --parameters PARAM_1=another_another_test_1,PARAM_2=another_another_test_2

案例3:当在本地机器上运行没有参数的示例2(如下面的python命令)并在GCP控制台上运行其模板时,使用两种情况:args和没有参数(如下面的第二个命令)。"PARAM_1_VALUE"one_answers"PARAM_2_VALUE"为默认值。

LOCALHOST> python3 code.py
GCP> gcloud dataflow jobs run ...
OR
GCP> gcloud dataflow jobs run ... --parameters PARAM_1=another_test_1,PARAM_2=another_test_2

案例4:当在本地机器上使用args运行示例2(如下面的python命令)并在GCP控制台上运行其模板时,使用两种情况:args和不使用args(如下面的第二个命令)。

注意:我更新了两个库:apache-beam和apache-beam[gcp]

注意"——PARAM_1_VALUE", "——PARAM_1_VALUE"…值不能在管道施工期间使用。按1:

" RuntimeValueProvider是默认的ValueProvider类型。RuntimeValueProvider允许您的管道接受仅在管道执行期间可用的值。该值在管道构建期间不可用,因此您不能使用该值来更改管道的工作流图。

文档显示,在ValueProvider参数上使用.get()方法允许您在运行时检索值并在函数中使用它。字面意思:

"要在自己的函数中使用运行时参数值,请更新函数以使用ValueProvider参数。"

这里ValueProvider.get()是在运行时方法DoFn.process()中调用的。

基于此,我建议您更改以下代码2并重试。