Regarding Spark Dataframereader jdbc



我有一个关于Spark Dataframereader机制的问题。如果有人可以帮助我,我将不胜感激。让我在这里解释场景

我正在像这样从 Dstream 创建一个数据帧。这在输入数据中

 var config = new HashMap[String,String]();
        config += ("zookeeper.connect" ->zookeeper);        
        config += ("partition.assignment.strategy" ->"roundrobin");
        config += ("bootstrap.servers" ->broker);
        config += ("serializer.class" -> "kafka.serializer.DefaultEncoder");
        config += ("group.id" -> "default"); 
        val lines =  KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,config.toMap,Set(topic)).map(_._2)
        lines.foreachRDD { rdd =>
                if(!rdd.isEmpty()){
                    val rddJson = rdd.map { x => MyFunctions.mapToJson(x) }       
                    

                           
                    val sqlContext = SQLContextSingleton.getInstance(ssc.sparkContext)
                    val rddDF = sqlContext.read.json(rddJson)
                    rddDF.registerTempTable("inputData")
            
                   
 val dbDF = ReadDataFrameHelper.readDataFrameHelperFromDB(sqlContext, jdbcUrl, "ABCD","A",numOfPartiton,lowerBound,upperBound)

这是 ReadDataFrameHelper 的代码

def readDataFrameHelperFromDB(sqlContext:HiveContext,jdbcUrl:String,dbTableOrQuery:String,
            columnToPartition:String,numOfPartiton:Int,lowerBound:Int,highBound:Int):DataFrame={
        val jdbcDF = sqlContext.read.jdbc(url = jdbcUrl, table = dbTableOrQuery,
                columnName = columnToPartition,
                lowerBound = lowerBound,
                upperBound = highBound,
                numPartitions = numOfPartiton,
                connectionProperties = new java.util.Properties()
                )
                
            jdbcDF  
    }

最后,我正在做这样的加入

 val joinedData = rddDF.join(dbDF,rddDF("ID") === dbDF("ID")
                                 && rddDF("CODE") === dbDF("CODE"),"left_outer")
                        .drop(dbDF("code"))
                        .drop(dbDF("id"))
                        .drop(dbDF("number"))
                        .drop(dbDF("key"))
                        .drop(dbDF("loaddate"))
                        .drop(dbDF("fid"))
joinedData.show()

我的输入 DStream 将有 1000 行,数据将包含数百万行。因此,当我执行此连接时,将触发从数据库加载所有行并读取这些行,或者这将仅从数据库中读取具有输入DStream code,id的特定行

如 zero323 所指定,我还确认数据将从表中读取完整数据。我检查了数据库会话日志,看到整个数据集正在加载。

谢谢零323

相关内容

  • 没有找到相关文章

最新更新