我有一个MongoDB集合,看起来像这样:
{
"_id" : { "customerName" : "Bob", "customerPhone" : "123-456-7890"},
"purchases": ["A", "B", "C", "D"]
}
基本上,_id是关于客户的一对唯一密钥,而购买是客户购买的物品的数组。
我还有一个PySpark DataFrame,我想把它推到这个集合中,其中包含我想更新这个特定文档的信息。
df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append")
.option("spark.mongodb.output.uri", "mongodb://localhost:27017/customer.purchases").save()
问题是,如果我要更新这个文档,在那里我想为Bob添加新的采购,它只会在purchases
中添加不存在的采购,而不是全部添加。
因此,我现在所做的是,我只需要调用rdd.collect()
将整个事情转换为列表,而不使用模式将其转换为DataFrame。然后逐个插入所有内容,同时检查密钥是否存在;这使得该部分速度较慢,并且当对RDD的查询变大时需要大量内存。
对于版本:
PySpark:2.2MongoDB:3.0.15Mongo Spark连接器:2.2.1
如果我可以使用dataframe将数组中的所有元素附加到MongoDB集合中,有人愿意吗?此外,如果我有什么遗漏或其他应该做的事情,请告诉我。谢谢
您需要更改文档的数据模型或模式。这里的重要部分是_id
密钥字段。字段名称_id
被保留用作主键;它的值在集合中必须是唯一的,是不可变的,并且可以是数组以外的任何类型。
在您的情况下,_id
字段的值是可变的,事实上这就是您试图更新的内容。作为一个建议,您可能想将其更改为:
{ "_id" : <unique identifier>
"customerName" : "Bob",
"customerPhone" : "123-456-7890",
"purchases": ["A", "B", "C", "D"]
}
您可以使用ObjectId的默认_id
值作为唯一标识符。
一旦您在_id
字段上有了唯一的标识符,我们就来谈谈更新操作。由于MongoDB Spark Connector v1.1+(当前版本2.2),如果数据帧在写入过程中包含_id
字段,则数据将被打乱。这意味着具有相同_id值的任何现有文档都将被更新,并且集合中没有现有_id的新文档将被插入。
奖励回合:
您还需要为
purchases
字段找到更好的模式。具有未定义长度的数组长度可能会在将来产生问题。即Bob一年内购买了1000件商品。请更新您的MongoDB服务器版本(3.0.x版本从2015年开始),目前的稳定版本是3.4,3.6将在下个月发布。