"IndexError: tuple index out of range" when running as a Dataflow template job

Below is my Python code. It works perfectly fine.

from __future__ import absolute_import
import apache_beam as beam
import argparse
import pickle
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery
from datetime import date

# Date stamp used in the output file name, e.g. 20240131
today = date.today()
current_date = today.strftime("%Y%m%d")

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    # Read distinct mobile numbers from BigQuery, format each row as a
    # pipe-delimited line and write the result as a CSV file to GCS.
    (p | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(query='select DISTINCT(REPLACE(MOBILE,"+91 ","91")) from `whr-asia-datalake-nonprod.WHR_DATALAKE.C4C_CONSUMER_RAW`', use_standard_sql=True))
       | 'read values' >> beam.Map(lambda x: x.values())
       | 'CSV format' >> beam.Map(lambda row: '|'.join("WHIRLPOOL|WHR|" + str(column) + '|"' + "Hi, This msg is from Whirlpool DL" + '"' for column in row))
       | 'Write_to_GCS' >> beam.io.WriteToText('gs://whr-asia-datalake-dev-standard/outbound/Valuefirst/WHR_MOBILE_CNSNT_REQ' + '' + str(current_date), file_name_suffix='.csv', header='SENDER_ID|SHORTCODE|MOBILE_NUM|CONSENT_MSG'))
    p.run().wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

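I modified the code above for a new requirement: create an empty ".done" file alongside each file created above. We added the function below to the job to create the empty file: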

today = date.today()
current_date = today.strftime("%Y%m%d")

def create_done(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (p | 'Create .done File' >> beam.io.WriteToText('gs://whr-asia-datalake-dev-standard/outbound/Valuefirst/Valuefirst' + '' + str(current_date), file_name_suffix='.done'))
    p.run().wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    create_done()
However, after we added this new code to create the empty .done file, the script fails with the error:

input_tag = transform_node.inputs[0].tag
IndexError: tuple index out of range

I am not able to paste the full traceback of the error. Please let me know if you can help with this.

WriteToText expects an input PCollection, but you are applying it directly to the Pipeline object. For your pipeline to run, it needs a data source such as ReadFromText or Create.

See https://beam.apache.org/documentation/programming-guide/ for more information about pipelines and PCollections, along with some examples of simple pipelines.
