使用Apache Beam Python SDK将文件写入Parquet中的动态目的地



我正在尝试通过WriteToFiles类使用动态目的地编写Parquet文件。

我甚至发现了一些进一步开发的例子,比如这个,他们建立了一个自定义的Avro文件接收器。

我目前正在尝试使用pyarrow库来编写一个Parquet sink,该sink可以以分布式的方式管理写操作,类似于WriteToParquet PTransform。

class ParquetFileSink(fileio.FileSink):
def __init__(self, schema, codec='deflate'):
self._schema = schema
self._codec = codec
self.writer = None
def open(self, fh):
# This is called on every new bundle.
self.writer = pq.ParquetWriter(
fh,
self._schema,
compression=self._codec,
use_deprecated_int96_timestamps=False
)
def write(self, record):
# This is called on every element.
row = pa.Table.from_pandas(
pd.DataFrame(record), schema=self._schema, preserve_index=False
)
self.writer.write_table(row)
def flush(self):
pass

这里的主要问题是,它是不可能的,据我所知,写无界的PCollections作为Parquet文件,所以如果我试图使用下面的类写记录,要么我得到一个错误写关闭的文件处理程序,或者一些文件根本没有创建。我还尝试使用GroupByKeyPTransform编写批量,但是由于无法关闭pyarrow.parquet.ParquetWriter对象,文件最终仅部分写入并被损坏。此外,这种策略不安全,因为批处理可能非常大,并且将它们作为单个文件写入不是一个好主意。

我可以看到这个问题正在面临的类apache_beam.io.parquetio._ParquetSink,但我不认为这可以直接应用于WriteToFiles类,因为我看不到如何完全管理文件处理程序。

我遇到了一个类似的问题,我最终编写了一个可以与WriteToFiles一起使用的ParquetSink。它会在给定配置的情况下批量处理内存中的记录。我已经使用它来创建依赖于记录中的字段的批处理过程中的动态文件,但我认为它也可以与流管道一起工作,尽管我还没有测试过它。

你可以在这个要点中找到代码

class ParquetSink(fileio.FileSink):
def __init__(self,
file_path_prefix,
schema,
row_group_buffer_size=64 * 1024 * 1024,
record_batch_size=1000,
codec='none',
use_deprecated_int96_timestamps=False,
file_name_suffix='',
num_shards=0,
shard_name_template=None,
mime_type='application/x-parquet'):
self._inner_sink = beam.io.parquetio._create_parquet_sink(
file_path_prefix,
schema,
codec,
row_group_buffer_size,
record_batch_size,
use_deprecated_int96_timestamps,
file_name_suffix,
num_shards,
shard_name_template,
mime_type
)
self._codec = codec
self._schema = schema
self._use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
def open(self, fh):
self._pw = pyarrow.parquet.ParquetWriter(
fh,
self._schema,
compression=self._codec,
use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps)
def write(self, record):
self._inner_sink.write_record(self._pw, record)
def flush(self):
if len(self._inner_sink._buffer[0]) > 0:
self._inner_sink._flush_buffer()
if self._inner_sink._record_batches_byte_size > 0:
self._inner_sink._write_batches(self._pw)
self._pw.close()

def parquet_compatible_filenaming(suffix=None):
def _inner(window, pane, shard_index, total_shards, compression, destination):
return fileio.destination_prefix_naming(suffix )(
window, pane, shard_index, total_shards, compression, destination).replace(":", ".")
return _inner

def get_parquet_pipeline(pipeline_options, input, output):
with beam.Pipeline(options=pipeline_options) as p:
lines = (p 
| 'Read' >> beam.io.ReadFromParquet(file_pattern=input)
| 'Transform' >> beam.Map(lambda x: { 'some_key': x['some_key'], 'raw': x})
| 'Write to Parquet' >> fileio.WriteToFiles(
path=str(output),
destination=lambda x: x["some_key"],
sink=lambda x: ParquetSink(
file_path_prefix=output,
file_name_suffix=".parquet",
codec="snappy",
schema=pyarrow.schema([
  pyarrow.field("some_key", pyarrow.string()),
  pyarrow.field("raw", pyarrow.string())
])),
file_naming=parquet_compatible_filenaming(suffix=".parquet")
)
)

对parquet格式进行了优化,适合批量写入数据。因此,它不适合流媒体,在流媒体中,你可以一个接一个地接收记录。在您的示例中,您在parquet文件中逐行写入,这是非常低效的。

我建议您将数据保存为适合逐行附加数据的格式,然后定期将这些数据批量移动到parquet文件中。

或者你可以像apache_beam.io.parquetio._ParquetSink。它将记录保存在缓冲区的内存中,并不时地批量写入它们。但是如果你的应用程序崩溃了,你就有可能丢失缓冲区中的记录。

最新更新