Writing to HBase from Spark Streaming



我在Spark Streaming(使用Scala)中有一个由(键,值)对组成的文件输入,如果键满足特定条件,我需要做的是将值存储在HBase中。因为我有:

val pair: DStream[(String, String)]

我试图做的是在一个映射中设置一个条件,然后尝试在HBase:中插入值

pair.map(x => {
if (x._1 == "condition")
{ val hconf = HBaseConfiguration.create()
val hTable = new HTable(hconf, "mytab")
val thePut = new Put(Bytes.toBytes(1))
thePut.add(Bytes.toBytes("colfamily"), Bytes.toBytes("c1"), Bytes.toBytes(x._2)
hTable.put(thePut)
})
}

然而,这不起作用,我在使用spark-submit时收到一个错误:没有注册任何输出操作,因此无法执行

这是我能想到的将值插入HBase的唯一方法,我做错了什么吗?你能帮我修一下吗?

这是更新后的代码:

pair.foreachRDD(rdd => rdd.map( p =>
{val hconf = HBaseConfiguration.create()
 val hTable = new HTable(hconf,"mytab")
 val thePut = new Put(Bytes.toBytes(1))
 thePut.add(Bytes.toBytes("colfamily"), Bytes.toBytes(p._1), Bytes.toBytes(p._2)
 hTable.put(thePut)
})

当我使用Spark-submit运行它时,我收到一个错误,说"任务不可序列化",你知道这意味着什么吗?我该如何修复它?

提前感谢

Jean,

收到错误消息的原因是您在代码中缺少对名为的RDD的操作。

请参阅以下更正的代码。

  pair.foreachRDD((rdd : [RDD(String,String)]) => {  
 val newRdd = rdd.map(p=> (p._1,p._2))
        if (mewRdd._1 == "condition")
        {
         /* Your code*/
        }})

要在hbase中插入来自spark流的数据,请参阅本文https://www.mapr.com/blog/spark-streaming-hbase我希望这能有所帮助。

最新更新