How to execute a custom splittable DoFn in parallel



I am trying to develop a custom I/O connector for Apache Beam, written in Python. Following the official guidelines, a splittable DoFn (SDF) is the framework of choice.

I tried running the pseudocode from the SDF programming guide, but I could not get the pipeline to execute in parallel. Below is a working example.

Dummy data

myfile = open('test_beam.txt', 'w')
for i in range(0, 1000):
    myfile.write("%s\n" % i)
myfile.close()

Pipeline

Make sure to replace DUMMY_FILE with the absolute path to test_beam.txt.

import argparse
import logging
import os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions

from time import sleep
import random
from apache_beam.io.restriction_trackers import OffsetRange

DUMMY_FILE = absolute_path_to_dummy_data_file


class FileToWordsRestrictionProvider(beam.transforms.core.RestrictionProvider):
    def initial_restriction(self, file_name):
        return OffsetRange(0, os.stat(file_name).st_size)

    def create_tracker(self, restriction):
        return beam.io.restriction_trackers.OffsetRestrictionTracker(
            offset_range=self.initial_restriction(file_name=DUMMY_FILE))

    def restriction_size(self, element, restriction):
        return restriction.size()


class FileToWordsFn(beam.DoFn):
    def process(
            self,
            file_name,
            # Alternatively, we can let FileToWordsFn itself inherit from
            # RestrictionProvider, implement the required methods and let
            # tracker=beam.DoFn.RestrictionParam() which will use self as
            # the provider.
            tracker=beam.DoFn.RestrictionParam(FileToWordsRestrictionProvider())):
        with open(file_name) as file_handle:
            file_handle.seek(tracker.current_restriction().start)
            while tracker.try_claim(file_handle.tell()):
                yield read_next_record(file_handle=file_handle)


def read_next_record(file_handle):
    line_number = file_handle.readline()
    logging.info(line_number)
    sleep(random.randint(1, 5))
    logging.info(f'iam done {line_number}')


def run(args, pipeline_args, file_name):
    pipeline_options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=pipeline_options) as p:
        execute_pipeline(args, p, file_name)


def execute_pipeline(args, p, file_name):
    _ = (
        p |
        'Create' >> beam.Create([file_name]) |
        'Read File' >> beam.ParDo(FileToWordsFn(file_name=file_name))
    )


if __name__ == '__main__':
    """Build and run the pipeline."""
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()
    # to be added later
    args, pipeline_args = parser.parse_known_args()
    file_name = DUMMY_FILE
    run(args, pipeline_args, file_name)

The SDF is taken from the first example here; however, I had to fix a few things (e.g., a small bug in the definition of restriction_size()). In addition, I introduced a random sleep in read_next_record to check whether the pipeline executes in parallel (which it clearly does not).

Is there perhaps a mistake in the way I build the pipeline? I wanted my SDF to be the first step in the pipeline, but doing so results in AttributeError: 'PBegin' object has no attribute 'windowing'. To avoid this issue, I followed this post and added a Create of a PCollection containing the input file_name.
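For reference, the same seeding step can also be written with beam.Impulse followed by a Map; this is only an illustrative sketch using the FileToWordsFn defined above, not part of my original code:

def execute_pipeline_with_impulse(args, p, file_name):
    # Impulse emits a single empty byte string; the Map turns it into
    # the file name that the splittable DoFn consumes as its element.
    _ = (
        p
        | 'Impulse' >> beam.Impulse()
        | 'To file name' >> beam.Map(lambda _: file_name)
        | 'Read File' >> beam.ParDo(FileToWordsFn())
    )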

What is the correct way to execute an SDF in parallel within a pipeline?

Beam DoFns (including SplittableDoFns) operate on an input PCollection. For a SplittableDoFn, the input is usually a PCollection of source configurations (for example, input files). When a SplittableDoFn is executed, a Beam runner is able to parallelize the execution of even a single input element by isolating the portions of the input read using the RestrictionTracker. For a file, this means there may be workers running in parallel that read from the same file, but at different offsets.

So your implementation seems to be correct and should already allow a Beam runner to execute it in parallel.
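To make that mechanism concrete, here is a small standalone sketch (not part of your code) of how a runner can split off the remaining work of an OffsetRestrictionTracker while one element is in flight:

from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker

# One element's restriction: bytes [0, 1000) of a file.
tracker = OffsetRestrictionTracker(OffsetRange(0, 1000))
tracker.try_claim(0)  # the worker processing the element claims offset 0

# Runner-initiated split: half of the unclaimed range is handed off,
# so another worker can process the residual part of the same file.
primary, residual = tracker.try_split(0.5)
print(primary, residual)  # primary keeps [0, 500), residual is [500, 1000)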

Apache Beam's splittable DoFns let you create a custom configuration for runner-initiated splits. In my case I had to process a large file whose entire content sat on a single line with no delimiters, and Dataflow was not scaling. I used beam.transforms.core.RestrictionProvider with the split function, where I specified the number of parts in which to read the file. When I ran the Dataflow job with this configuration, several workers were used and the processing time dropped considerably.

class FileToLinesRestrictionProvider(beam.transforms.core.RestrictionProvider):
    def initial_restriction(self, file_name):
        return OffsetRange(0, size_file)  # 6996999736   # 43493
        # return OffsetRange(0, os.stat(file_name).st_size)

    def create_tracker(self, restriction):
        # return beam.io.restriction_trackers.OffsetRestrictionTracker(
        #     offset_range=self.initial_restriction(file_name=rutaFile_Test))
        return beam.io.restriction_trackers.OffsetRestrictionTracker(restriction)

    def split(self, file_name, restriction):
        # Configuration for reading the file in parts
        bundle_ranges = calcular_segmentos_lectura(tamFila, tam_segmentos, size_file)
        for start, stop in bundle_ranges:
            yield OffsetRange(start, stop)

    def restriction_size(self, element, restriction):
        # print(restriction.size())
        return restriction.size()


class FileToLinesFn(beam.DoFn):
    def process(
            self,
            file_name,
            # Alternatively, we can let FileToLinesFn itself inherit from
            # RestrictionProvider, implement the required methods and let
            # tracker=beam.DoFn.RestrictionParam() which will use self as
            # the provider.
            tracker=beam.DoFn.RestrictionParam(FileToLinesRestrictionProvider())):
        with FileSystems.open(file_name) as file_handle:
            file_handle.seek(tracker.current_restriction().start)
            print(tracker.current_restriction())
            while tracker.try_claim(file_handle.tell()):
                # print(file_handle.tell())
                yield file_handle.read(tamFila)


def calcular_segmentos_lectura(
        size_line,
        tam_segmentos,
        tam_file):
    """Based on the file size and the line size, divides the file into
    parts according to the input parameters.
    Returns an array with the offsets to be processed in each step.
    """
    num_lineas = int(tam_file / size_line)
    valor_segmento = int(num_lineas / tam_segmentos)
    valor_segmento = valor_segmento * size_line
    print(valor_segmento)
    segmentos_ranges = []
    valorAnterior = 0
    for i in range(tam_segmentos):
        start = valorAnterior
        stop_position = valorAnterior + valor_segmento
        valorAnterior = stop_position
        if (i + 1) == tam_segmentos:
            stop_position = tam_file
        segmentos_ranges.append((start, stop_position))
    return segmentos_ranges
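For completeness, a sketch of how these pieces can be wired together; the bucket path and the tamFila / tam_segmentos values below are illustrative assumptions, not the actual ones from my job:

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions

tamFila = 1000      # fixed record width in bytes (assumed value)
tam_segmentos = 16  # number of read segments / initial splits (assumed value)
input_file = 'gs://my-bucket/big_single_line_file.txt'  # hypothetical path
size_file = FileSystems.match([input_file])[0].metadata_list[0].size_in_bytes

with beam.Pipeline(options=PipelineOptions()) as p:
    _ = (
        p
        | 'Create' >> beam.Create([input_file])
        | 'Read segments' >> beam.ParDo(FileToLinesFn())
    )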

This example helped me a lot: url