Aggregating duplicate records while maintaining order, and including the duplicate records themselves



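I'm trying to solve an interesting problem. It's easy to do a group-by to compute aggregates such as sum, count, etc., but this problem is slightly different. Let me explain: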

Here is my list of tuples:

val repeatSmokers: List[(String, String, String, String, String, String)] =
  List(
    ("ID76182", "sachin", "kita MR.", "56308", "1990", "300"),
    ("ID76182", "KOUN", "Jana MR.", "56714", "1990", "100"),
    ("ID76182", "GANGS", "SKILL", "27539", "1990", "255"),
    ("ID76182", "GANGS", "SKILL", "27539", "1990", "110"),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "20"),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "6750"),
    ("ID76182", "DOWNES", "RYAN", "47542", "1990", "2090"),
    ("ID76182", "DRAGON", "WARS", "49337", "1990", "200"),
    ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "280"),
    ("ID76182", "JAMES", "JIM", "30548", "1990", "300"),
    ("ID76182", "KIMMELSHUE", "RUTH", "55345", "1990", "2600"),
    ("ID76182", "DRAGON", "WARS", "49337", "1990", "370"),
    ("ID76182", "COOPER", "ANADA", "45873", "1990", "2600"),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "2600"),
    ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "256")
  )

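The schema of these records is (Idnumber, name, test_code, year, amount), where the name spans two fields. From these elements I only need the repeat records. A unique combination in the list above is defined by the name and test_code, e.g. (sachin, kita MR., 56308). This means that if the same name and test_code repeat, it is a repeat smoker record. For simplicity, you can assume test_code alone is a unique value: if it repeats, you can call it a repeat smoker record. (Note that in this sample, test codes 45873 and 47542 each appear under two different names, and the expected output below only matches when the key is name plus test_code.)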
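The following minimal sketch makes the key definition concrete. It is not part of the original question. It groups a few of the sample rows by (fname, lname, test_code) and keeps the keys that occur more than once. It also shows the same grouping on test_code alone, which wrongly merges DOWNES and HULK under 47542. The object and value names are hypothetical.

```scala
// Hypothetical demo object; all names here are illustrative only
object RepeatKeys {
  // A subset of the sample rows: (id, fname, lname, test_code, year, amount)
  val rows: List[(String, String, String, String, String, String)] = List(
    ("ID76182", "GANGS", "SKILL", "27539", "1990", "255"),
    ("ID76182", "GANGS", "SKILL", "27539", "1990", "110"),
    ("ID76182", "DOWNES", "RYAN", "47542", "1990", "2090"),
    ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "280"),
    ("ID76182", "JAMES", "JIM", "30548", "1990", "300")
  )

  // (fname, lname, test_code) keys seen more than once
  val byNameAndCode: Set[(String, String, String)] =
    rows.groupBy(r => (r._2, r._3, r._4)).filter(_._2.size > 1).keySet

  // test_code keys seen more than once: also picks up 47542 (two different people)
  val byCodeOnly: Set[String] =
    rows.groupBy(_._4).filter(_._2.size > 1).keySet
}
```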

Here is the exact output:

ID76182,27539,1990,255,1 
ID76182,27539,1990,365,2
ID76182,45873,1990,20,1 
ID76182,45873,1990,6770,2 
ID76182,45873,1990,9370,3
ID76182,49337,1990,200,1
ID76182,49337,1990,570,2
ID76182,47542,1990,280,1
ID76182,47542,1990,536,2

Finally, the challenging part here is preserving the order and computing the running sum for every repeat smoker record.
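The running sum itself is simple once a key's amounts are in input order. As a sketch, `scanLeft` yields exactly the cumulative values in the expected output. The helper name `runningSums` is hypothetical.

```scala
// Hypothetical demo object; runningSums is an illustrative helper name
object RunningSum {
  // Cumulative sums in input order: List(a, b, c) => List(a, a + b, a + b + c)
  def runningSums(amounts: List[Int]): List[Int] =
    amounts.scanLeft(0)(_ + _).tail // drop the initial 0 seed

  // SEMI / GAUTAM A MR. / 45873 has amounts 20, 6750, 2600 in the input
  val semi: List[Int] = runningSums(List(20, 6750, 2600))
}
```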

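For example, this record: ID76182,47542,1990,536,2

follows the pattern:

idnumber,test_code,year,amount,occurrence

Since this combination occurred twice, we see the 2 above (and 536 is the running sum 280 + 256).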
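This sketch turns one key's running sums into lines of that format. It assumes the sums are already computed and that the occurrence is the 1-based position of the record within its key. The names are hypothetical.

```scala
// Hypothetical demo object; toLines is an illustrative helper name
object OutputLines {
  // Produce "idnumber,test_code,year,amount,occurrence" lines for one key;
  // occurrence is the 1-based position of the record within that key
  def toLines(id: String, code: String, year: String, runningSums: List[Int]): List[String] =
    runningSums.zipWithIndex.map { case (sum, idx) =>
      List(id, code, year, sum.toString, (idx + 1).toString).mkString(",")
    }

  // HULK / PAIN MR. / 47542: amounts 280 and 256 give running sums 280 and 536
  val hulk: List[String] = toLines("ID76182", "47542", "1990", List(280, 536))
}
```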

Note:

The output can be a list of any collection type, but it should be in the same format I mentioned above.

So here is some code in Scala:

import java.util.ArrayList
import java.util.LinkedHashMap
import scala.collection.JavaConverters._ // provides .asScala; on Scala 2.13+ prefer scala.jdk.CollectionConverters._

type RawRecord = (String, String, String, String, String, String)
type Record = (String, String, String, String, Int, Int)
type RecordKey = (String, String, String, String)
type Output = (String, String, String, String, Int, Int, Int)
val keyF: Record => RecordKey = r => (r._1, r._2, r._3, r._4)
val repeatSmokersRaw: List[RawRecord] =
  List(
    ("ID76182", "sachin", "kita MR.", "56308", "1990", "300"),
    ("ID76182", "KOUN", "Jana MR.", "56714", "1990", "100"),
    ("ID76182", "GANGS", "SKILL", "27539", "1990", "255"),
    ("ID76182", "GANGS", "SKILL", "27539", "1990", "110"),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "20"),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "6750"),
    ("ID76182", "DOWNES", "RYAN", "47542", "1990", "2090"),
    ("ID76182", "DRAGON", "WARS", "49337", "1990", "200"),
    ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "280"),
    ("ID76182", "JAMES", "JIM", "30548", "1990", "300"),
    ("ID76182", "KIMMELSHUE", "RUTH", "55345", "1990", "2600"),
    ("ID76182", "DRAGON", "WARS", "49337", "1990", "370"),
    ("ID76182", "COOPER", "ANADA", "45873", "1990", "2600"),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "2600"),
    ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "256")
  )
val repeatSmokers = repeatSmokersRaw.map(r => (r._1, r._2, r._3, r._4, r._5.toInt, r._6.toInt))
val acc = new LinkedHashMap[RecordKey, (ArrayList[Output], Int, Int)] // key -> (output rows, count, running sum), in first-insertion order
repeatSmokers.foreach(r => {
  val key = keyF(r)
  var cur = acc.get(key)
  if (cur == null) {
    cur = (new ArrayList[Output](), 0, 0)
  }
  val nextCnt = cur._2 + 1
  val sum = cur._3 + r._6
  val output = (r._1, r._2, r._3, r._4, r._5, sum, nextCnt)
  cur._1.add(output)
  acc.put(key, (cur._1, nextCnt, sum))
})
val result = acc.values().asScala.filter(p => p._2 > 1).flatMap(p => p._1.asScala)
// or if you are clever you can merge filter and flatMap as
// val result = acc.values().asScala.flatMap(p => if (p._1.size > 1) p._1.asScala else Nil)
println(result.mkString("\n"))

It prints:

(ID76182,GANGS,SKILL,27539,1990,255,1)
(ID76182,GANGS,SKILL,27539,1990,365,2)
(ID76182,SEMI,GAUTAM A MR.,45873,1990,20,1)
(ID76182,SEMI,GAUTAM A MR.,45873,1990,6770,2)
(ID76182,SEMI,GAUTAM A MR.,45873,1990,9370,3)
(ID76182,DRAGON,WARS,49337,1990,200,1)
(ID76182,DRAGON,WARS,49337,1990,570,2)
(ID76182,HULK,PAIN MR.,47542,1990,280,1)
(ID76182,HULK,PAIN MR.,47542,1990,536,2)

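The main trick in this code is using Java's LinkedHashMap as the accumulator collection, because it preserves insertion order. The other trick is storing lists inside it (since I'm using Java collections, I chose ArrayList for the inner accumulator, but you can use anything you like). So the idea is to build a map of key => list of smokers, and for each key store the current counter and the current sum, so that the "aggregated" smoker can be appended to the list. Once the map is built, go through it to filter out the keys that have not accumulated at least 2 records, then convert the map of lists into a single list. This is where using LinkedHashMap matters, because the insertion order is preserved during iteration.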
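As an illustrative variant, not the answer's code, the same idea can be sketched with Scala's own `scala.collection.mutable.LinkedHashMap`. It also iterates in insertion order, so no Java interop or `asScala` conversion is needed. The sketch keys on (fname, lname, code) and uses a few sample rows. The object and method names are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical demo object; names are illustrative only
object LinkedAccumulator {
  type Row = (String, String, String, String, String, Int) // id, fname, lname, code, year, amount

  val rows: List[Row] = List(
    ("ID76182", "GANGS", "SKILL", "27539", "1990", 255),
    ("ID76182", "DRAGON", "WARS", "49337", "1990", 200),
    ("ID76182", "GANGS", "SKILL", "27539", "1990", 110),
    ("ID76182", "JAMES", "JIM", "30548", "1990", 300),
    ("ID76182", "DRAGON", "WARS", "49337", "1990", 370)
  )

  def aggregate(rows: List[Row]): List[String] = {
    // key -> (count so far, running sum so far, output lines so far);
    // updating an existing key keeps its original position
    val acc = mutable.LinkedHashMap.empty[(String, String, String), (Int, Int, Vector[String])]
    for ((id, fname, lname, code, year, amount) <- rows) {
      val key = (fname, lname, code)
      val (cnt, sum, lines) = acc.getOrElse(key, (0, 0, Vector.empty[String]))
      val (newCnt, newSum) = (cnt + 1, sum + amount)
      acc(key) = (newCnt, newSum, lines :+ s"$id,$code,$year,$newSum,$newCnt")
    }
    // Keep only keys seen at least twice, flattened in first-seen key order
    acc.valuesIterator.filter(_._1 > 1).flatMap(_._3).toList
  }

  val result: List[String] = aggregate(rows)
}
```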

Here is a functional approach to this problem:

For this input:

val repeatSmokers: List[(String, String, String, String, String, String)] =
  List(
    ("ID76182", "sachin", "kita MR.", "56308", "1990", "300"),
    ("ID76182", "KOUN", "Jana MR.", "56714", "1990", "100"),
    ("ID76182", "GANGS", "SKILL", "27539", "1990", "255"),
    ("ID76182", "GANGS", "SKILL", "27539", "1990", "110"),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "20"),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "6750"),
    ("ID76182", "DOWNES", "RYAN", "47542", "1990", "2090"),
    ("ID76182", "DRAGON", "WARS", "49337", "1990", "200"),
    ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "280"),
    ("ID76182", "JAMES", "JIM", "30548", "1990", "300"),
    ("ID76182", "KIMMELSHUE", "RUTH", "55345", "1990", "2600"),
    ("ID76182", "DRAGON", "WARS", "49337", "1990", "370"),
    ("ID76182", "COOPER", "ANADA", "45873", "1990", "2600"),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", "2600"),
    ("ID76182", "HULK", "PAIN MR.", "47542", "1990", "256")
  )

With a case class representing a record:

case class Record(
    id: String,
    fname: String,
    lname: String,
    code: String,
    year: String,
    amount: String)

We can run the following:

val result = repeatSmokers
  .map(recordTuple => Record.tupled(recordTuple))
  .zipWithIndex
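  .groupBy { case (record, order) => (record.fname, record.lname, record.code) }
  .toList // without this, flatMap on the Map would rebuild a Map keyed by the serialized line, dropping identical lines
  .flatMap {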
    case (_, List(singleRecord)) => Nil // get rid of non-repeat records
    case (key, records) => {
      val firstKeyIdx = records.head._2
      val amounts = records.map {
        case (record, order) => record.amount.toInt
      }.foldLeft(List[Int]()) {
        case (Nil, addAmount) => List(addAmount)
        case (previousAmounts :+ lastAmount, addAmount) =>
          previousAmounts :+ lastAmount :+ (lastAmount + addAmount)
      }
      records
        .zip(amounts)
        .zipWithIndex
        .map {
          case (((rec, order), amount), idx) =>
            val serializedRecord =
              List(rec.id, rec.code, rec.year, amount, idx + 1)
            (serializedRecord.mkString(","), (firstKeyIdx, idx))
        }
    }
  }
  .toList
  .sortBy { case (serializedRecord, finalOrder) => finalOrder }
  .map { case (serializedRecord, finalOrder) => serializedRecord }

This produces:

ID76182,27539,1990,255,1
ID76182,27539,1990,365,2
ID76182,45873,1990,20,1
ID76182,45873,1990,6770,2
ID76182,45873,1990,9370,3
ID76182,49337,1990,200,1
ID76182,49337,1990,570,2
ID76182,47542,1990,280,1
ID76182,47542,1990,536,2

Some explanations:

A very nice way to instantiate a case class from a tuple (creating a list of Records from the list of tuples):

.map(recordTuple => Record.tupled(recordTuple))
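`Record.tupled` relies on Scala 2 behaviour. There, the compiler-generated companion of a case class extends `FunctionN`, which supplies `tupled`. Scala 3 companions no longer extend `FunctionN`. A portable sketch is to destructure the tuple yourself. `Record` mirrors the answer's case class, and `toRecord` is a hypothetical helper.

```scala
// Hypothetical demo object; toRecord is an illustrative helper name
object TupleToRecord {
  case class Record(id: String, fname: String, lname: String, code: String, year: String, amount: String)

  // Works on Scala 2 and Scala 3: pattern-match the 6-tuple and call the constructor
  def toRecord(t: (String, String, String, String, String, String)): Record = t match {
    case (id, fname, lname, code, year, amount) => Record(id, fname, lname, code, year, amount)
  }

  val first: Record = toRecord(("ID76182", "sachin", "kita MR.", "56308", "1990", "300"))
}
```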

Each record is zipped with its global index, (record, index), so that we have a way to work with the ordering:

.zipWithIndex

Then we group by the key you need:

.groupBy { case (record, order) => (record.fname, record.lname, record.code) }
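`groupBy` keeps elements in input order within each group. However, it returns an immutable `Map`, whose iteration order over keys is not guaranteed to follow the input. That is why the global index is carried along. This sketch recovers first-seen key order from those indices, as `firstKeyIdx` does in the answer. The names and data are hypothetical.

```scala
// Hypothetical demo object; names and data are illustrative only
object GroupOrder {
  val names: List[String] = List("GANGS", "SEMI", "GANGS", "DRAGON", "SEMI", "HULK", "DRAGON")

  // key -> list of (name, original index); within a group, indices stay in input order
  val groups: Map[String, List[(String, Int)]] = names.zipWithIndex.groupBy(_._1)

  // Order keys by the index of their first occurrence
  val keysInFirstSeenOrder: List[String] =
    groups.toList.sortBy { case (_, members) => members.head._2 }.map(_._1)
}
```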

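Then, for each key/value pair produced by the groupBy stage, we output a list of records (or an empty list if the value is a single record). Hence the flatMap, which flattens the lists that are produced.

This is the part that gets rid of single records: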

case (_, List(singleRecord)) => Nil
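The sketch below isolates how returning `Nil` drops single-record groups. It also shows a pitfall worth knowing. Calling `flatMap` directly on the `Map` returned by `groupBy` with a function that emits pairs makes Scala build a `Map` again, keyed by the first element. Identical serialized lines from different keys would then silently collapse. The data and names are hypothetical.

```scala
// Hypothetical demo object; data and names are illustrative only
object FlatMapDemo {
  // Groups "a" and "c" happen to contain the same amounts; "b" is a single record
  val groups: Map[String, List[Int]] = Map("a" -> List(1, 2), "b" -> List(3), "c" -> List(1, 2))

  // Emits (serialized line, position) pairs, mirroring (serializedRecord, order) tuples
  val emit: ((String, List[Int])) => List[(String, Int)] = {
    case (_, List(_)) => Nil // single record: not a repeat, so emit nothing
    case (_, values)  => values.zipWithIndex.map { case (v, i) => (s"$v,${i + 1}", i) }
  }

  // Converting to a List first keeps every emitted pair: 2 from "a" plus 2 from "c"
  val asList: List[(String, Int)] = groups.toList.flatMap(emit)

  // flatMap on the Map rebuilds a Map, so "c"'s lines overwrite the identical ones from "a"
  val asMap: Map[String, Int] = groups.flatMap(emit)
}
```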

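The other case deals with creating the cumulative amounts (a list of Ints). Note for Spark developers: here, Scala's collection groupBy does preserve the order of the value elements within a given key: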

val amounts = records.map {
    case (record, order) => record.amount.toInt
  }.foldLeft(List[Int]()) {
    case (Nil, addAmount) => List(addAmount)
    case (previousAmounts :+ lastAmount, addAmount) =>
      previousAmounts :+ lastAmount :+ (lastAmount + addAmount)
  }
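This sketch runs the answer's fold on its own to confirm the cumulative amounts. The `previous :+ last` pattern uses the standard `:+` extractor to split a list into its init and last element. Each step also appends with `:+`. Both are linear on a `List`, so the fold is quadratic in the group size. `scanLeft` gives the same result in one pass. The object and method names are hypothetical.

```scala
// Hypothetical demo object; viaFold reproduces the answer's fold, viaScan is an alternative
object CumulativeAmounts {
  // The answer's fold: append (last running total + next amount) at each step
  def viaFold(amounts: List[Int]): List[Int] =
    amounts.foldLeft(List[Int]()) {
      case (Nil, add)              => List(add)
      case (previous :+ last, add) => previous :+ last :+ (last + add)
    }

  // Same running totals in a single pass
  def viaScan(amounts: List[Int]): List[Int] = amounts.scanLeft(0)(_ + _).tail
}
```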

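These amounts are zipped back with the records so that each record's amount is replaced by the corresponding cumulative amount. This is also where the records are serialized into the final required format: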

records
    .zip(amounts)
    .zipWithIndex
    .map {
      case (((rec, order), amount), idx) =>
        val serializedRecord =
          List(rec.id, rec.code, rec.year, amount, idx + 1).mkString(
            ",")
        (serializedRecord, (firstKeyIdx, idx))
    }

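This part also zips the records with their index. Each serialized record is paired with a tuple (firstKeyIdx, idx), which is then used to order the records as required. They are sorted first by the order of the key's first appearance (firstKeyIdx), then, for records from the same key, by the "nested" order defined by idx: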

.sortBy { case (serializedRecord, finalOrder) => finalOrder }
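This sort works because Scala provides an implicit `Ordering` for tuples. It compares the first components, then the second ones on ties. This sketch uses hypothetical (firstKeyIdx, idx) pairs, shuffled as they might come out of a `Map`.

```scala
// Hypothetical demo object; the input pairs are illustrative only
object TupleSort {
  // (serialized line, (index of the key's first occurrence, position within the key))
  val unsorted: List[(String, (Int, Int))] = List(
    ("ID76182,49337,1990,570,2", (7, 1)),
    ("ID76182,27539,1990,255,1", (2, 0)),
    ("ID76182,49337,1990,200,1", (7, 0)),
    ("ID76182,27539,1990,365,2", (2, 1))
  )

  // Ordering[(Int, Int)] sorts by firstKeyIdx, then by idx within the same key
  val sorted: List[String] = unsorted.sortBy(_._2).map(_._1)
}
```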

Here is a functional/recursive approach based on @serggr's solution, which rightly introduced the LinkedHashMap.

Given this input:

val repeatSmokers: List[(String, String, String, String, String, Int)] =
  List(
    ("ID76182", "sachin", "kita MR.", "56308", "1990", 300),
    ("ID76182", "KOUN", "Jana MR.", "56714", "1990", 100),
    ("ID76182", "GANGS", "SKILL", "27539", "1990", 255),
    ("ID76182", "GANGS", "SKILL", "27539", "1990", 110),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", 20),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", 6750),
    ("ID76182", "DOWNES", "RYAN", "47542", "1990", 2090),
    ("ID76182", "DRAGON", "WARS", "49337", "1990", 200),
    ("ID76182", "HULK", "PAIN MR.", "47542", "1990", 280),
    ("ID76182", "JAMES", "JIM", "30548", "1990", 300),
    ("ID76182", "KIMMELSHUE", "RUTH", "55345", "1990", 2600),
    ("ID76182", "DRAGON", "WARS", "49337", "1990", 370),
    ("ID76182", "COOPER", "ANADA", "45873", "1990", 2600),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", 2600),
    ("ID76182", "HULK", "PAIN MR.", "47542", "1990", 256)
  )

First, prepare and aggregate the data this way:

case class Record(
  id: String, fname: String, lname: String,
  code: String, year: String, var amount: Int
)
case class Key(fname: String, lname: String, code: String)
val preparedRecords: List[(Key, Record)] = repeatSmokers.map {
  case recordTuple @ (_, fname, lname, code, _, _) =>
    (Key(fname, lname, code), Record.tupled(recordTuple))
}

import scala.collection.mutable.LinkedHashMap
def aggregateDuplicatesWithOrder(
    remainingRecords: List[(Key, Record)],
    processedRecords: LinkedHashMap[Key, List[Record]]
): LinkedHashMap[Key, List[Record]] =
  remainingRecords match {
    case (key, record) :: newRemainingRecords => {
      processedRecords.get(key) match {
        case Some(recordList :+ lastRecord) => {
          record.amount = record.amount + lastRecord.amount
          processedRecords.update(key, recordList :+ lastRecord :+ record)
        }
        case None => processedRecords(key) = List(record)
      }
      aggregateDuplicatesWithOrder(newRemainingRecords, processedRecords)
    }
    case Nil => processedRecords
  }
val result = aggregateDuplicatesWithOrder(
  preparedRecords, LinkedHashMap[Key, List[Record]]()
).values.flatMap {
  case _ :: Nil => Nil
  case records =>
    records.zipWithIndex.map { case (rec, idx) =>
      List(rec.id, rec.code, rec.year, rec.amount, idx + 1).mkString(",")
    }
}
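The recursive version updates `record.amount` through a `var` field in the case class. As an alternative sketch, not @serggr's or this answer's code, the same single pass can use `foldLeft` with purely immutable state. A `Vector` remembers first-seen key order, and a `Map` holds each key's count, running sum and output lines. The names `Row`, `Key`, `State` and `aggregate` are hypothetical.

```scala
// Hypothetical demo object; all names are illustrative only
object ImmutableAggregate {
  type Row = (String, String, String, String, String, Int) // id, fname, lname, code, year, amount
  type Key = (String, String, String)                      // fname, lname, code

  // order: keys in first-seen order; totals: key -> (count, running sum, output lines)
  final case class State(order: Vector[Key], totals: Map[Key, (Int, Int, Vector[String])])

  def aggregate(rows: List[Row]): List[String] = {
    val finalState = rows.foldLeft(State(Vector.empty, Map.empty)) {
      case (State(order, totals), (id, fname, lname, code, year, amount)) =>
        val key = (fname, lname, code)
        val (cnt, sum, lines) = totals.getOrElse(key, (0, 0, Vector.empty[String]))
        val line = s"$id,$code,$year,${sum + amount},${cnt + 1}"
        State(
          if (totals.contains(key)) order else order :+ key, // record first appearance only
          totals.updated(key, (cnt + 1, sum + amount, lines :+ line))
        )
    }
    // Walk keys in first-seen order and keep only the repeated ones
    finalState.order.toList.flatMap { key =>
      val (cnt, _, lines) = finalState.totals(key)
      if (cnt > 1) lines.toList else Nil
    }
  }

  val rows: List[Row] = List(
    ("ID76182", "sachin", "kita MR.", "56308", "1990", 300),
    ("ID76182", "KOUN", "Jana MR.", "56714", "1990", 100),
    ("ID76182", "GANGS", "SKILL", "27539", "1990", 255),
    ("ID76182", "GANGS", "SKILL", "27539", "1990", 110),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", 20),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", 6750),
    ("ID76182", "DOWNES", "RYAN", "47542", "1990", 2090),
    ("ID76182", "DRAGON", "WARS", "49337", "1990", 200),
    ("ID76182", "HULK", "PAIN MR.", "47542", "1990", 280),
    ("ID76182", "JAMES", "JIM", "30548", "1990", 300),
    ("ID76182", "KIMMELSHUE", "RUTH", "55345", "1990", 2600),
    ("ID76182", "DRAGON", "WARS", "49337", "1990", 370),
    ("ID76182", "COOPER", "ANADA", "45873", "1990", 2600),
    ("ID76182", "SEMI", "GAUTAM A MR.", "45873", "1990", 2600),
    ("ID76182", "HULK", "PAIN MR.", "47542", "1990", 256)
  )

  val result: List[String] = aggregate(rows)
}
```

With the full sample as input, `result` contains the same nine lines as the expected output in the question.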
