Dataflow:将 Pub/Sub 主题作为参数(argument)提供时出错



我遇到了一个问题,我使用python创建了一个数据流模板,在启动新的数据流作业时,该模板需要接受3个用户定义的参数。

问题出现在 beam.io.gcp.pubsub.WriteToPubSub() 中:我试图通过 ValueProvider 提供主题名称。根据谷歌文档,创建模板时必须使用 ValueProvider:

https://cloud.google.com/dataflow/docs/guides/templates/creating-templates

源 beam.io.ReadFromPubSub() 可以成功接受订阅的 ValueProvider,转换 beam.io.gcp.bigquery.WriteToBigQuery() 也是如此。

显然,分享我的代码会有所帮助 :)

首先是常规的导入(import):

from __future__ import absolute_import
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.value_provider import StaticValueProvider
import json
import time
from datetime import datetime
import dateutil.parser
import sys

接下来是我为模板提供的输入参数定义的类:

class userOptions(PipelineOptions):
    """User-defined parameters that the template accepts at job launch.

    Every option is registered through ``add_value_provider_argument`` so
    that it is exposed to the pipeline as a ValueProvider rather than as a
    plain string.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        """Register ``--subscription``, ``--bqtable`` and ``--topic`` on *parser*."""
        parser.add_value_provider_argument(
            '--subscription',
            # Placeholder must follow projects/<project>/subscriptions/<name>.
            default='projects/MYPROJECT/subscriptions/subscription',
            help='PubSub subscription to listen on')
        parser.add_value_provider_argument(
            '--bqtable',
            default='dataset.table',
            help='Big Query Table Name in the format project:dataset.table')
        parser.add_value_provider_argument(
            '--topic',
            # A topic placeholder must be a topic path
            # (projects/<project>/topics/<name>), not a subscription path.
            default='projects/MYPROJECT/topics/topic',
            help='PubSub topic to write failed messages to')

管道本身的定义如下(注意,我省略了映射函数):

def run():
    """Assemble the streaming pipeline: Pub/Sub -> map steps -> BigQuery -> Pub/Sub.

    The mapping callables (``format_message_element``,
    ``transform_null_records``, ``format_dates``) are defined elsewhere.
    """
    # One view exposes the user-supplied template parameters; a second
    # options object carries the session/streaming settings for the runner.
    user_options = PipelineOptions().view_as(userOptions)
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=pipeline_options) as p:
        raw_messages = p | 'Read from PubSub' >> beam.io.ReadFromPubSub(
            subscription=str(user_options.subscription),
            id_label='Message_ID',
            with_attributes=True,
        )
        formatted = raw_messages | 'Format Message' >> beam.Map(
            format_message_element)
        without_nulls = formatted | 'Transform null records to empty list' >> beam.Map(
            transform_null_records)
        with_dates = without_nulls | 'Transform Dates' >> beam.Map(format_dates)
        bq_result = with_dates | 'Write to Big Query' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table=user_options.bqtable,
            create_disposition='CREATE_IF_NEEDED',
            write_disposition='WRITE_APPEND',
            insert_retry_strategy='RETRY_NEVER',
        )
        # The topic is handed over as the option object itself (not a str);
        # this is the call the reported traceback points at.
        records = bq_result | 'Write Failures to Pub Sub' >> beam.io.gcp.pubsub.WriteToPubSub(
            user_options.topic)

现在,当我尝试使用powershell命令生成模板时:

python profiles-pipeline.py --project xxxx-xxxxxx-xxxx `
--subscription projects/xxxx-xxxxxx-xxxx/subscriptions/sub-xxxx-xxxxxx-xxxx-dataflow `
--bqtable xxxx-xxxxxx-xxxx:dataset.table `
--topic projects/xxxx-xxxxxx-xxxx/topics/top-xxxx-xxxxxx-xxxx-failures `
--runner DataflowRunner `
--temp_location gs://xxxx-xxxxxx-xxxx/temp/ `
--staging_location gs://xxxx-xxxxxx-xxxx/staging/ `
--template_location gs://xxxx-xxxxxx-xxxx/template

我得到这个错误:

Traceback (most recent call last):
  File "pipeline.py", line 193, in <module>
    run()
  File "pipeline.py", line 183, in run
    beam.io.gcp.pubsub.WriteToPubSub(user_options.topic)
  File "C:\github\pipeline-dataflow-jobs\dataflow\lib\site-packages\apache_beam\io\gcp\pubsub.py", line 292, in __init__
    topic, id_label, with_attributes, timestamp_attribute)
  File "C:\github\pipeline-dataflow-jobs\dataflow\lib\site-packages\apache_beam\io\gcp\pubsub.py", line 430, in __init__
    self.project, self.topic_name = parse_topic(topic)
  File "C:\github\pipeline-dataflow-jobs\dataflow\lib\site-packages\apache_beam\io\gcp\pubsub.py", line 325, in parse_topic
    match = re.match(TOPIC_REGEXP, full_topic)
  File "c:\program files\python37\lib\re.py", line 173, in match
    return _compile(pattern, flags).match(string)
TypeError: expected string or bytes-like object

我以前在尝试使用 beam.io.WriteToBigQuery() 时遇到过这个错误,但当我改用 beam.io.gcp.bigquery.WriteToBigQuery() 后,错误就解决了,因为它接受表名的 ValueProvider。然而,对于 Pub/Sub,我找不到其他可行的写入方式。

任何帮助都将不胜感激。

我已经部分解决了这个问题:此前我的管道在发布写入 BigQuery 失败的记录时做法有误。但我仍然无法将 Pub/Sub 主题名称作为输入参数传递;不过,如果主题名称是硬编码的,它确实可以工作。

#################################################################
# Import the libraries required by the pipeline                 #
#################################################################
from __future__ import absolute_import
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.value_provider import RuntimeValueProvider
import json
import time
from datetime import datetime
import dateutil.parser
import sys
import logging
#################################################################
# Create a class for the user defined settings provided at job 
# creation
#################################################################
class userOptions(PipelineOptions):
    """User-defined settings provided at job creation.

    Every option is registered through ``add_value_provider_argument`` so
    that it is exposed to the pipeline as a ValueProvider rather than as a
    plain string.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        """Register ``--subscription``, ``--bqtable`` and ``--topic`` on *parser*."""
        parser.add_value_provider_argument(
            '--subscription',
            # Placeholder must follow projects/<project>/subscriptions/<name>.
            default='projects/MYPROJECT/subscriptions/subscription',
            help='PubSub subscription to listen on')
        parser.add_value_provider_argument(
            '--bqtable',
            default='dataset.table',
            help='Big Query Table Name in the format project:dataset.table')
        parser.add_value_provider_argument(
            '--topic',
            default='projects/MYPROJECT/topics/subscription',
            help='Pubsub topic to write failures to')
