大块BigQuery响应,并使用ApacheBeam和Dataflow将块保存在CSV文件中



我是Apache Beam和Dataflow的新手。我正在尝试获取大约20000条记录的大数据集。我必须为1000条记录进行分块,并将分块保存在单独的CSV文件中。我知道如何从BQ中读取并写入CSV,但无法理解如何使用波束变换或是否有任何其他方法对文件进行分块。

我尝试的是:我从简单的代码开始,将从BQ读取的数据传递给ParDo函数。此外,我不知道如何使用ParDo来压缩记录,或者如果这不是正确的方法,请指导我正确的方向。

ParDo也没有打印我在下面代码中传递的元素。

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class Printer(beam.DoFn):
def process(self, element):
print(element) 
yield element

def run():
with beam.Pipeline() as p:
pcoll = (p
| "ReadFromBigQuery" >> beam.io.ReadFromBigQuery(
query='SELECT email, name, age FROM `my_db`;', use_standard_sql=True)
| "par" >> beam.ParDo(Printer())
| "Print for now" >> beam.Map(print)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
run()

谢谢你的帮助。

要编写CSV文件,可以在beam.io.WriteToText前面加上MapDoFn,将元素格式化为逗号分隔的行。如果您的数据是模式化的,您也可以使用数据帧API通过to_csv方法直接写入。

输出文件的分片由工作者的分片决定,这可能是动态的。如果每个区块中正好需要1000条记录,那么唯一的方法就是通过手动写入内容的DoFn,例如

def write_to_file(contents, prefix):
path = '%s-%d' % (prefix, hash(contents))
with beam.io.filesystems.FileSystems.create(path + '.tmp') as fout:
fout.write(contents)
beam.io.filesystems.FileSystems.rename([path + '.tmp'], [path])
(input_pcoll
| beam.Map(lambda row: ','.join(str(s) for s in row))  # or similar
| beam.BatchElements(min_batch_size=1000, max_batch_size=1000)
| beam.Map(lambda lines: 'n'.join(lines))
| beam.Map(write_to_file, '/some/path/out'))