Using Elasticsearch as a sink in Apache Flink from a Python program



I am trying to read data from a Kafka topic, do some processing, and dump the data into Elasticsearch. However, I cannot find any example of using Elasticsearch as a sink in Python. Could someone help me with a snippet for this?

# add kafka connector dependency
kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                         'flink-sql-connector-kafka_2.11-1.14.0.jar')

tbl_env.get_config() \
    .get_configuration() \
    .set_string("pipeline.jars", "file://{}".format(kafka_jar))

Here is the error:

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:399)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:583)
... 31 more
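
The ValidationException above usually just means the connector jar never ended up on the job's classpath, so the path passed to pipeline.jars is worth re-checking. A minimal sketch of registering both the Kafka and the Elasticsearch SQL connector jars in one go (the jar file names below are assumptions for Flink 1.14 / Scala 2.11; pipeline.jars takes a semicolon-separated list of URLs):

import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
tbl_env = StreamTableEnvironment.create(env)

# jar file names are assumptions; use whatever versions you actually downloaded
base_dir = os.path.abspath(os.path.dirname(__file__))
kafka_jar = os.path.join(base_dir, 'flink-sql-connector-kafka_2.11-1.14.0.jar')
es_jar = os.path.join(base_dir, 'flink-sql-connector-elasticsearch7_2.11-1.14.0.jar')

# "pipeline.jars" accepts a semicolon-separated list of jar URLs
tbl_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file://{};file://{}".format(kafka_jar, es_jar))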

See https://nightlies.apache.org/flink/flink-docs-release-1.14/api/python/pyflink.datastream.html#pyflink.datastream.connectors.JdbcSink
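
For the DataStream API route that the JdbcSink link above points to, a minimal usage sketch could look like the following (the connection details, table name and columns are placeholders):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions

env = StreamExecutionEnvironment.get_execution_environment()
# the JDBC connector jar and the MySQL driver jar must be on the classpath

row_type = Types.ROW([Types.STRING(), Types.STRING()])
ds = env.from_collection([('1', 'truck-1'), ('2', 'truck-2')], type_info=row_type)

ds.add_sink(JdbcSink.sink(
    "insert into mysqlsink (id, truck_id) values (?, ?)",
    row_type,
    JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .with_url('jdbc:mysql://localhost:3306/test')
        .with_driver_name('com.mysql.cj.jdbc.Driver')
        .with_user_name('user')
        .with_password('password')
        .build(),
    JdbcExecutionOptions.builder()
        .with_batch_interval_ms(1000)
        .with_batch_size(200)
        .with_max_retries(3)
        .build()))

env.execute("jdbc_sink_sketch")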

Kafka to MySQL (Table API):

import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf, TableFunction, ScalarFunction

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
sourceKafkaDdl = """
create table sourceKafka(
ID varchar comment '',
TRUCK_ID varchar comment '',
SPEED varchar comment '',
GPS_TIME varchar comment ''
)comment 'get from kafka' 
with(
'connector' = 'kafka',
'topic' = 'pyflink_test',        
'properties.bootstrap.servers' = '***:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
"""
mysqlSinkDdl = """
CREATE TABLE mysqlSink (
id varchar, 
truck_id varchar
) 
with (
'connector.type' = 'jdbc',  
'connector.url' = 'jdbc:mysql://***:***/test?autoReconnect=true&failOverReadOnly=false&useUnicode=true&characterEncoding=utf-8&useSSL=false' ,
'connector.username' = '**' ,
'connector.password' = '**', 
'connector.table' = 'mysqlsink' ,
'connector.driver' = 'com.mysql.cj.jdbc.Driver' ,
'connector.write.flush.interval' = '5s', 
'connector.write.flush.max-rows' = '1'
)
"""
# register the source and sink tables, then stream the selected columns into MySQL
t_env.execute_sql(sourceKafkaDdl)
t_env.execute_sql(mysqlSinkDdl)
t_env.from_path('sourceKafka') \
    .select("ID,TRUCK_ID") \
    .insert_into("mysqlSink")
t_env.execute("pyFlink_mysql")
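
For the Elasticsearch sink that was actually asked about, the same Table API pattern applies. A minimal sketch that reuses t_env and the sourceKafka table from the example above, assuming the flink-sql-connector-elasticsearch7 jar has also been added to pipeline.jars and an Elasticsearch 7 cluster is reachable (host, index name and schema are placeholders):

esSinkDdl = """
CREATE TABLE esSink (
  id varchar,
  truck_id varchar
) with (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'truck_speed'
)
"""
t_env.execute_sql(esSinkDdl)

# INSERT INTO submits the streaming job; wait() blocks while it runs
t_env.execute_sql("INSERT INTO esSink SELECT ID, TRUCK_ID FROM sourceKafka").wait()

Documents are written to the given index; with an unbounded Kafka source the job keeps running, so wait() will not return on its own.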
