在 Python 中从 MongoDB GridFS 加载 Spark 2.x DataFrame



我在elephas下使用带有keras的pyspark sql。

我想尝试使用mongoDB GridFS进行某种分布式图像处理

我发现了相关的问题,但在 Java 世界中,在 Scala 上从 MongoDB 加载 Spark 2.x 数据帧 GridFS

但仅此而已,我找不到任何其他文档如何使用pySpark的GridFS。

我的 pyspark - mongo 代码看起来像这样:

sparkConf = SparkConf().setMaster("local[4]").setAppName("MongoSparkConnectorTour")
.set("spark.app.id", "MongoSparkConnectorTour")
.set("spark.mongodb.input.database", config.MONGO_DB)
# If executed via pyspark, sc is already instantiated
sc = SparkContext(conf=sparkConf)
sqlContext = SQLContext(sc)
dk = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", config.MONGO_MED_EVENTS)
.load()
if (dk.count() > 0):
# print data frame schema
dk.printSchema()
# Preview Dataframe (Pandas Preview is Cleaner)
print( dk.limit(5).toPandas() )

是否可以以这种方式处理 GridFS 数据?我想看到最小的例子。

有一种方法可以将 Scala 代码转换为 Pyspark。

  1. 从 https://mvnrepository.com/artifact/org.mongodb.mongo-hadoop/mongo-hadoop-core/2.0.2 下载mongo-hadoop-core.jar

  2. 运行 pyspark 与包括罐子:

SPARK_CLASSPATH=./path/to/mongo-hadoop-core.jar pyspark
  1. 和翻译的代码:
sc = SparkContext(conf=sparkConf)
mongo_conf = {
"mongo.input.uri": "mongodb://..."
"mongo.input.query": s"...mongo query here..."
}
rdd = sc.newAPIHadoopRDD("com.mongodb.hadoop.GridFSInputFormat", keyClass="org.apache.hadoop.io.NullWritable", valueClass="org.apache.hadoop.io.MapWritable", conf=conf)

我对keyClassvalueClass不是百分百确定,所以这是我用来编译此代码的来源:

  • https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage
  • https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.newAPIHadoopFile
  • http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-create-rdd-with-pyspark-newAPIHadoopRDD-td10358.html
  • 从MongoDB GridFS加载Spark 2.x数据帧

最新更新