我想在Apache Flink中做流媒体工作来做Kafka->闪烁->Apache Flink(Scala(中的HIVE。任何人都可以提供代码样本,因为他们的官方文档不是很清楚。
这应该是流式处理。
关于开始使用Table API的帮助,可以使用Table API进行实时报告。它是用Java编写的,但Scala API并没有太大的不同。
这是一个使用SQL读取Kafka并写入Hive的示例。要在Scala中执行同样的操作,可以使用tableEnv.executeSql(...)
包装SQL语句,如中所示
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
或
val tableResult1 = tEnv.executeSql("INSERT INTO ...")
如果您需要执行多个插入,那么您需要使用StatementSet
以不同的方式执行。有关详细信息,请参阅下面链接到的文档。
请参阅运行CREATE语句、运行INSERT语句、Apache Kafka SQL连接器和写入配置单元。
如果你陷入困境,向我们展示你所做的尝试以及失败的原因。