映射功能写入全局火花rdd



我有一个字符串的RDD。每行对应各种日志。

我在单个函数中有多个正则表达式,这些正则表达式与 RDD 的行匹配/大小写以应用适应的正则表达式。

我想在我的RDD上映射这个独特的函数,这样它就可以快速处理每一行,并将处理的每一行存储在另一个全局rdd中。

问题是,由于我希望此任务并行化,因此我的全局RDD必须可同时访问以添加每个已处理的行。

我想知道是否有其他方法可以做到这一点或任何事情!我希望提高我的火花技能。

例如,这就是我想做的:

我有一个像这样的txt:

错误:哈param_error=8 param_err2=https

警告 : 呼呼呼呼 param_warn=丘 param_warn2=无线网络

我的正则表达式函数会将包含"ERROR"的行与数组匹配,例如Array("Error","8","https")

另一个正则表达式函数会将包含"WARNING"的行与数组进行匹配,例如Array("Warning","tchu","wifi")

最后,我想为处理的每一行获取一个RDD[Array[String]]

如何保持它与 Spark 并行?

首先,重要的是要明白Spark中没有比"全局RDD"更好的东西,也没有理由需要这样的东西。使用Spark时,你应该考虑将一个RDD转换为另一个RDD,而不是更新RDD(这是不可能的 - RDD是不可变的)。每个这样的转换都将由Spark分布式(并行)执行。

在这种情况下,如果我正确理解您的要求,您希望将每条记录map为以下结果之一:

  • 第一项"ERROR"Array[String],或:
  • 一项"WARNING"Array[String],或:
  • 如果没有与记录匹配的模式,请将其删除

为此,您可以使用RDDmap(f)collect(f)方法:

// Sample data:
val rdd = sc.parallelize(Seq(
"ERROR : Hahhaha param_error=8 param_err2=https",
"WARNING : HUHUHUHUH param_warn=tchu param_warn2=wifi",
"Garbage - not matching anything"
))
// First we can split in " : " to easily identify ERROR vs. WARNING 
val splitPrefix = rdd.map(line => line.split(" : "))
// Implement these parsing functions as you see fit; 
// The input would be the part following the " : ", 
// and the output should be a list of the values (not including the ERROR / WARNING) 
def parseError(v: String): List[String] = ??? // example input: "Hahhaha param_error=8 param_err2=https"
def parseWarning(v: String): List[String] = ??? // example input: "HUHUHUHUH param_warn=tchu param_warn2=wifi"
// Now we can use these functions in a pattern-matching function passed to RDD.collect,
// which will transform each value that matches one of the cases, and will filter out 
// values that don't match anything
val result: RDD[List[String]] = splitPrefix.collect {
case Array(l @ "ERROR", v) => l :: parseError(v)
case Array(l @ "WARNING", v) => l :: parseWarning(v)
// NOT adding a default case, so records that didn't match will be removed
}    
// If you really want Array[String] and not List[String]:    
val arraysRdd: RDD[Array[String]] = result.map(_.toArray)

最新更新