我有一批二进制文件(每个大约3mb(,每次接收大约20000个文件。这些文件在下游用于进一步处理,但我想处理它们并存储在增量表中。
我可以很容易地做到这一点:
df = spark.read.format(“binaryFile”).load(<path-to-batch>)
df = df.withColumn(“id”, expr(“uuid()”)
dt = DeltaTable.forName(“myTable”)
dt.alias(“a”).merge(
df.alias(“a”),
“a.path = b.path”
).whenNotMatchedInsert(
values={“id”: “b.id”, “content”: “b.content”}
).execute()
这使得表已经很慢了,但以后我需要查询某些ID,收集它们并将它们单独写回二进制文件。
问题:
- 我的表会从批处理列和分区中受益吗
- 我应该按
id
分区吗?我知道这并不理想,但可能会使查询单个行变得更容易 - 有没有比
.collect()
更好的方法再次写出文件?我看到,当我选择大约1000个特定的id写出来时,大约10分钟只是收集,然后不到一分钟就可以写了。我会做一些类似的事情:
for row in df.collect():
with open(row.id, “wb”) as fw:
fw.write(row.content)
- 由于uuid((返回随机值,恐怕我们无法使用它来比较现有数据和新记录。(如果我误解了这个想法,很抱歉(
- 我认为使用按id分区不会有帮助,因为id列显然具有很高的基数
- 我认为,与其使用collect((将所有记录加载到Driver中,不如先将Spark数据帧中的记录从所有工作节点直接同时写入ADLS上的一个临时位置,然后从该位置聚合一些数据文件