How to split the result of a GroupByKey() transform by key and write the values to a GCS bucket using Apache Beam Python



I am new to Apache Beam, Dataflow and Python, so any help is appreciated. I have a requirement to generate a report by fetching records from a BigQuery table and writing the result to a GCS bucket using Apache Beam in Python. I wrote the pipeline as follows:

# Here I am converting the BigQuery output to a 2-element tuple where both elements are dictionaries, for example:
# ({'institution_id': '100'}, {'customer_id': '1000', 'customer_name': 'ABC', 'customer_email': 'abc@xxx.com', 'phone_number': '00012345'})
class convtotupleofdict(beam.DoFn):
    def process(self, element):
        return [({'institution_id': element['institution_id']},
                 {'customer_id': element['customer_id'],
                  'customer_name': element['customer_name'],
                  'customer_email': element['customer_email'],
                  'phone_number': element['phone_number']})]

with beam.Pipeline(options=pipeline_options) as p:
    csv_ip = (
        p
        | 'ReadfromBQ' >> beam.io.ReadFromBigQuery(
            query="SELECT institution_id, customer_id, customer_name, customer_email, phone_number "
                  "FROM <table name> WHERE customer_status = 'Active' "
                  "ORDER BY institution_id, customer_id",
            use_standard_sql=True)
        | 'ConvttoTupleofDict' >> beam.ParDo(convtotupleofdict())
        | 'Groupbyinstitution_id' >> beam.GroupByKey()
    )

    op_gcs = (
        csv_ip
        | 'WritetoGCS' >> beam.io.fileio.WriteToFiles(
            path='gs://my-bucket/reports',
            sink=lambda dest: beam.io.fileio.TextSink())
    )

I used the GroupByKey() transform to group the data on institution_id, so that I can split the data by institution_id and create a separate file for each institution in the GCS bucket. The output of GroupByKey() looks like this:

({'institution_id' :'100'},{'customer_id' : '1000','customer_name': 'ABC','customer_email' : 'abc@xxx.com','phone_number': '00012345'},{'customer_id' : '2000','customer_name': 'XYZ','customer_email' : 'xyz@xxx.com','phone_number': '12378'})
({'institution_id' :'200'},{'customer_id' : '3000','customer_name': 'MNO','customer_email' : 'mno@xxx.com','phone_number': '789102'},{'customer_id' : '4000','customer_name': 'PQR','customer_email' : 'ttt@xxx.com','phone_number': '123789'})

Now I am struggling to convert the GroupByKey() output into CSV files and upload them to the GCS bucket. I came across beam.io.fileio.WriteToFiles (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.fileio.html), since it can be used to write files to dynamic destinations. To split the data by institution_id, how should I provide the following parameters of WriteToFiles: path, destination, sink and file_naming? I understand that destination and sink are callables, but I am not able to construct them. I am stuck at this point and cannot proceed. In fact, I am confused between the destination and sink parameters: how should I write them in order to split the data by institution_id and generate CSV files? Currently, I am testing my code with the DirectRunner.

I hope this helps.

Here is a complete solution for your need.

The Beam main.py file:

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from your_root_folder.file_io_customer_by_institution_transform import FileIOCustomerByInstitutionTransform


def run():
    pipeline_options = PipelineOptions()

    with beam.Pipeline(options=pipeline_options) as p:
        input = [
            {
                'institution_id': '100',
                'customer_id': '1000',
                'customer_name': 'ABC',
                'customer_email': 'abc@xxx.com',
                'phone_number': '00012345'
            },
            {
                'institution_id': '100',
                'customer_id': '1001',
                'customer_name': 'ABCD',
                'customer_email': 'abcd@xxx.com',
                'phone_number': '00012346'
            },
            {
                'institution_id': '101',
                'customer_id': '1001',
                'customer_name': 'ABCD',
                'customer_email': 'abcd@xxx.com',
                'phone_number': '00012346'
            }
        ]

        (
            p
            | beam.Create(input)
            | "Group customers by institution" >> beam.GroupBy(lambda customers: customers['institution_id'])
            | "Write file to GCS" >> FileIOCustomerByInstitutionTransform('gs://mazlum_dev/dynamicfiles')
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
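
For reference, here is a minimal sketch (illustrative only, runnable locally with the DirectRunner) of what the "Group customers by institution" step emits: one (institution_id, iterable-of-customer-dicts) pair per institution, which is what the sink in the transform later unpacks:

import apache_beam as beam

# Inspect the grouped output locally: prints one (key, iterable-of-dicts) pair per institution,
# e.g. ('100', [{'institution_id': '100', 'customer_id': '1000'}, ...]).
with beam.Pipeline() as p:
    (
        p
        | beam.Create([
            {'institution_id': '100', 'customer_id': '1000'},
            {'institution_id': '100', 'customer_id': '1001'},
            {'institution_id': '101', 'customer_id': '1001'},
        ])
        | beam.GroupBy(lambda customer: customer['institution_id'])
        | beam.Map(print)
    )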

The job_config file:

class JobConfig:
    FILE_ENCODING = 'utf-8'
    FILE_NAME_TIMESTAMP_FORMAT = '%Y%m%d%H%M%S'
    CSV_SEPARATOR = ','

The file_io_customer_by_institution_transform.py file:

from datetime import datetime
from typing import Dict, Iterable, List, Tuple

import apache_beam as beam
from apache_beam.io.fileio import TextSink, WriteToFiles
from pytz import timezone

from integration_ocd.pythonjobs.common.module.job_config import JobConfig


class InstitutionDestinationParamError(Exception):
    pass


class FileIOCustomerByInstitutionTransform(beam.PTransform):
    def __init__(self, output_path: str):
        super().__init__()
        self._output_path = output_path

    def expand(self, pcoll):
        return (
            pcoll
            | "Write files to GCS" >>
            WriteToFiles(
                path=self._output_path,
                destination=to_institution_destination,
                file_naming=self.build_file_name,
                sink=lambda institution_dest: CustomCsvSink())
        )

    def build_file_name(self, *args) -> str:
        """
        Build the file name dynamically from the parameters given by Beam in the 'WriteToFiles' PTransform.
        A destination is built with the institution as value (key of the group in the PCollection), then the
        file name is built from this institution (the argument at index 5).
        """
        file_name_timestamp = datetime.now(timezone('Europe/Paris')).strftime(JobConfig.FILE_NAME_TIMESTAMP_FORMAT)

        try:
            institution_destination: str = args[5]
            return f'CUSTOMER_INSTITUTION_{institution_destination}_{file_name_timestamp}.csv'
        except Exception as err:
            raise InstitutionDestinationParamError('The institution destination param must be passed', err)


class CustomCsvSink(TextSink):
    def __init__(self):
        super().__init__()

    def write(self, customers_with_institution):
        customers: Iterable[Dict[str, str]] = customers_with_institution[1]

        for index, customer in enumerate(customers, start=1):
            # Write the CSV header before the first record.
            if index == 1:
                header_field_names: bytes = self.build_csv_header_file(customer)
                self._fh.write(header_field_names)
                self._fh.write(self.get_csv_line_break())

            customer_csv_entry = self.convert_dict_to_csv_record(customer)
            self._fh.write(customer_csv_entry)
            self._fh.write(self.get_csv_line_break())

    def get_csv_line_break(self) -> bytes:
        return '\n'.encode(JobConfig.FILE_ENCODING)

    def build_csv_header_file(self, customer_dict: Dict[str, str]) -> bytes:
        header_field_names: str = JobConfig.CSV_SEPARATOR.join(customer_dict.keys())
        return header_field_names.encode(JobConfig.FILE_ENCODING)

    def convert_dict_to_csv_record(self, customer_dict: Dict[str, str]) -> bytes:
        """
        Turns the dictionary values into a comma-separated-value formatted string.
        The separator comes from the configuration file.
        """
        customer_csv_record: str = JobConfig.CSV_SEPARATOR.join(map(str, customer_dict.values()))
        return customer_csv_record.encode(JobConfig.FILE_ENCODING)


def to_institution_destination(customers_with_institution: Tuple[str, List[Dict[str, str]]]) -> str:
    """
    Map the given tuple to the institution as destination in the 'WriteToFiles' PTransform.
    This destination can then be used in the 'file_naming' part.
    """
    return customers_with_institution[0]

Some explanations:

  • In my example, the input data is mocked as customer data
  • The first operation in the main file is a Beam group by on the institution_id field
  • In the next step, I created a separate file with a PTransform that contains all the logic for writing the dynamic files

In the file_io_customer_by_institution_transform.py file:

  • This file uses a small configuration held in the JobConfig object
  • A CSV sink is created to generate CSV from the elements in the PCollection
  • The file name is generated from the current institution ID and the current timestamp (see the sketch after this list)
  • One file is generated per element grouped by institution ID
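
For reference on the args[5] lookup in build_file_name: WriteToFiles invokes the file_naming callable with the positional arguments (window, pane, shard_index, total_shards, compression, destination), so the destination built by to_institution_destination arrives as the argument at index 5. A minimal sketch of an equivalent naming function with those arguments named explicitly (illustrative only, using the same name pattern as above):

from datetime import datetime

# Equivalent file-naming callable with the positional arguments spelled out.
# 'destination' is the institution ID returned by to_institution_destination.
def customer_file_naming(window, pane, shard_index, total_shards, compression, destination):
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    return f'CUSTOMER_INSTITUTION_{destination}_{timestamp}.csv'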

In my case:

  • File 1 for institution 100: CUSTOMER_INSTITUTION_100_20221011160828.csv => contains 2 CSV rows

  • File 2 for institution 101: CUSTOMER_INSTITUTION_101_20221011160828.csv => contains 1 CSV row

Please note that the Beam Python documentation is not complete for this kind of use case and the use of WriteToFiles.

I think TextIO's WriteToText would be friendlier and able to output CSV by passing just a few parameters (it is an abstraction on top of FileIO).

Here is my suggestion:

csv_ip | 'WritetoGCS' >> beam.io.WriteToText(
    'gs://{0}/reports'.format(BUCKET), file_name_suffix='.csv')
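
Note that WriteToText writes each element as one line of text, so the records would need to be converted to CSV strings first. A minimal sketch, assuming a hypothetical PCollection of customer dicts (before any GroupByKey) and a fixed column order; header handling is omitted:

# Convert each customer dict to a comma-separated line before WriteToText.
COLUMNS = ('institution_id', 'customer_id', 'customer_name', 'customer_email', 'phone_number')

csv_lines = (
    customer_records  # hypothetical PCollection of customer dicts
    | 'ToCsvLine' >> beam.Map(lambda c: ','.join(str(c[col]) for col in COLUMNS))
    | 'WritetoGCS' >> beam.io.WriteToText(
        'gs://{0}/reports'.format(BUCKET), file_name_suffix='.csv')
)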

Note: I missed the part where you want to use dynamic file destinations. If that is a requirement, I suggest you check the Apache Beam Python equivalent of DynamicDestinations.

Based on that example, it would be something along the following lines:

csv_ip | 'WritetoGCS' >> beam.io.fileio.WriteToFiles(
    path='gs://{0}/reports/'.format(BUCKET),
    destination=lambda record: record['institution_id'],
    sink=lambda dest: CsvSink(),
    file_naming=beam.io.fileio.destination_prefix_naming())
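
CsvSink is not defined in the snippet above; a minimal sketch of one, assuming each element reaching the sink is a (key, iterable-of-customer-dicts) pair as produced by the earlier GroupByKey, could look like this:

import csv
import io

import apache_beam as beam


class CsvSink(beam.io.fileio.FileSink):
    """Minimal sketch of a CSV sink: one header row, then one row per customer dict."""

    def open(self, fh):
        self._fh = fh

    def write(self, record):
        _, customers = record  # (key, iterable of customer dicts)
        buffer = io.StringIO()
        writer = None
        for customer in customers:
            if writer is None:
                writer = csv.DictWriter(buffer, fieldnames=list(customer.keys()))
                writer.writeheader()
            writer.writerow(customer)
        self._fh.write(buffer.getvalue().encode('utf-8'))

    def flush(self):
        pass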
