Nested data from an RDD (Scala / Spark)

My sample data is as below:

{ Line 1
Line 2
Line 3
Line 4
...
...
...
Line 6

Complete info:
Dept : HR
Emp name is Andrew lives in Colorodo
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : Retail
DOJ : 11/04/2011
DOL : 08/21/2013
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Alex lives in Texas
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Mathew lives in California
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : Retail
DOJ : 11/04/2011
DOL : 08/21/2013
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Dept : QC
Emp name is Nguyen lives in Nevada
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : Retail
DOJ : 11/04/2011
DOL : 08/21/2013
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Cassey lives in Newyork
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Ronney lives in Alasca
DOB : 03/09/1958
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016

line21
line22
line23
...
}

The output I need:

{
Dept  Empname  State       DOB         Projectname        DOJ         DOL
HR    Andrew   Colorodo    03/09/1958  Healthcare         06/04/2011  09/21/2011
HR    Andrew   Colorodo    03/09/1958  Retail             11/04/2011  08/21/2013
HR    Andrew   Colorodo    03/09/1958  Audit              09/11/2013  09/01/2014
HR    Andrew   Colorodo    03/09/1958  ContorlManagement  01/08/2015  02/14/2016
HR    Alex     Texas       03/09/1958  Healthcare         06/04/2011  09/21/2011
HR    Alex     Texas       03/09/1958  ContorlManagement  01/08/2015  02/14/2016
HR    Mathew   California  03/09/1958  Healthcare         06/04/2011  09/21/2011
HR    Mathew   California  03/09/1958  Retail             11/04/2011  08/21/2013
HR    Mathew   California  03/09/1958  Audit              09/11/2013  09/01/2014
HR    Mathew   California  03/09/1958  ContorlManagement  01/08/2015  02/14/2016
QC    Nguyen   Nevada      03/09/1958  Healthcare         06/04/2011  09/21/2011
QC    Nguyen   Nevada      03/09/1958  Retail             11/04/2011  08/21/2013
QC    Nguyen   Nevada      03/09/1958  Audit              09/11/2013  09/01/2014
QC    Nguyen   Nevada      03/09/1958  ContorlManagement  01/08/2015  02/14/2016
QC    Cassey   Newyork     03/09/1958  Healthcare         06/04/2011  09/21/2011
QC    Cassey   Newyork     03/09/1958  ContorlManagement  01/08/2015  02/14/2016
QC    Ronney   Alasca      03/09/1958  Audit              09/11/2013  09/01/2014
QC    Ronney   Alasca      03/09/1958  ContorlManagement  01/08/2015  02/14/2016
}

Here is what I tried: 1) I thought of using a map within a map to do the matching, and ran into lots of errors. Then I read a post here explaining that a map cannot contain another map; in fact, an RDD transformation cannot be performed inside another one. Sorry, I'm new to Spark.
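
(For reference, a minimal sketch of the nested-transformation anti-pattern described above; deptRDD and empRDD are hypothetical names:)

// Anti-pattern: referencing one RDD inside another RDD's transformation.
// Spark cannot serialize an RDD into a task closure, so this fails at
// runtime (typically with a NullPointerException or SparkException on the executor).
val broken = deptRDD.map { dept =>
  empRDD.filter(emp => emp.contains(dept)).collect() // illegal nested RDD use
}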

2) I tried using regular expressions and then calling map on the captured groups. But since each department has multiple employees, and each employee has multiple sets of project information, I could not repeatedly group that portion of the data and map it back to the corresponding employee; the same problem applies to associating employee details with their department.

Q1: Is it even possible to transform the above sample data into the above output format in Spark/Scala?

Q2: If so, what logic/concept should I pursue?

Thanks in advance.

Q1: Is it possible to transform such a nested data format using Spark?

A1: Yes. For input with more clearly delimited records, I would suggest the multi-line approach discussed in this question: How to process multi-line input records in Spark.

However, given how much data a single "Dept" block contains here, I would not recommend it.
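
For completeness, here is a minimal sketch of that multi-line approach, assuming a hypothetical input path and using "Dept :" as the record delimiter, so that each RDD element holds one entire department block (which is exactly why it scales poorly for large departments):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Make Hadoop's record reader split on "Dept :" instead of newlines.
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.set("textinputformat.record.delimiter", "Dept :")

val deptBlocks = sc
  .newAPIHadoopFile("/path/to/employees.txt", classOf[TextInputFormat],
    classOf[LongWritable], classOf[Text], hadoopConf)
  .map { case (_, block) => block.toString.trim } // one whole "Dept" block per element
  .filter(_.nonEmpty)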

Q2: What logic/concept should I follow?

A2: This kind of linear processing, where state is built up as we traverse the lines, is better approached with an iterator-based or stream-based implementation:

We consume the input line by line and only produce a record once it is complete; the in-between context is kept in some state. With this approach the size of the file doesn't really matter, because the memory requirement is bounded by the size of one record plus the overhead of the state handling.

Below is a working example of how to process it with an iterator in plain Scala:

case class EmployeeRecord(dept: String, name: String, location: String, dob: String, project: String, joined: String, left: String) {
  def toCSV = this.productIterator.mkString(", ")
}

class EmployeeParser() {
  var currentStack : Map[String, String] = Map()
  val (dept, name, location, birthdate, project, joined, left) = ("dept", "name", "location", "birthdate", "project", "joined", "left")
  val keySequence = Seq(dept, name, location, birthdate, project, joined, left)
  val ParseKeys = Map("Project name" -> project, "DOJ" -> joined, "DOL" -> left, "DOB" -> birthdate, "Dept" -> dept)
  val keySet = keySequence.toSet
  def clearDependencies(key: String) : Unit = {
    // drop this key and every key after it in the sequence: their values are now stale
    val staleKeys = keySequence.dropWhile(k => k != key).toSet
    currentStack = currentStack.filter { case (k, _) => !staleKeys.contains(k) }
  }
  def isValidEntry(key: String) : Boolean = {
    // every key that precedes this one in the sequence (except dept) must already be set
    val precedents = keySequence.takeWhile(k => k != key).drop(1)
    precedents.forall(k => currentStack.contains(k))
  }
  def add(key:String, value:String): Option[Unit] = {
    if (!isValidEntry(key)) None else {
      clearDependencies(key)
      currentStack = currentStack + (key -> value)
      Some(())
    }
  } 
  def record: Option[EmployeeRecord] = 
    for {
      _dept <- currentStack.get(dept)
      _name <- currentStack.get(name)
      _location <- currentStack.get(location)
      _dob <- currentStack.get(birthdate)
      _project <- currentStack.get(project)
      _joined <- currentStack.get(joined)
      _left <- currentStack.get(left)
    } yield EmployeeRecord(_dept, _name, _location, _dob, _project,_joined, _left)
  val EmpRegex = "^Emp name is (.*) lives in (.*)$".r
  def parse(line:String):Option[EmployeeRecord] = {
    if (line.startsWith("Emp")) { // have to deal with that inconsistency in a different way than using keys
      line match {
        case EmpRegex(n, l) => add(name, n); add(location, l)
        case _              => () // skip malformed "Emp" lines instead of throwing a MatchError
      }
      None
    } else {
      val entry = line.split(":").map(_.trim)
      for { entryKey <- entry.lift(0)
            entryValue <- entry.lift(1)
            key <- ParseKeys.get(entryKey)
            _ <- add(key, entryValue)
            rec <- record
          } yield rec
    }
  }
}

To use it, we instantiate the parser and apply it to an iterator of lines:

import scala.io.Source

val iterator = Source.fromFile(...).getLines
val parser = new EmployeeParser()
val parsedRecords = iterator.map(parser.parse).collect { case Some(record) => record }
val parsedCSV = parsedRecords.map(rec => rec.toCSV)
parsedCSV.foreach(line => println(line)) // or write each line to the destination file
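
To run the same parser in Spark (the setting of the original question), one option, sketched here under the assumption that each input file is small enough to be held as a single element, is sc.wholeTextFiles, which keeps every file whole so the stateful parser sees all of its lines in order:

// Each (path, content) pair is one whole file, so records never straddle partition boundaries.
val records = sc.wholeTextFiles("/path/to/input/dir").flatMap { case (_, content) =>
  val parser = new EmployeeParser() // fresh parser state per file
  content.split("\n").iterator.flatMap(parser.parse)
}
records.map(_.toCSV).saveAsTextFile("/path/to/output") // hypothetical paths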
