我想从在谷歌数据流上运行的Apache Beam Pipeline连接Google Cloud sql postgres实例。
我想使用Python SDK来做到这一点。
我无法为此找到适当的文档。
在云SQL如何指导我没有看到任何数据流的文档。
https://cloud.google.com/sql/docs/postgres/
有人可以提供文档链接/github示例吗?
relational_db。写和relational_db。从光束块读取变换,如下所示:
首先安装光束块:
pip install beam-nuggets
阅读:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db
with beam.Pipeline(options=PipelineOptions()) as p:
source_config = relational_db.SourceConfiguration(
drivername='postgresql+pg8000',
host='localhost',
port=5432,
username='postgres',
password='password',
database='calendar',
)
records = p | "Reading records from db" >> relational_db.Read(
source_config=source_config,
table_name='months',
)
records | 'Writing to stdout' >> beam.Map(print)
对于写作:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db
with beam.Pipeline(options=PipelineOptions()) as p:
months = p | "Reading month records" >> beam.Create([
{'name': 'Jan', 'num': 1},
{'name': 'Feb', 'num': 2},
])
source_config = relational_db.SourceConfiguration(
drivername='postgresql+pg8000',
host='localhost',
port=5432,
username='postgres',
password='password',
database='calendar',
create_if_missing=True,
)
table_config = relational_db.TableConfiguration(
name='months',
create_if_missing=True
)
months | 'Writing to DB' >> relational_db.Write(
source_config=source_config,
table_config=table_config
)
SDK包含JdbcIO,它允许连接到可以通过标准Java JDBC机制访问的任何数据库。目前在Beam Python SDK中没有类似物。如果有的话,我想它会使用Python DB-API。随意提交功能请求或贡献 - 开发起来应该相当简单(例如,通过模仿 Java JdbcIO
的源代码),但非常有用:)