PySpark MongoDB从DataFrame中附加数组的所有元素



我有一个MongoDB集合,看起来像这样:

{
"_id" : { "customerName" : "Bob",  "customerPhone" : "123-456-7890"},
"purchases": ["A", "B", "C", "D"]
}

基本上,_id是关于客户的一对唯一密钥,而购买是客户购买的物品的数组。

我还有一个PySpark DataFrame,我想把它推到这个集合中,其中包含我想更新这个特定文档的信息。

df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append") 
.option("spark.mongodb.output.uri", "mongodb://localhost:27017/customer.purchases").save()

问题是,如果我要更新这个文档,在那里我想为Bob添加新的采购,它只会在purchases中添加不存在的采购,而不是全部添加。

因此,我现在所做的是,我只需要调用rdd.collect()将整个事情转换为列表,而不使用模式将其转换为DataFrame。然后逐个插入所有内容,同时检查密钥是否存在;这使得该部分速度较慢,并且当对RDD的查询变大时需要大量内存。

对于版本:

PySpark:2.2MongoDB:3.0.15Mongo Spark连接器:2.2.1

如果我可以使用dataframe将数组中的所有元素附加到MongoDB集合中,有人愿意吗?此外,如果我有什么遗漏或其他应该做的事情,请告诉我。谢谢

您需要更改文档的数据模型或模式。这里的重要部分是_id密钥字段。字段名称_id被保留用作主键;它的值在集合中必须是唯一的,是不可变的,并且可以是数组以外的任何类型。

在您的情况下,_id字段的值是可变的,事实上这就是您试图更新的内容。作为一个建议,您可能想将其更改为:

{ "_id" : <unique identifier>
"customerName" : "Bob",  
"customerPhone" : "123-456-7890",
"purchases": ["A", "B", "C", "D"]
}

您可以使用ObjectId的默认_id值作为唯一标识符。

一旦您在_id字段上有了唯一的标识符,我们就来谈谈更新操作。由于MongoDB Spark Connector v1.1+(当前版本2.2),如果数据帧在写入过程中包含_id字段,则数据将被打乱。这意味着具有相同_id值的任何现有文档都将被更新,并且集合中没有现有_id的新文档将被插入。

奖励回合:

  • 您还需要为purchases字段找到更好的模式。具有未定义长度的数组长度可能会在将来产生问题。即Bob一年内购买了1000件商品。

  • 请更新您的MongoDB服务器版本(3.0.x版本从2015年开始),目前的稳定版本是3.4,3.6将在下个月发布。

相关内容

  • 没有找到相关文章

最新更新