我正在用Kafka设计一个spark流应用程序。我有以下几个问题:我正在将RDBMS表中的数据流式传输到kafka中,并使用Spark consumer来消费消息并使用Spark-SQL进行处理
问题:1.我正在从表中流式传输数据,并以(键作为表名,值作为JSON记录形式的表数据)的形式流式传输到kafka——这是正确的体系结构吗?
- 在spark consumer中,我正试图使用DStream.foreachRDD(x=>转换为x RDD)来消费数据——我对此有问题(它说不允许转换中的转换出错……我正在尝试提取foreachRDD函数中的键来获取表名,并使用map函数将x.values从JSON转换回普通字符串,然后将每条记录保存到Spark-sql)
这种数据库流的体系结构和设计可以吗?我如何解决转换中的转换问题?
谨致问候,Piyush Kansal
我有一个类似的用例。
我使用Nifi从RDBMS视图中获取数据,并将其放入Kafka Topic中。我为带有多个分区的Oracle数据库中的每个视图都有一个主题。使用Nifi,数据将被转换为JSON格式并放入Kafka中。
是否有任何要求对所有表数据使用相同的kafka主题?
以下代码将用于将数据持久化到Cassandra中。
> val msg = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topicsSet)
>
> /* Process Records for each RDD */ Holder.log.info("Spark foreach Starts")
> val data = msg.map(_._2)
> data.foreachRDD(rdd =>{
> if(rdd.toLocalIterator.nonEmpty) {
>
>
> val messageDfRdd = sqlContext.read.json(rdd)
var data2=messageDfRdd .map(p => employee(p.getLong(1),p.getString(4),p.getString(0),p.getString(2),p.getString(3),p.getString(5)));
> //code to save to Cassandra.
> }