使用spark和scala进行联接计数时获得性能的最佳方法



我有一个验证摄取操作的要求,基本上,我在HDFS中有两个大文件,一个是avro格式的(摄取文件),另一个是parquet格式的(合并文件)。

Avro文件具有以下架构:

文件名、日期、计数、字段1、字段2、字段3、字段4、字段5、字段6,。。。afieldN

Parquet文件具有以下架构:

fileName,anotherField1,anotherField1,anoherField2,anotherFiel3,anotherField 14,。。。,另一个FieldN

如果我尝试在DataFrame中加载这两个文件,然后尝试使用一个天真的联接,那么在我的本地机器中的作业需要超过24小时!,这是不可接受的。

ingestedDF.join(consolidatedDF).where($"filename" === $"fileName").count()

?实现这一目标的最佳方式是什么??在进行连接计数之前从DataFrame中删除列??计算每个数据帧的计数,然后求和?

PD

我读到了关于地图侧联合技术的文章,但如果有一个小文件可以放在RAM中,这个技术看起来会对我有用,但我不能保证,所以,我想知道社区更喜欢哪种方式来实现这一点。

http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/

我会通过将数据剥离到我感兴趣的字段(filename)来解决这个问题,用它来自的源(原始数据集)创建一组唯一的文件名。在这一点上,两个中间数据集都有相同的模式,所以我们可以将它们合并并计数。这应该比在完整数据上使用join快几个数量级。

// prepare some random dataset
val data1 = (1 to 100000).filter(_ => scala.util.Random.nextDouble<0.8).map(i => (s"file$i", i, "rubbish"))
val data2 = (1 to 100000).filter(_ => scala.util.Random.nextDouble<0.7).map(i => (s"file$i", i, "crap"))
val df1 = sparkSession.createDataFrame(data1).toDF("filename", "index", "data")
val df2 = sparkSession.createDataFrame(data2).toDF("filename", "index", "data")
// select only the column we are interested in and tag it with the source.
// Lets make it distinct as we are only interested in the unique file count
val df1Filenames = df1.select("filename").withColumn("df", lit("df1")).distinct
val df2Filenames = df2.select("filename").withColumn("df", lit("df2")).distinct
// union both dataframes
val union = df1Filenames.union(df2Filenames).toDF("filename","source")
// let's count the occurrences of filename, by using a groupby operation
val occurrenceCount = union.groupBy("filename").count
// we're interested in the count of those files that appear in both datasets (with a count of 2)
occurrenceCount.filter($"count"===2).count

相关内容

  • 没有找到相关文章

最新更新