无法在AWS EMR中使用Pyspark或Python从mongoDB读取数据



我正在尝试使用AWS EMR中的PySpark和本机python从3节点MongoDB集群(副本集(读取数据。我在AWS EMR集群中执行代码时遇到了问题,如下所述,但相同的代码在我的本地windows机器中运行良好。

spark版本-2.4.8Scala版本-2.11.12MongoDB版本-4.4.8mongo火花连接器版本-mongo-spark-connector_2.11:2.4.4python版本-3.7.10通过Pyspark-(问题-Pyspark给出空数据帧(

以下是在本地和集群模式下运行pyspark作业时的命令。

本地模式:spark-submit--master local[*]--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.4 test.py

群集模式:spark-submit--master yarn--deploymode cluster--packages.mongodb.spark:mongo-spark-connector_2.11:2.4.4 test.py

对于这两种模式,我无法从mongoDB(空数据帧(读取数据,即使telnet在spark集群的所有节点(从所有节点(上工作。从日志中,我可以确认spark能够与mongoDB通信,并且我的pyspark作业正在提供空的数据帧。请在下面找到相同的截图!

mongoDB连接到pyspark

pyspark给出空数据帧

以下是相同的代码片段:

from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext
import sys
import json
sc = SparkContext()
spark = SparkSession(sc).builder.appName("MongoDbToS3").config("spark.mongodb.input.uri", "mongodb://usename:password@host1:port1,host2:port2,host3:port3/db.collection/?replicaSet=ABCD&authSource=admin").getOrCreate()
data = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
data.show()

请让我知道我在pyspark代码中做错了什么或遗漏了什么?

通过原生python代码-(问题-如果batch_size>1,代码会被卡住,如果batch_size=1,它将打印前24个mongo文档,然后光标挂起(

我使用pymongo驱动程序通过原生python代码连接到mongoDB。问题是,当我尝试获取/打印batch_size为1000的mongoDB文档时,代码将永远挂起,然后会出现网络超时错误。但若我使batch_size=1,那个么游标就能够在再次挂起游标之后获取前24个文档。我们观察到,与前24个文档相比,第25个文档非常大(约4kb(,然后我们尝试跳过第25个文件,然后光标开始获取下一个文档,但它再次卡在其他位置,所以我们观察到每当文档大小较大时,光标都会被卡住。

你们能帮我理解一下这个问题吗?

是否有来自网络端或mongoDB端的阻塞?下面是代码片段:

from datetime import datetime
import json
#import boto3
from bson import json_util
import pymongo

client = pymongo.MongoClient("mongodb://username@host:port/?authSource=admin&socketTimeoutMS=3600000&maxIdleTimeMS=3600000")
# Database Name
db = client["database_name"]
# Collection Name
quoteinfo__collection= db["collection_name"]
results = quoteinfo__collection.find({}).batch_size(1000)
doc_count = quoteinfo__collection.count_documents({})
print("documents count from collection: ",doc_count)
print(results)
record_increment_no = 1
for record in results:
print(record)
print(record_increment_no)
record_increment_no = record_increment_no + 1
results.close()

下面是相同的输出屏幕截图对于batch_size=1000(代码挂起并给出网络超时错误(

pymongo代码被卡住

网络超时错误

batch_size=1(只打印文档至24日,然后光标挂起(

打印24个文档并挂起

我们的开发人员和MongoDB托管的AWS帐户之间的AWS帐户对等存在一些问题,如下所述

  1. 其中一条路由的流量通过VPC对等网络,而不是Transit Gateway
  2. MongoDB IP不属于路由表的CIDR范围

为MongoDb IP1和MongoDb IP2添加传输网关后,我们可以正确读取任何集合的任何批量大小的数据。

最新更新