I am trying to connect to PostgreSQL from PyFlink on Windows, using the following code:
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql("""
CREATE TABLE test_nifi (
codecountry VARCHAR(50),
name VARCHAR(50),
PRIMARY KEY (codecountry) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/TestDS',
'table-name' = 'public.test_nifi',
'username' = 'postgres',
'password' = 'postgres'
)
""")
result = table_env.from_path("test_nifi").select("codecountry, name")
print(result.to_pandas())
I get the following error:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Any idea why this is happening?
Add the following line:
table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///C:/Users/Admin/Desktop/Flink/flink-connector-jdbc_2.12-1.14.3.jar;file:///C:/Users/Admin/Desktop/Flink/postgresql-42.3.1.jar")
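For context, this setting only needs to be in place before the query is actually planned and executed (i.e. before to_pandas() runs); putting it right after the TableEnvironment is created is the simplest way to guarantee that. A minimal sketch of the placement, reusing the jar paths from above (adjust them to wherever your jars actually live):

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Register the JDBC connector and the PostgreSQL driver with the job;
# multiple jars are separated by ';' in pipeline.jars.
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///C:/Users/Admin/Desktop/Flink/flink-connector-jdbc_2.12-1.14.3.jar;"
    "file:///C:/Users/Admin/Desktop/Flink/postgresql-42.3.1.jar"
)

table_env.execute_sql("""CREATE TABLE test_nifi ( ... )""")  # the DDL from the question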
Since Flink is a Java/Scala-based project, connectors and formats are shipped as jars.
PyFlink's PostgreSQL support relies on the Java flink-connector-jdbc implementation, so you need to add that jar to the stream_execution_environment:
stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")
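If you go this route, you can build the table environment from the StreamExecutionEnvironment so that the jars added via add_jars are visible to the Table API as well. A minimal sketch under that assumption, using the same two jars as in the other answer (the paths are illustrative):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# add_jars takes file:// URLs; both the JDBC connector and the PostgreSQL driver are needed.
env.add_jars(
    "file:///C:/Users/Admin/Desktop/Flink/flink-connector-jdbc_2.12-1.14.3.jar",
    "file:///C:/Users/Admin/Desktop/Flink/postgresql-42.3.1.jar",
)
table_env = StreamTableEnvironment.create(env)  # the CREATE TABLE / query code stays the same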