我有一个从HBase转换过来的RDD:
val hbaseRDD: RDD[(String, Array[String])]_1是键。和数组为HBase中的值。
4929101-ACTIVE, ["4929101","2015-05-20 10:02:44","dummy1","dummy2"]
4929102-ACTIVE, ["4929102","2015-05-20 10:02:44","dummy1","dummy2"]
4929103-ACTIVE, ["4929103","2015-05-20 10:02:44","dummy1","dummy2"]
我还将SchemaRDD (id,date1,col1,col2,col3)转换为
val refDataRDD: RDD[(String, Array[String])],我将迭代并检查它是否存在于hbaseRDD:
4929103, ["2015-05-21 10:03:44","EV01","col2","col3"]
4929104, ["2015-05-21 10:03:44","EV02","col2","col3"]
问题是,
我如何检查一个键(tuple._1)/("4929103")是否存在于hbaseRDD并获得相应的值(tuple._2)?-我不能在rdd内使用PairRDD的查找函数。过滤器,它抛出"scala。MatchError: null",但它在
之外工作val filteredRDD = rdd.filter(sqlRow => { val hbaseLookup = hbaseRDD.lookup(sqlRow(0).toString + "-ACTIVE") // if found, check if date1 of hbaseRDD < sqlRow(1) // else if not found, retain row true })
我不确定这是否是问题所在,因为当我将查找行切换到:
时,我也遇到了NPE。val sqlRowHbase = hbaseRDD.filter(row => {
注意:我正在做一个hbaseRDD。在这些行之前数。和hbaseRDD。在rdd.filter
之外查找工作正常
所以基本上,我试图通过hbaseRDD中的键"找到"并获得行/值。连接它们有点复杂,因为两个rdd中的一些值可能为空。这取决于很多情况哪一行应该保留什么数据
假设您需要查找的a_id集包含在RDD中,我认为您可以使用leftOuterJoin而不是迭代并查找每个值。
我看到了你上面关于date1可能改变位置的评论。我不解决它下面虽然,我认为这应该处理之前的查找本身的某种特定的映射的每一行。
如果我得到正确的伪代码,你有(id, date)
的RDD,并希望通过在hbase中查找数据来更新它,如果在hbase中找到一行这个id,并且如果它的日期早于refData中的日期,则更新它。对吗?
如果是,假设你有一些像这样的ref数据:
val refData = sc.parallelize(Array(
("4929103","2015-05-21 10:03:44"),
("4929104","2015-05-21 10:03:44")
))
和一些来自Hbase的行数据:
val hbaseRDD = sc.parallelize(Array(
("4929101-ACTIVE", Array("4929101","2015-05-20 10:02:44")),
("4929102-ACTIVE", Array("4929102","2015-05-20 10:02:44")),
("4929103-ACTIVE", Array("4929103","2015-05-20 10:02:44"))
))
然后你可以用一个简单的leftOuterJoin从refData中查找每个id到hbase中,并且对于找到的每一行:如果需要的话更新日期:
refData
// looks up in Hbase all rows whose date1 a_id value matches the id in searchedIds
.leftOuterJoin(hbaseRDD.map{ case (rowkey, Array(a_id, date1)) => (a_id, date1)})
// update the date in refData if date from hBase is earlier
.map { case (rowKey, (refDate, maybeRowDate)) => ( rowKey, chooseDate (refDate, maybeRowDate)) }
.collect
def chooseDate(refDate: String, rowDate: Option[String]) = rowDate match {
// if row not found in Hbase: keep ref date
case None => refDate
case Some(rDate) =>
if (true) /* replace this by first parsing the date, then check if rowDate < refDate */
rowDate
else
refDate
}