我是Flink的新手。我正在尝试使用Flink 1.3.2从我们的运动流中读取并将输出写入Cassandra表。并且该程序能够从kinesis流中流传输数据。
问题在于,当我做'cassandrasink.addsink(countsstreaming)'时,它会给我"类型的不匹配"。预期:datastream [notinferredin],实际:datastream [(string,long)]'。我已经浏览了文档和源代码,并注意到addSink将数据截图[在]中。
有人可以帮助我了解"在"类型中是什么,以及如何解决此问题?
预先感谢!
val env = StreamExecutionEnvironment.getExecutionEnvironment
val mapper = new ObjectMapper
val kinesis = env.addSource(new FlinkKinesisConsumer[String](
"kinesis-stream", new SimpleStringSchema, ConsumerConfig))
//DataStream[(String, Long)]
val countsStreaming: DataStream[(String, Long)] = kinesis.map(x => mapper.readValue(x,classOf[java.util.Map[String,String]]))
.map(x => x.get("game_name"))
.map({x => (x,1L) })
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
countsStreaming.print()
CassandraSink.addSink(countsStreaming)
.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
.setClusterBuilder(new ClusterBuilder() {
override def buildCluster(builder: Cluster.Builder): Cluster = {
builder.addContactPoint("0.0.0.0").build()
}
}).build()
env.execute("StreamingExample")
问题是CassandraSink.addSink
仅接受Java DataStream。
您需要在Scala DataStream之后添加.javaStream
,然后输入不匹配。