如何在Spark过滤器功能中收集或存储过滤出来的json



我想存储或收集过滤出的数据,即对HDFS或hbase验证失败的json。

dstream.filter { data => VitalValidator.isVitalJSONValid(data) }

其中dstream为dstream [String], isVitalJSONValid接受字符串并返回布尔值

我会用Scala做类似的事情。

def isVitalJSONValid(data: String): Boolean = {
  var isValid = false
  //peroforms some validation 
  if(data.equals("some/validation")){
    isValid = true
  }
  !isValid
}

//existing goes on here
dstream.filter(data => isVitalJSONValid(data)).saveAsHadoopFiles("file_prefix")

dstreams的输出操作

最新更新