I'm new to Spark-Cassandra and Scala. I have an existing RDD. Let's say:
(url_hash, url, created_timestamp)
I want to filter this RDD based on url_hash. If the url_hash already exists in the Cassandra table, I want to filter it out of the RDD, so that I only process the new URLs.
The Cassandra table looks like this:
url_hash| url | created_timestamp | updated_timestamp
Any pointers would be great.
I tried something like this:
import java.util.Date
import com.datastax.spark.connector._ // provides sc.cassandraTable

case class UrlInfoT(url_sha256: String, full_url: String, created_ts: Date)
def timestamp = new Date()
// calcSHA256 is my own hashing helper (not shown); row(1) is the URL
val rdd1 = rdd.map(row => (calcSHA256(row(1)), (row(1), timestamp)))
val rdd2 = sc.cassandraTable[UrlInfoT]("keyspace", "url_info").select("url_sha256", "full_url", "created_ts")
val rdd3 = rdd2.map(row => (row.url_sha256, (row.full_url, row.created_ts)))
val newUrlsRDD = rdd1.subtractByKey(rdd3) // keep only hashes not already in Cassandra
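The post does not show calcSHA256. Below is a minimal sketch, assuming it returns the lowercase hex SHA-256 digest of the URL's UTF-8 bytes. The encoding and hex format are assumptions, and the `UrlHash` object is hypothetical. Whatever format you pick must match the one used when url_sha256 was written to Cassandra. Otherwise no keys will ever match, and every URL will look new.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object UrlHash {
  // Hypothetical stand-in for the calcSHA256 helper used above:
  // lowercase hex SHA-256 of the UTF-8 bytes of the input string.
  def calcSHA256(s: String): String = {
    // MessageDigest instances are not thread-safe, so create one per call
    val digest = MessageDigest.getInstance("SHA-256")
    val bytes = digest.digest(s.getBytes(StandardCharsets.UTF_8))
    // & 0xff turns the signed byte into 0..255 before hex formatting
    bytes.map(b => f"${b & 0xff}%02x").mkString
  }

  def main(args: Array[String]): Unit = {
    println(calcSHA256("http://example.com"))
  }
}
```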
I get a Cassandra error:
java.lang.NullPointerException: Unexpected null value of column full_url in keyspace.url_info.If you want to receive null values from Cassandra, please wrap the column type into Option or use JavaBeanColumnMapper
There are no null values in the Cassandra table.
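The error message suggests wrapping the column type in Option. That works because a nullable value then maps to None instead of throwing. Below is a plain-Scala sketch of the idea, with no Spark or connector involved. The Map standing in for a Cassandra row and the `toUrlInfo` helper are hypothetical. With Option fields the mapping succeeds either way, and counting the rows where `full_url` is None would show which rows the connector actually reads as null.

```scala
import java.util.Date

object NullableColumns {
  // Same shape as the fixed case class: non-key columns are Option
  case class UrlInfoT(url_sha256: String, full_url: Option[String], created_ts: Option[Date])

  // Hypothetical "row": column name -> value, where an absent cell yields null
  def toUrlInfo(row: Map[String, AnyRef]): UrlInfoT =
    UrlInfoT(
      row("url_sha256").asInstanceOf[String],
      // Option(x) is None when x is null, Some(x) otherwise
      Option(row.getOrElse("full_url", null).asInstanceOf[String]),
      Option(row.getOrElse("created_ts", null).asInstanceOf[Date])
    )

  def main(args: Array[String]): Unit = {
    val complete = toUrlInfo(Map("url_sha256" -> "aa", "full_url" -> "http://a", "created_ts" -> new Date(0L)))
    val partial = toUrlInfo(Map("url_sha256" -> "bb"))
    println(complete)
    println(partial) // full_url and created_ts are None, no exception
  }
}
```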
Thanks, The Archetypal Paul!
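I hope someone finds this useful. I had to wrap the non-key columns of the case class in Option.
Looking forward to a better solution.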
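import java.util.Date
import com.datastax.spark.connector._ // provides sc.cassandraTable

// Non-key columns are Option, so a null read from Cassandra becomes None instead of throwing
case class UrlInfoT(url_sha256: String, full_url: Option[String], created_ts: Option[Date])
def timestamp = new Date()
val rdd1 = rdd.map(row => (calcSHA256(row(1)), (row(1), timestamp)))
val rdd2 = sc.cassandraTable[UrlInfoT]("keyspace", "url_info").select("url_sha256", "full_url", "created_ts")
val rdd3 = rdd2.map(row => (row.url_sha256, (row.full_url, row.created_ts)))
val newUrlsRDD = rdd1.subtractByKey(rdd3) // only URLs whose hash is not yet in url_info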
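subtractByKey keeps the pairs of rdd1 whose key does not appear in rdd3. It never looks at rdd3's values. Below is a plain-collections sketch of the same semantics. Local Seqs stand in for the RDDs, and `subtractByKeyLocal` is a hypothetical helper, not a Spark API. The sketch makes it visible that only url_sha256 matters for this filter.

```scala
object SubtractByKeySketch {
  // Hypothetical local equivalent of Spark's subtractByKey:
  // keep each (k, v) from `left` whose key k never occurs in `right`.
  // The values of `right` are ignored.
  def subtractByKeyLocal[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, V)] = {
    val existing: Set[K] = right.map(_._1).toSet
    left.filterNot { case (k, _) => existing.contains(k) }
  }

  def main(args: Array[String]): Unit = {
    // Incoming URLs keyed by a made-up hash
    val incoming = Seq("h1" -> "http://a", "h2" -> "http://b", "h3" -> "http://c")
    // Hashes already stored; a None value does not affect the result
    val stored = Seq("h2" -> Option.empty[String], "h9" -> Some("http://z"))
    println(subtractByKeyLocal(incoming, stored)) // List((h1,http://a), (h3,http://c))
  }
}
```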