我有 2 个输入文件:
a) 原始文件 ( orig_file.json) ),包含如下记录:
{"id": 1, "app": test_a, "description": test_app_a }
{"id": 2, "app": test_b, "description": test_app_b }
{"id": 3, "app": test_c, "description": test_app_c }
{"id": 4, "app": test_d, "description": test_app_d }
{"id": 5, "app": test_e, "description": test_app_e }
b) 一个"增量"文件 ( deltas_file.json) ),包含如下记录:
{"id": 1, "app": test_aaaxxx, "description": test_app_aaaxxx }
{"id": 6, "app": test_ffffff, "description": test_app_ffffff }
我正在尝试以这样的方式合并两个文件(原始+增量),以便产生这样的输出
{"id": 1, "app": test_aaaxxx, "description": test_app_aaaxxx }
{"id": 2, "app": test_b, "description": test_app_b }
{"id": 3, "app": test_c, "description": test_app_c }
{"id": 4, "app": test_d, "description": test_app_d }
{"id": 5, "app": test_e, "description": test_app_e }
{"id": 6, "app": test_ffffff, "description": test_app_ffffff }
*基本上通过添加任何新应用程序将原始文件与增量文件合并,并仅更新已存在的记录。.
我尝试使用不同的连接,但无法获得解决方案。
有人可以指导我解决此问题的方法吗?谢谢
左外连接和合并:
from pyspark.sql.functions import *
deltas.join(origin, ["id"], "leftouter")
.select("id",
coalesce(deltas["app"], origin["app"]).alias("app"),
coalesce(deltas["description"], origin["description"]).alias("description"))
尝试python
panda merge。
import panda as pd
# create your data frames here
pd.merge(delta_frame,orig_frame) # Try various required arguments in function
希望这有帮助!