问候民众1
我正在尝试使用云数据流将数据从GCS加载到BigQuery。
桶内的数据存储在以下结构中
"bucket_name/user_id/date/date_hour_user_id.csv";
示例";my_bucket/user_1262/2021-01-02/2021-012_18_user_id.csv";
如果我有5个用户,例如["user_1262"、"user_1263"、"user_1264"、"user _1265"、"用户_1266"]
并且我想要为所有客户端加载bq(1小时的数据(,例如hour="1";18〃;在1周的范围内,我想迭代所有
客户端获取前缀为18的文件我已经创建了这个代码,但迭代感染了数据
流水线对于从一个客户端移动到另一个客户端的每一个代码都运行一个新的流水线。
def run(argv=None):
mydate=['2021-01-02 00:00:00', '2021-01-02 23:00:00']
fmt = '%Y-%m-%d %H:%M:%S'
hour = dt.timedelta(hours=1)
day = dt.timedelta(days=1)
start_time, end_time = [dt.datetime.strptime(d, fmt) for d in mydate]
currdate = start_time
cols = ['cols0','cols1']
parser = argparse.ArgumentParser(description="User Input Data .")
args, beam_args = parser.parse_known_args(argv)
while currdate <= end_time:
str_date = currdate.strftime('%Y-%m-%d')
str_hour = '%02d' % (int(currdate.strftime('%H')))
print("********WE ARE PROCESSING FILE ON DATE ---> %s HOUR --> %s" % (str_date, str_hour))
user_list = ["user_1262", "user_1263", "user_1264", "user_1265", "user_1266"]
for user_id in user_list:
file_path_user = "gs://user_id/%s/%s/%s_%s_%s.csv" % (user_id, str_date, str_date, str_hour, user_id)
with beam.Pipeline(options=PipelineOptions(beam_args)) as p:
input_data = p | 'ReadUserfile' >> beam.io.ReadFromText(file_path_user_table, columns=cols)
decode = input_data | 'decodeData' >> beam.ParDo(de_code())
clean_data = decode | 'clean_dt' >> beam.Filter(clea_data)
writetobq....
currdate += day
run()
您可以继续在管道创建脚本中生成输入文件列表。但是,您可以将它们放入一个列表中,而不是为每个输入文件创建一个新的管道。然后,让您的管道从读取该列表的Create转换开始,然后是textio。ReadAllFromText转换。这将从文件列表中创建一个PCollection,然后开始从该文件列表中读取。