Unable to write data to Postgres with Apache Beam



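I am trying to read a CSV file with Beam and send the data to Postgres.

However, the pipeline fails because of a conversion mismatch. Note that the pipeline works when both columns are of type int, and fails as soon as a column contains strings.

Here is one thing I tried.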

import typing

import apache_beam as beam
from apache_beam.dataframe import convert
from apache_beam.io.jdbc import WriteToJdbc
from past.builtins import unicode

ExampleRow = typing.NamedTuple('ExampleRow', [('id', int), ('name', unicode)])

beam_df = (
    pipeline
    | 'Read CSV' >> beam.dataframe.io.read_csv('path.csv').with_output_types(ExampleRow))

beam_df2 = (
    convert.to_pcollection(beam_df)
    | beam.Map(print)
    | WriteToJdbc(
        table_name=table_name,
        jdbc_url=jdbc_url,
        driver_class_name='org.postgresql.Driver',
        statement="insert into tablr values(?,?);",
        username=username,
        password=password,
    )
)

result = pipeline.run()
result.wait_until_finish()

I also tried registering a logical type with a URN to convert the Python str type to varchar or unicode, but that does not seem to work either:

from apache_beam.typehints.schemas import LogicalType

@LogicalType.register_logical_type
class db_str(LogicalType):
    @classmethod
    def urn(cls):
        return "beam:logical_type:javasdk:v1"

    @classmethod
    def language_type(cls):
        return unicode

    def to_language_type(self, value):
        return unicode(value)

    def to_representation_type(self, value):
        return unicode(value)

Additional information: this is the printed output:

BeamSchema_f0d95d64_95c7_43ba_8a04_ac6a0b7352d9(id=21, nom='nom21')
BeamSchema_f0d95d64_95c7_43ba_8a04_ac6a0b7352d9(id=22, nom='nom22')
BeamSchema_f0d95d64_95c7_43ba_8a04_ac6a0b7352d9(id=21, nom='nom21')
BeamSchema_f0d95d64_95c7_43ba_8a04_ac6a0b7352d9(id=22, nom='nom22')

The problem comes from the WriteToJdbc function and the "nom" column.

Any idea how to make this work?

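I think your problem is that the output PCollection contains BeamSchema structures rather than the expected NamedTuple.

Also, according to the documentation, your example is missing one instruction: coders.registry.register_coder(ExampleRow, coders.RowCoder)

apache_beam.io.jdbc documentation: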

ExampleRow = typing.NamedTuple('ExampleRow',
                               [('id', int), ('name', unicode)])
coders.registry.register_coder(ExampleRow, coders.RowCoder)

with TestPipeline() as p:
    _ = (
        p
        | beam.Create([ExampleRow(1, 'abc')])
            .with_output_types(ExampleRow)
        | 'Write to jdbc' >> WriteToJdbc(
            driver_class_name='org.postgresql.Driver',
            jdbc_url='jdbc:postgresql://localhost:5432/example',
            username='postgres',
            password='postgres',
            statement='INSERT INTO example_table VALUES(?, ?)',
        ))

The following line from your snippet does not produce ExampleRow NamedTuple elements as expected, since your print output shows the type BeamSchema:

beam_df = (pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('path.csv').with_output_types(ExampleRow))

Try this first, registering the coder before reading the CSV:

from past.builtins import unicode
ExampleRow = typing.NamedTuple('ExampleRow',[('id',int),('name',unicode)])
# Register coder here
coders.registry.register_coder(ExampleRow, coders.RowCoder)
beam_df = (pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('path.csv').with_output_types(ExampleRow))

If that does not work, you need to find a way to convert the BeamSchema rows into ExampleRow NamedTuple instances in a DoFn:

from past.builtins import unicode

# ExampleRow must be defined before the function, because the return
# annotation below is evaluated when the function is defined.
ExampleRow = typing.NamedTuple('ExampleRow', [('id', int), ('name', unicode)])

def convert_beam_schema_to_named_tuple(beam_schema) -> ExampleRow:
    # Your logic to transform Beam Schema to ExampleRow NamedTuple
    raise NotImplementedError

beam_df = (
    pipeline
    | 'Read CSV' >> beam.dataframe.io.read_csv('path.csv').with_output_types(ExampleRow))

beam_df2 = (
    convert.to_pcollection(beam_df)
    # beam.Map(print) is left out here: print returns None, so it would
    # replace every element with None before the conversion step.
    | beam.Map(convert_beam_schema_to_named_tuple)
    | WriteToJdbc(
        table_name=table_name,
        jdbc_url=jdbc_url,
        driver_class_name='org.postgresql.Driver',
        statement="insert into tablr values(?,?);",
        username=username,
        password=password,
    )
)

result = pipeline.run()
result.wait_until_finish()
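
As a rough idea of what the conversion logic could look like, here is a minimal sketch in plain Python. It assumes the generated row exposes `id` and `nom` as attributes, which is what the printed BeamSchema_...(id=21, nom='nom21') output suggests. `FakeSchemaRow` is a hypothetical stand-in for the generated class so the function can be tried without a pipeline, and the explicit `int`/`str` casts are only a guess at what might help if the DataFrame API hands back non-native scalar types.

```python
import typing

# Target row type, matching the NamedTuple from the question
# (on Python 3, `unicode` from past.builtins is just `str`).
ExampleRow = typing.NamedTuple('ExampleRow', [('id', int), ('name', str)])


def convert_beam_schema_to_named_tuple(row) -> ExampleRow:
    """Copy a schema row with fields `id` and `nom` into an ExampleRow."""
    # Assumption: the row has `id` and `nom` attributes, as in the printed output.
    return ExampleRow(id=int(row.id), name=str(row.nom))


# Hypothetical stand-in for the generated BeamSchema_... class.
FakeSchemaRow = typing.NamedTuple('FakeSchemaRow', [('id', int), ('nom', str)])

converted = convert_beam_schema_to_named_tuple(FakeSchemaRow(id=21, nom='nom21'))
print(converted)
```

In the pipeline this function would be passed to beam.Map(...) right before WriteToJdbc, with the coder for ExampleRow registered beforehand as described above.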

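Finally, if the Beam schema and Beam DataFrame APIs keep causing problems, you can read the CSV file directly with Beam I/O and work with a plain PCollection.

You can check this link: csv-into-dictionary-in-apache-beam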
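
To illustrate that last option, here is a small sketch of the per-line parsing step, not taken from the linked answer. It assumes the file has exactly two columns in the order id, name and that the header line has already been skipped (for example with the skip_header_lines argument of beam.io.ReadFromText). `parse_csv_line` is a hypothetical helper name.

```python
import csv
import typing

ExampleRow = typing.NamedTuple('ExampleRow', [('id', int), ('name', str)])


def parse_csv_line(line: str) -> ExampleRow:
    """Parse one CSV text line such as '21,nom21' into an ExampleRow."""
    # csv.reader handles quoted fields and commas embedded in them.
    fields = next(csv.reader([line]))
    # Assumption: column 0 is the integer id, column 1 is the name.
    return ExampleRow(id=int(fields[0]), name=fields[1])


rows = [parse_csv_line(line) for line in ['21,nom21', '22,"nom, 22"']]
print(rows)
```

In a pipeline, the function would be applied with beam.Map(parse_csv_line).with_output_types(ExampleRow) to the lines produced by the text source, so the elements reaching WriteToJdbc are ExampleRow instances from the start.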
