Suppose a directory contains several files, each of which looks like this:
File 1
20100101|12.34|...
20100101|12.34|...
20100101|36.00|...
20100102|36.00|...
20100101|14.00|...
20100101|14.00|...
File 2
20100101|12.34|...
20100101|12.34|...
20100101|36.00|...
20100102|36.00|...
20100101|14.00|...
20100101|14.00|...
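Adjacent lines with the same date and value correspond to the same event. Two lines from two separate files can never be adjacent.
Expected result: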
e1|20100101|12.34|...
e1|20100101|12.34|...
e2|20100101|36.00|...
e3|20100102|36.00|...
e4|20100101|14.00|...
e4|20100101|14.00|...
e5|20100101|12.34|...
e5|20100101|12.34|...
e6|20100101|36.00|...
e7|20100102|36.00|...
e8|20100101|14.00|...
e8|20100101|14.00|...
Here eN is an arbitrary value (e1 <> e2 <> e3 ...), used only to make the sample clear.
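The grouping rule above can be sketched on an in-memory collection, without Spark. This is only an illustration under stated assumptions: the names `Tagged`, `RunIds` and `tagRuns` are hypothetical, the key of a line is taken to be its first two pipe-separated fields, and each file is handled as one ordered sequence of lines.

```scala
// Hypothetical helper types and names, for illustration only.
final case class Tagged(eventId: Long, line: String)

object RunIds {
  // Key of a line: its first two pipe-separated fields (date and value).
  // Assumes every line has at least two fields.
  def key(line: String): (String, String) = {
    val f = line.split("\\|", -1)
    (f(0), f(1))
  }

  // Walk the files in order; start a new event whenever the key changes
  // or a new file begins, so a run never spans two files.
  def tagRuns(files: Seq[Seq[String]]): Seq[Tagged] = {
    var nextId = 0L
    files.flatMap { lines =>
      var last: Option[(String, String)] = None // reset at each file
      lines.map { line =>
        val k = key(line)
        if (!last.contains(k)) { nextId += 1; last = Some(k) }
        Tagged(nextId, line)
      }
    }
  }
}
```

Applied to two copies of the six sample lines, `RunIds.tagRuns(Seq(file, file)).map(_.eventId)` yields 1, 1, 2, 3, 4, 4, 5, 5, 6, 7, 8, 8, which has the same shape as e1 to e8 in the expected result.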
Does the following code give a unique event ID to every line across all files?
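case class Event(
  LineNumber: Long, var EventId: Long,
  Date: String, Value: String //,..
)
val lines = sc.textFile("theDirectory")
// Skip unwanted lines, attach a unique id to each line, then split on the
// literal pipe (split takes a regex, so the pipe has to be escaped).
val rows = lines.filter(l => !l.startsWith("someString")).zipWithUniqueId
  .map(l => l._2.toString +: l._1.split("""\|""", -1))
// State carried from one line to the next. Value is a String, so lastValue is too.
var lastValue: String = ""
var lastDate: String = "00010101"
var eventId: Long = 0
val rowDF = rows
  .map(c => {
    val e = Event(
      c(0).toLong, 0, c(1), c(2) //,...
    )
    // A new event starts whenever the date or the value changes.
    if (e.Date != lastDate || e.Value != lastValue) {
      lastDate = e.Date
      lastValue = e.Value
      eventId = e.LineNumber
    }
    e.EventId = eventId
    e
  }).toDF()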
Basically, I use the unique line number returned by zipWithUniqueId as the key for a run of adjacent lines.
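For context, the Spark API documentation states that zipWithUniqueId gives the items of the k-th partition the ids k, n+k, 2n+k, ..., where n is the number of partitions. The ids are therefore unique, but they are neither consecutive nor ordered across partitions. Below is a plain-Scala simulation of that numbering scheme. The name `simulateZipWithUniqueId` is hypothetical, and this is a sketch of the documented formula, not Spark's implementation.

```scala
object UniqueIdSketch {
  // Simulates the documented id scheme: item i of partition k gets i * n + k,
  // where n is the number of partitions.
  def simulateZipWithUniqueId[A](partitions: Seq[Seq[A]]): Seq[(A, Long)] = {
    val n = partitions.size.toLong
    partitions.zipWithIndex.flatMap { case (part, k) =>
      part.zipWithIndex.map { case (item, i) => (item, i * n + k) }
    }
  }
}
```

With three partitions `Seq("a", "b", "c")`, `Seq("d")` and `Seq("e", "f")`, the items get the ids a=0, b=3, c=6, d=1, e=2, f=5: no id repeats, but the numbering has gaps and jumps between partitions.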
I think my underlying question is: can the second map operation end up splitting the contents of a file across several processes?
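Two facts bear on this. sc.textFile may divide a large file into several input splits, so one file can land in more than one partition, and each Spark task works on its own deserialized copy of the variables captured by the closure rather than on shared state. The plain-Scala simulation below shows the consequence when a partition boundary falls in the middle of a run. No Spark is involved, and the names `PartitionSketch`, `tagPartition` and `tagAll` are hypothetical.

```scala
object PartitionSketch {
  // Each simulated "task" starts from the same initial state, as a fresh copy
  // of the captured variables would, and tags every line with the id of the
  // first line of its run. Input items are (lineId, key) pairs.
  def tagPartition(part: Seq[(Long, String)]): Seq[(Long, String)] = {
    var lastKey = ""   // fresh state for every partition
    var eventId = 0L
    part.map { case (lineId, key) =>
      if (key != lastKey) { lastKey = key; eventId = lineId }
      (eventId, key)
    }
  }

  // Partitions are processed independently; no state crosses the boundary.
  def tagAll(partitions: Seq[Seq[(Long, String)]]): Seq[(Long, String)] =
    partitions.flatMap(tagPartition)
}
```

For the lines (0, "A"), (1, "A"), (2, "A"), (3, "B") held in one partition, the event ids come out as 0, 0, 0, 3. If the same lines are split after the second one, they come out as 0, 0, 2, 3: the run of "A" lines receives two different event ids.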
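Here is an idiomatic solution. Hope this helps. I use the file name to tell the files apart. Grouping by the line value and the file name, assigning each group an id with zipWithUniqueId, and then joining back to the original input DataFrame produces the desired output.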
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
scala> val lines = spark.read.textFile("file:///home/fsdjob/theDir").withColumn("filename", input_file_name())
scala> lines.show(false)
+--------------+------------------------------------+
|value         |filename                            |
+--------------+------------------------------------+
|20100101|12.34|file:///home/fsdjob/theDir/file1.txt|
|20100101|12.34|file:///home/fsdjob/theDir/file1.txt|
|20100101|36.00|file:///home/fsdjob/theDir/file1.txt|
|20100102|36.00|file:///home/fsdjob/theDir/file1.txt|
|20100101|14.00|file:///home/fsdjob/theDir/file1.txt|
|20100101|14.00|file:///home/fsdjob/theDir/file1.txt|
|20100101|12.34|file:///home/fsdjob/theDir/file2.txt|
|20100101|12.34|file:///home/fsdjob/theDir/file2.txt|
|20100101|36.00|file:///home/fsdjob/theDir/file2.txt|
|20100102|36.00|file:///home/fsdjob/theDir/file2.txt|
|20100101|14.00|file:///home/fsdjob/theDir/file2.txt|
|20100101|14.00|file:///home/fsdjob/theDir/file2.txt|
+--------------+------------------------------------+
scala> val linesGrpWithUid = lines.groupBy("value", "filename").count.drop("count").rdd.zipWithUniqueId
linesGrpWithUid: org.apache.spark.rdd.RDD[(org.apache.spark.sql.Row, Long)] = MapPartitionsRDD[135] at zipWithUniqueId at <console>:31
scala> val linesGrpWithIdRdd = linesGrpWithUid.map( x => { org.apache.spark.sql.Row(x._1.get(0),x._1.get(1), x._2) })
linesGrpWithIdRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[136] at map at <console>:31
scala> val schema =
     |   StructType(
     |     StructField("value", StringType, false) ::
     |     StructField("filename", StringType, false) ::
     |     StructField("id", LongType, false) ::
     |     Nil)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(value,StringType,false), StructField(filename,StringType,false), StructField(id,LongType,false))
scala> val linesGrpWithIdDF = spark.createDataFrame(linesGrpWithIdRdd, schema)
linesGrpWithIdDF: org.apache.spark.sql.DataFrame = [value: string, filename: string ... 1 more field]
scala> linesGrpWithIdDF.show(false)
+--------------+------------------------------------+---+
|value         |filename                            |id |
+--------------+------------------------------------+---+
|20100101|12.34|file:///home/fsdjob/theDir/file2.txt|3  |
|20100101|36.00|file:///home/fsdjob/theDir/file2.txt|6  |
|20100102|36.00|file:///home/fsdjob/theDir/file2.txt|20 |
|20100102|36.00|file:///home/fsdjob/theDir/file1.txt|30 |
|20100101|14.00|file:///home/fsdjob/theDir/file1.txt|36 |
|20100101|14.00|file:///home/fsdjob/theDir/file2.txt|56 |
|20100101|36.00|file:///home/fsdjob/theDir/file1.txt|146|
|20100101|12.34|file:///home/fsdjob/theDir/file1.txt|165|
+--------------+------------------------------------+---+
scala> val output = lines.join(linesGrpWithIdDF, Seq("value", "filename"))
output: org.apache.spark.sql.DataFrame = [value: string, filename: string ... 1 more field]
scala> output.show(false)
+--------------+------------------------------------+---+
|value         |filename                            |id |
+--------------+------------------------------------+---+
|20100101|12.34|file:///home/fsdjob/theDir/file2.txt|3  |
|20100101|12.34|file:///home/fsdjob/theDir/file2.txt|3  |
|20100101|36.00|file:///home/fsdjob/theDir/file2.txt|6  |
|20100102|36.00|file:///home/fsdjob/theDir/file2.txt|20 |
|20100102|36.00|file:///home/fsdjob/theDir/file1.txt|30 |
|20100101|14.00|file:///home/fsdjob/theDir/file1.txt|36 |
|20100101|14.00|file:///home/fsdjob/theDir/file1.txt|36 |
|20100101|14.00|file:///home/fsdjob/theDir/file2.txt|56 |
|20100101|14.00|file:///home/fsdjob/theDir/file2.txt|56 |
|20100101|36.00|file:///home/fsdjob/theDir/file1.txt|146|
|20100101|12.34|file:///home/fsdjob/theDir/file1.txt|165|
|20100101|12.34|file:///home/fsdjob/theDir/file1.txt|165|
+--------------+------------------------------------+---+
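One thing to keep in mind with this approach: the id is derived from the distinct (value, filename) pair, not from adjacency. On the sample data the two coincide, but if an identical line recurs later in the same file after a different line, both occurrences would share an id. The difference can be sketched in plain Scala. The names are hypothetical, no Spark is involved, and `idsByRun` is written for clarity rather than speed.

```scala
object GroupVsRun {
  // One id per distinct line: what grouping by the line content yields.
  def idsByValue(lines: Seq[String]): Seq[Int] = {
    val ids = lines.distinct.zipWithIndex.toMap
    lines.map(ids)
  }

  // One id per run of adjacent equal lines: the id of line i is the number
  // of positions up to i where a line differs from its predecessor.
  def idsByRun(lines: Seq[String]): Seq[Int] =
    lines.indices.map(i => (1 to i).count(j => lines(j) != lines(j - 1)))
}
```

For the lines "a", "a", "b", "a", `idsByValue` gives 0, 0, 1, 0 while `idsByRun` gives 0, 0, 1, 2. If such repeats can occur in the real files, the grouping key would need something that reflects line position as well.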