我在Spark Streaming(使用Scala)中有一个由(键,值)对组成的文件输入,如果键满足特定条件,我需要做的是将值存储在HBase中。因为我有:
val pair: DStream[(String, String)]
我试图做的是在一个映射中设置一个条件,然后尝试在HBase:中插入值
pair.map(x => {
if (x._1 == "condition")
{ val hconf = HBaseConfiguration.create()
val hTable = new HTable(hconf, "mytab")
val thePut = new Put(Bytes.toBytes(1))
thePut.add(Bytes.toBytes("colfamily"), Bytes.toBytes("c1"), Bytes.toBytes(x._2)
hTable.put(thePut)
})
}
然而,这不起作用,我在使用spark-submit时收到一个错误:没有注册任何输出操作,因此无法执行
这是我能想到的将值插入HBase的唯一方法,我做错了什么吗?你能帮我修一下吗?
这是更新后的代码:
pair.foreachRDD(rdd => rdd.map( p =>
{val hconf = HBaseConfiguration.create()
val hTable = new HTable(hconf,"mytab")
val thePut = new Put(Bytes.toBytes(1))
thePut.add(Bytes.toBytes("colfamily"), Bytes.toBytes(p._1), Bytes.toBytes(p._2)
hTable.put(thePut)
})
当我使用Spark-submit运行它时,我收到一个错误,说"任务不可序列化",你知道这意味着什么吗?我该如何修复它?
提前感谢
Jean,
收到错误消息的原因是您在代码中缺少对名为对的RDD的操作。
请参阅以下更正的代码。
pair.foreachRDD((rdd : [RDD(String,String)]) => {
val newRdd = rdd.map(p=> (p._1,p._2))
if (mewRdd._1 == "condition")
{
/* Your code*/
}})
要在hbase中插入来自spark流的数据,请参阅本文https://www.mapr.com/blog/spark-streaming-hbase我希望这能有所帮助。