我试图通过使用Python SDK io连接到云SQL。jdbc模块,更具体地说是ReadFromJdbc类,在这里有文档- https://beam.apache.org/releases/pydoc/current/apache_beam.io.jdbc.html
基于它和使用JDBC连接到Cloud MySQL的信息- https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/blob/main/docs/jdbc-mysql.md,我编写了以下代码
import apache_beam as beam
import apache_beam.io.jdbc as jdbc
import typing
import apache_beam.coders as coders
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = {
'project': 'project-name',
'runner': 'DataflowRunner',
'region': 'europe-central2',
'staging_location':"gs://temp",
'temp_location':"gs://temp",
'template_location':"gs://templates/temp_name"
}
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
serviceAccount = r'pathtoserviceaccount.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount
ExampleRow = typing.NamedTuple('ExampleRow',
[('id', int), ('migration', str)])
coders.registry.register_coder(ExampleRow, coders.RowCoder)
with beam.Pipeline(options=pipeline_options) as p:
res = (
p
| "Read database list" >> jdbc.ReadFromJdbc(
table_name='table',
driver_class_name='com.mysql.jdbc.Driver',
jdbc_url='jdbc:mysql:///<DATABASE_NAME>?cloudSqlInstance=<INSTANCE_CONNECTION_NAME>&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=<MYSQL_USER_NAME>&password=<MYSQL_USER_PASSWORD>',
username='user',
password='pass',
query = "select id, migration from db.table;",
fetch_size=1,
classpath=["com.google.cloud.sql:mysql-socket-factory-connector-j-8:1.7.2"],
expansion_service = 'host:6666'
)
| "Print results" >> beam.io.WriteToText(r'gs://output/out.csv')
)
对于扩展服务,我已经按照这里的文档设置了WLS2 python环境- https://beam.apache.org/documentation/sdks/java-multi-language-pipelines/#advanced-start-an-expansion-service
不幸的是,我得到了这个错误:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:6666: WSA Error"
debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:6666: WSA Error {grpc_status:14, created_time:"2022-12-08T15:43:05.445755053+00:00"}"
我试图将expansion_service
切换到我从wls hostname -I
获得的特定IP,但它产生了相同的结果,即使您可以到达它(使用ping测试并托管web服务器)。
我做错了什么吗?我发现很难相信连接到云SQL是如此困难,所以我必须…
apache_beam.io.jdbc
模块下的转换是在Beam Java SDK中实现的跨语言转换。因此,在管道构建期间,Python SDK将连接到Java expansion service
以扩展这些转换。您按照说明创建了Python expansion service
.
我认为最简单的方法就是使用默认的扩展服务。
- 首先,在构建管道的计算机上安装Java运行时,并确保
java
命令可用。 - 使用以下转换从Cloud SQL读取,
p | "Read database list" >> jdbc.ReadFromJdbc(
table_name='table',
driver_class_name='com.mysql.jdbc.Driver',
jdbc_url='jdbc:mysql:///<DATABASE_NAME>?cloudSqlInstance=<INSTANCE_CONNECTION_NAME>&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=<MYSQL_USER_NAME>&password=<MYSQL_USER_PASSWORD>',
username='user',
password='pass',
query = "select id, migration from db.table;",
fetch_size=1,
classpath=["com.google.cloud.sql:mysql-socket-factory-connector-j-8:1.7.2"]
)