使用Apache Spark和kfaka的实时数据库流



我正在用Kafka设计一个spark流应用程序。我有以下几个问题:我正在将RDBMS表中的数据流式传输到kafka中,并使用Spark consumer来消费消息并使用Spark-SQL进行处理

问题:1.我正在从表中流式传输数据,并以(键作为表名,值作为JSON记录形式的表数据)的形式流式传输到kafka——这是正确的体系结构吗?

  1. 在spark consumer中,我正试图使用DStream.foreachRDD(x=>转换为x RDD)来消费数据——我对此有问题(它说不允许转换中的转换出错……我正在尝试提取foreachRDD函数中的键来获取表名,并使用map函数将x.values从JSON转换回普通字符串,然后将每条记录保存到Spark-sql)

这种数据库流的体系结构和设计可以吗?我如何解决转换中的转换问题?

谨致问候,Piyush Kansal

我有一个类似的用例。

我使用Nifi从RDBMS视图中获取数据,并将其放入Kafka Topic中。我为带有多个分区的Oracle数据库中的每个视图都有一个主题。使用Nifi,数据将被转换为JSON格式并放入Kafka中。

是否有任何要求对所有表数据使用相同的kafka主题?

以下代码将用于将数据持久化到Cassandra中。

> val msg = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topicsSet)
>   
>     /* Process Records for each RDD */   Holder.log.info("Spark foreach Starts")
>        val data = msg.map(_._2) 
>        data.foreachRDD(rdd =>{
>        if(rdd.toLocalIterator.nonEmpty)    {
>     
>     
>       val messageDfRdd = sqlContext.read.json(rdd)
var data2=messageDfRdd .map(p => employee(p.getLong(1),p.getString(4),p.getString(0),p.getString(2),p.getString(3),p.getString(5)));
>  //code to save to Cassandra.   
>            }

相关内容

  • 没有找到相关文章

最新更新