Spark二进制文件和增量表



我有一批二进制文件(每个大约3mb(,每次接收大约20000个文件。这些文件在下游用于进一步处理,但我想处理它们并存储在增量表中。

我可以很容易地做到这一点:

df = spark.read.format(“binaryFile”).load(<path-to-batch>)
df = df.withColumn(“id”, expr(“uuid()”)
dt = DeltaTable.forName(“myTable”)
dt.alias(“a”).merge(
df.alias(“a”),
“a.path = b.path”
).whenNotMatchedInsert(
values={“id”: “b.id”, “content”: “b.content”}
).execute()

这使得表已经很慢了,但以后我需要查询某些ID,收集它们并将它们单独写回二进制文件。

问题:

  1. 我的表会从批处理列和分区中受益吗
  2. 我应该按id分区吗?我知道这并不理想,但可能会使查询单个行变得更容易
  3. 有没有比.collect()更好的方法再次写出文件?我看到,当我选择大约1000个特定的id写出来时,大约10分钟只是收集,然后不到一分钟就可以写了。我会做一些类似的事情:
for row in df.collect():
with open(row.id, “wb”) as fw:
fw.write(row.content)
  1. 由于uuid((返回随机值,恐怕我们无法使用它来比较现有数据和新记录。(如果我误解了这个想法,很抱歉(
  2. 我认为使用按id分区不会有帮助,因为id列显然具有很高的基数
  3. 我认为,与其使用collect((将所有记录加载到Driver中,不如先将Spark数据帧中的记录从所有工作节点直接同时写入ADLS上的一个临时位置,然后从该位置聚合一些数据文件

最新更新