Scala/Spark serialization error - streaming data to HBase



I am new to Scala/Spark. In the code below I extract the Twitter public stream content into HBase. When I comment out the last four lines (the put commands into HBase) I can print the tweet content to the terminal, but I cannot dump it into the HBase table.

I need help with the following:
1. How can I overcome the serialization error?
2. Is there an efficient way (perhaps using Kryo serialization) to avoid it?

Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
- object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml)

import twitter4j.auth._
import twitter4j.conf._
import twitter4j._
import twitter4j.json._
import scala.io.Source
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.io._
import org.apache.spark.streaming.twitter.TwitterUtils
////////////////////////////
// spark-shell already provides sc; this SparkConf matters only when submitting as an application
val sparkConf = new SparkConf().setAppName("model1").setMaster("local[*]")
// val sc = new SparkContext(sparkConf)
val TABLE_NAME = "publicrd"
val CF_USER = "user"
val CF_TWEET = "tweet"
val CF_ENTITIES = "entities"
val CF_PLACES = "places"
val hadoopConf = new Configuration
val hbaseConf = HBaseConfiguration.create(hadoopConf)
val admin = new HBaseAdmin(hbaseConf)
val tableDesc = new HTableDescriptor(TableName.valueOf(TABLE_NAME))
// Define column family descriptor
val ColumnFamilyDesc1 = new HColumnDescriptor(Bytes.toBytes(CF_USER))
val ColumnFamilyDesc2 = new HColumnDescriptor(Bytes.toBytes(CF_TWEET))
val ColumnFamilyDesc3 = new HColumnDescriptor(Bytes.toBytes(CF_ENTITIES))
val ColumnFamilyDesc4 = new HColumnDescriptor(Bytes.toBytes(CF_PLACES))
// Add column family in table descriptor
tableDesc.addFamily(ColumnFamilyDesc1)
tableDesc.addFamily(ColumnFamilyDesc2)
tableDesc.addFamily(ColumnFamilyDesc3)
tableDesc.addFamily(ColumnFamilyDesc4)
// Check if the table exists
if (admin.tableExists(TABLE_NAME)) {
  print(">>>>> " + TABLE_NAME + " already exists <<<<<")
  admin.disableTable(TABLE_NAME)
  admin.deleteTable(TABLE_NAME)
}
// Create HBASE table
admin.createTable(tableDesc)
val table = new HTable(hbaseConf, TABLE_NAME)
/////
val timewindow = 2 // seconds
val ssc = new StreamingContext(sc, Seconds(timewindow))
val cb = new ConfigurationBuilder
val ckey = "ckey"
val csecret = "csecret"
val atoken = "atoken"
val atokensecret = "atokensecret"
cb.setDebugEnabled(true).
setOAuthConsumerKey(ckey).
setOAuthConsumerSecret(csecret).
setOAuthAccessToken(atoken).
setOAuthAccessTokenSecret(atokensecret).
setJSONStoreEnabled(true)
val auth = new OAuthAuthorization(cb.build)
val tweets = TwitterUtils.createStream(ssc, Some(auth))
val status = tweets.filter(_.getLang() == "en")
status.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    records.foreach { record =>
      print("\n\n>>>> " + record)
      val tweetID = record.getUser().getId().toString
      print("\ntweetID : " + tweetID)
      val tweetBody = record.getText()
      print("\ntweetBody : " + tweetBody)
      val favoritesCount = record.getFavoriteCount()
      print("\nfavoritesCount : " + favoritesCount)
      val keyrow = "t_" + tweetID
      print("\nkeyrow : " + keyrow + "\n")
      val theput = new Put(Bytes.toBytes(keyrow))
      theput.add(Bytes.toBytes(CF_TWEET), Bytes.toBytes("tweetid"), Bytes.toBytes(tweetID))
      // distinct qualifiers, so the tweet body no longer overwrites the id
      theput.add(Bytes.toBytes(CF_TWEET), Bytes.toBytes("tweetbody"), Bytes.toBytes(tweetBody))
      theput.add(Bytes.toBytes(CF_USER), Bytes.toBytes("favoritescount"), Bytes.toBytes(favoritesCount))
      // `table` (and the Configuration behind it) is captured by this closure,
      // which is what triggers the NotSerializableException
      table.put(theput)
    }
  }
}

The code is run from the terminal with:

spark-shell --driver-class-path /opt/hadoop/hbase-1.2.1/lib/hbase-server-1.1.4.jar:/opt/hadoop/hbase-1.2.1/lib/hbase-protocol-1.0.0-cdh5.5.0.jar:/opt/hadoop/hbase-1.2.1/lib/hbase-hadoop2-compat-1.0.0-cdh5.5.0.jar:/opt/hadoop/hbase-1.2.1/lib/hbase-client-1.0.0-cdh5.5.0.jar:/opt/hadoop/hbase-1.2.1/lib/hbase-common-1.0.0-cdh5.5.0.jar:/opt/hadoop/hbase-1.2.1/lib/htrace-core-3.2.0-incubating.jar:/home/cloudera/Desktop/hbase/twitter4jJARS/guava-19.0.jar:/home/cloudera/Desktop/hbase/twitter4jJARS/spark-streaming-twitter_2.10-1.6.1.jar:/home/cloudera/Desktop/hbase/twitter4jJARS/twitter4j-async-4.0.4.jar:/home/cloudera/Desktop/hbase/twitter4jJARS/twitter4j-core-4.0.4.jar:/home/cloudera/Desktop/hbase/twitter4jJARS/twitter4j-examples-4.0.4.jar:/home/cloudera/Desktop/hbase/twitter4jJARS/twitter4j-media-support-4.0.4.jar:/home/cloudera/Desktop/hbase/twitter4jJARS/twitter4j-stream-4.0.4.jar
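
A side note on the launch command: --driver-class-path only adds the jars to the driver's classpath. That happens to be enough with setMaster("local[*]"), where everything runs in one JVM, but on a real cluster the executors also need the HBase and twitter4j jars, typically passed as a comma-separated list via --jars (placeholder shown, not a literal command):

spark-shell --jars <comma-separated list of the same HBase/twitter4j jars>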

The error says that the object org.apache.hadoop.conf.Configuration is not serializable, i.e. it does not implement the Serializable interface even though Spark needs to ship it to the executors inside the closure. To keep it out of the serialized closure, mark it with the @transient annotation:

@transient val hadoopConf = new Configuration
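
Note that @transient only stops the driver-side hadoopConf from being dragged into the closure; the HTable built from that configuration is still captured by foreachPartition and will hit the same serialization wall. A more robust pattern is to create the HBase connection inside each partition, on the executor, so nothing HBase-related ever needs to be serialized. A minimal sketch against the HBase 1.x client API, reusing the table and column-family names from the question:

status.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // Everything HBase-related is created here, on the executor,
    // so no Configuration/Connection/Table object crosses the driver/executor boundary
    val conf = HBaseConfiguration.create()
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf("publicrd"))
    records.foreach { record =>
      val put = new Put(Bytes.toBytes("t_" + record.getUser().getId()))
      put.addColumn(Bytes.toBytes("tweet"), Bytes.toBytes("tweetid"), Bytes.toBytes(record.getUser().getId().toString))
      put.addColumn(Bytes.toBytes("tweet"), Bytes.toBytes("tweetbody"), Bytes.toBytes(record.getText()))
      table.put(put)
    }
    table.close()
    connection.close()
  }
}

Opening one Connection per partition (not per record) keeps the overhead manageable; for a long-running stream, a lazily initialized singleton connection per executor JVM is a common further refinement.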
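As for question 2: Kryo does not fix this particular error. spark.serializer only controls how data (shuffle output, cached RDDs) is serialized; closures are serialized with the Java serializer, so a non-serializable Configuration captured in a closure fails either way. Enabling Kryo is still worthwhile for the data path. A minimal sketch of the configuration (settings shown for illustration):

val sparkConf = new SparkConf()
  .setAppName("model1")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Optional: registering classes avoids embedding full class names in the serialized stream
  .registerKryoClasses(Array(classOf[twitter4j.Status]))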

Latest update