My sample data is as follows:
{ Line 1
Line 2
Line 3
Line 4
...
...
...
Line 6
Complete info:
Dept : HR
Emp name is Andrew lives in Colorodo
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : Retail
DOJ : 11/04/2011
DOL : 08/21/2013
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Alex lives in Texas
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Mathew lives in California
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : Retail
DOJ : 11/04/2011
DOL : 08/21/2013
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Dept : QC
Emp name is Nguyen lives in Nevada
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : Retail
DOJ : 11/04/2011
DOL : 08/21/2013
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Cassey lives in Newyork
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Ronney lives in Alasca
DOB : 03/09/1958
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
line21
line22
line23
...
}
The output I need:
{
Dept Empname State Dob Projectname DOJ DOE
HR Andrew Colorodo 03/09/1958 Healthcare 06/04/2011 09/21/2011
HR Andrew Colorodo 03/09/1958 Retail 11/04/2011 08/21/2013
HR Andrew Colorodo 03/09/1958 Audit 09/11/2013 09/01/2014
HR Andrew Colorodo 03/09/1958 ControlManagement 06/04/2011 09/21/2011
HR Alex Texas 03/09/1958 Healthcare 06/04/2011 09/21/2011
HR Alex Texas 03/09/1958 ControlManagement 06/04/2011 09/21/2011
HR Mathews California 03/09/1958 Healthcare 06/04/2011 09/21/2011
HR Mathews California 03/09/1958 Retail 11/04/2011 08/21/2013
HR Mathews California 03/09/1958 Audit 09/11/2013 09/01/2014
HR Mathews California 03/09/1958 ControlManagement 06/04/2011 09/21/2011
QC Nguyen Nevada 03/09/1958 Healthcare 06/04/2011 09/21/2011
QC Nguyen Nevada 03/09/1958 Retail 11/04/2011 08/21/2013
QC Nguyen Nevada 03/09/1958 Audit 09/11/2013 09/01/2014
QC Nguyen Nevada 03/09/1958 ControlManagement 06/04/2011 09/21/2011
QC Casey Newyork 03/09/1958 Healthcare 06/04/2011 09/21/2011
QC Casey Newyork 03/09/1958 Retail 11/04/2011 08/21/2013
QC Casey Newyork 03/09/1958 Audit 09/11/2013 09/01/2014
QC Casey Newyork 03/09/1958 ControlManagement 06/04/2011 09/21/2011}
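I tried the following options:
1) I thought of using a map inside a map to do the matching. That produced a lot of errors. Then I read a post here explaining that I can't have a map inside another map; in fact, no RDD transformation can be performed inside another one. Sorry, I'm new to Spark.
2) I tried using a regular expression and then calling map on the captured groups. But since each Dept has multiple Emps, and each employee has multiple project entries, I couldn't repeatedly group that part of the data and map it back to the corresponding employee. The same goes for the employee and department details.
Q1: Is it even possible to convert the sample data above into the output format above in Spark/Scala?
Q2: If so, what logic/concept should I go after?
Thanks in advance.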
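Q1: Is it possible to convert such a nested data format with Spark?
A1: Yes. If the records were more granular, I would recommend the multi-line approach discussed in this question: How to process multi line input records in Spark. But given that in this data a single "Dept" contains a large amount of data, I don't recommend it.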
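Q2: What logic/concept should I go after?
A2: This kind of linear processing is better approached with an iterator-based or stream-based implementation that builds up state as we traverse the lines. We consume the input line by line and produce a record only when it is complete. The context is preserved in some state. With this approach, it really doesn't matter how large the file is, because the memory requirements are limited to the size of one record plus the overhead of the state handling.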
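To make the idea concrete before the full parser, here is a minimal sketch of the same pattern on a deliberately simplified input. The two-level "Dept : x" / "Emp : y" format, the `Row` class and the `rows` function are assumptions made up for this illustration and are not part of the original data or code.

```scala
object StatefulLineSketch {
  // Hypothetical output type for the simplified input.
  final case class Row(dept: String, emp: String)

  // Walk the lines once; a "Dept" line only updates the state,
  // an "Emp" line emits a Row using the department seen most recently.
  def rows(lines: Iterator[String]): Iterator[Row] = {
    var currentDept: Option[String] = None // context carried from line to line
    lines.map { line =>
      line.split(":").map(_.trim) match {
        case Array("Dept", d) => currentDept = Some(d); None
        case Array("Emp", e)  => currentDept.map(d => Row(d, e))
        case _                => None // unrelated line, nothing to emit
      }
    }.collect { case Some(row) => row }
  }
}
```

Because the result is itself an `Iterator`, lines are only read as rows are requested, so at most one row plus the small piece of state is held in memory at a time.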
Here is a working example of how to deal with it using an iterator in plain Scala:
case class EmployeeRecord(dept: String, name: String, location: String, dob: String,
                          project: String, joined: String, left: String) {
  // Render all fields, in declaration order, as one comma-separated line.
  def toCSV: String = this.productIterator.mkString(", ")
}
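class EmployeeParser() {
  // Values collected so far for the record being built, keyed by field name.
  var currentStack: Map[String, String] = Map()
  val (dept, name, location, birthdate, project, joined, left) = ("dept", "name", "location", "birthdate", "project", "joined", "left")
  // Field order, from the outermost context (dept) to the innermost detail (left).
  val keySequence = Seq(dept, name, location, birthdate, project, joined, left)
  // Maps the labels found in the file to field names.
  val ParseKeys = Map("Project name" -> project, "DOJ" -> joined, "DOL" -> left, "DOB" -> birthdate, "Dept" -> dept)

  // Drop the given key and every key after it in keySequence: a new value invalidates them.
  def clearDependencies(key: String): Unit = {
    val staleKeys = keySequence.dropWhile(k => k != key).toSet
    // filter (rather than filterKeys) keeps the result a strict Map on every Scala version
    currentStack = currentStack.filter { case (k, _) => !staleKeys.contains(k) }
  }

  // A key may only be set once the fields before it (dept excepted) are present.
  def isValidEntry(key: String): Boolean = {
    val precedents = keySequence.takeWhile(k => k != key).drop(1)
    precedents.forall(k => currentStack.contains(k))
  }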

  // Store a value, discarding any stale inner fields first; None if the entry arrives out of order.
  def add(key: String, value: String): Option[Unit] = {
    if (!isValidEntry(key)) None else {
      clearDependencies(key)
      currentStack = currentStack + (key -> value)
      Some(())
    }
  }

  // A record exists only when every field is present.
  def record: Option[EmployeeRecord] =
    for {
      _dept <- currentStack.get(dept)
      _name <- currentStack.get(name)
      _location <- currentStack.get(location)
      _dob <- currentStack.get(birthdate)
      _project <- currentStack.get(project)
      _joined <- currentStack.get(joined)
      _left <- currentStack.get(left)
    } yield EmployeeRecord(_dept, _name, _location, _dob, _project, _joined, _left)

  val EmpRegex = "^Emp name is (.*) lives in (.*)$".r

  // Feed one line; yields a record only when that line completes one (in this data, a DOL line).
  def parse(line: String): Option[EmployeeRecord] = {
    if (line.startsWith("Emp")) { // have to deal with that inconsistency in a different way than using keys
      line match {
        case EmpRegex(n, l) => add(name, n); add(location, l)
        case _ => // starts with "Emp" but is not an employee line; ignore it instead of throwing a MatchError
      }
      None
    } else {
      val entry = line.split(":").map(_.trim)
      for {
        entryKey <- entry.lift(0)
        entryValue <- entry.lift(1)
        key <- ParseKeys.get(entryKey)
        _ <- add(key, entryValue)
        rec <- record
      } yield rec
    }
  }
}
To use it, we instantiate the parser and apply it to an iterator:
import scala.io.Source

val iterator = Source.fromFile(...).getLines
val parser = new EmployeeParser()
val parsedRecords = iterator.map(parser.parse).collect { case Some(record) => record }
val parsedCSV = parsedRecords.map(rec => rec.toCSV)
parsedCSV.foreach { line =>
  // write to destination file
}
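The last step, writing the lines out, is left open above. One possible way to do it, as a sketch only: the `CsvWriterSketch` object and its `writeLines` helper are hypothetical names introduced here, and the target file is whatever destination you choose.

```scala
import java.io.{File, PrintWriter}

object CsvWriterSketch {
  // Write each line to the target file, consuming the iterator lazily,
  // and close the writer even if writing fails part-way through.
  def writeLines(lines: Iterator[String], target: File): Unit = {
    val writer = new PrintWriter(target, "UTF-8")
    try lines.foreach(line => writer.println(line))
    finally writer.close()
  }
}
```

With this in place, `CsvWriterSketch.writeLines(parsedCSV, new File(...))` would replace the final `foreach`; since the iterator is consumed one line at a time, the whole output never has to sit in memory.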