##############################################################################
# Format failure message
##############################################################################
def format_failed_message(data):
    """Serialize a failed row to a JSON string for publishing.

    Args:
        data: The failed element; normally a JSON-serializable structure.

    Returns:
        A ``str`` containing JSON. If ``data`` cannot be serialized as-is,
        the failure is logged and a best-effort rendering is returned
        instead, so the failed row is still published rather than lost.
    """
    try:
        return json.dumps(data)
    except (TypeError, ValueError):
        # Previously a bare ``except`` printed the error and then fell through
        # to ``return message`` with ``message`` unbound (UnboundLocalError).
        logging.exception(
            "customError in function format_failed_message occured. "
            "Message contents: %r", data)
    try:
        # Second attempt: stringify values json does not know (sets, datetimes, ...).
        return json.dumps(data, default=str)
    except (TypeError, ValueError):
        # Unsupported key types or circular references: fall back to the repr,
        # still encoded as a valid JSON string.
        return json.dumps(repr(data))
#################################################################
# create a function called run                                  #
#################################################################
def run():
    """Build and execute the streaming Pub/Sub -> BigQuery pipeline.

    Messages are read from the configured subscription, passed through the
    formatting maps (``format_message_element``, ``transform_null_records``,
    ``format_dates``, defined elsewhere) and written to BigQuery. Rows that
    fail to insert are serialized, UTF-8 encoded and published to a Pub/Sub
    topic.
    """
    # Setup the pipeline options with both passed in arguments and
    # streaming options.
    user_options = PipelineOptions().view_as(userOptions)
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True

    # Define the pipeline. Leaving the ``with`` block runs the pipeline and
    # waits for it to finish, so no explicit p.run() / wait_until_finish()
    # is issued here (doing both would submit the pipeline twice).
    with beam.Pipeline(options=pipeline_options) as p:
        # First we create a PCollection which will contain the messages read
        # from Pubsub.
        records = (
            p
            | 'Read from PubSub' >> beam.io.ReadFromPubSub(
                subscription=str(user_options.subscription),
                id_label='Message_ID',
                with_attributes=True)
            # Transform the message and its attributes to a dict.
            | 'Format Message' >> beam.Map(format_message_element)
            # Transform the empty arrays defined as element:null to element:[].
            | 'Transform null records to empty list' >> beam.Map(transform_null_records)
            # Transform the dateCreated and DateModified to a big query
            # compatible timestamp format.
            | 'Transform Dates' >> beam.Map(format_dates)
            # Attempt to write the rows to BQ.
            | 'Write to Big Query' >> beam.io.gcp.bigquery.WriteToBigQuery(
                table=user_options.bqtable,
                create_disposition='CREATE_IF_NEEDED',
                write_disposition='WRITE_APPEND',
                insert_retry_strategy='RETRY_NEVER')
        )

        # For any rows that failed to write to BQ.
        failed_data = (
            records[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
            # Format the dictionary to a string.
            | 'Format the dictionary as a string for publishing' >> beam.Map(format_failed_message)
            # Encode the string to utf8 bytes.
            | 'Encode the message' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
        )

        # Publish the failed rows to pubsub. The topic is hard-coded because
        # passing the ``--topic`` option object here is what fails.
        failed_data | beam.io.gcp.pubsub.WriteToPubSub(topic='projects/xxxx-xxxxx-xxxxxx/topics/top-xxxxx-failures')
        # failed_data | beam.io.gcp.pubsub.WriteToPubSub(topic=user_options.topic)
    # As this is a streaming pipeline it will run continuously until either we
    # stop the pipeline or it fails.
# Script entry point: launch the pipeline only when executed directly.
if __name__ == '__main__':
    # logging.getLogger().setLevel(logging.INFO)
    run()

| '对字节串进行编码' >> beam.Map(encode_byte_string)  # 我认为你已经实现了这一部分
| '写入pubsub' >> beam.io.WriteToPubSub(output_topic)

——这对我有用。

最新更新