我是Flink的新手,需要通过Flink将Json数据从Kafka流式传输到MySQL表。无法在外部 MYSQL 实例中传输数据。这是我尝试过的。任何帮助将不胜感激。 该字段包括时间戳:字符串,商家
ID:字符串,cid:字符串,uid:字符串,会话 ID:字符串,事件:字符串,事件类型:字符串,ip:字符串,refUrl:字符串,引用:字符串val env = StreamExecutionEnvironment.createLocalEnvironment()
val tEnv = TableEnvironment.getTableEnvironment(env)
val prop = new Properties()
prop.setProperty("zookeeper.connect","")
prop.setProperty("bootstrap.servers", "localhost:9092")
prop.setProperty("group.id", "test")
prop.setProperty("auto.offset.reset", "earliest")
val output: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("TQBQTEST_USER_AUG_2018", new serialization.SimpleStringSchema(), prop))
val finalTable = tEnv.registerDataStream("User",output)
env.execute()
几点:
(1( 您可能需要升级到 Flink 1.6,因为此版本中对将 Flink 的 Table/SQL API 连接到外部系统的支持得到了显著增强。
(2( 如果使用适当的 Kafka 表源和 JDBC 表接收器,则会更轻松。您可以在此处找到版本 1.6 的文档,在此处找到版本 1.5 的文档。