我正在使用PHP构建一个应用程序,并将MongoDB作为数据库。数据库中的一个集合具有大量数据,即8GB数据。我对存储在MongoDB集合中的数据执行聚合操作,并相应地生成统计信息。但是处理大量数据需要很长的时间。因此,我选择Apache spark来处理存储在MongDB集合中的数据我已经配置了MongoDB spark连接器,并在python中执行了一个演示脚本,通过spark从mongo集合中获取数据。
以下是python代码片段
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf=SparkConf()
conf.set('spark.mongodb.input.uri','mongodb://[host]/db.collection')
conf.set('spark.mongodb.output.uri','mongodb://[host]/db.collection')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()
df.registerTempTable("mycollection")
result_data=sqlContext.sql("SELECT * from mycollection limit 10")
result_data.show()
在上面的代码片段中,我使用pyspark.sql模块生成RDD。但是生成RDD需要从集合中读取所有数据,这需要很长的时间才能读取大量数据,而Apache Spark的工作原理相反。因此,建议我一个合适的解决方案,使用pyspark从Mongo集合中读取数据,并具有最佳性能,如果Apache spark中存在替代包,也可以与MongoDB通信。
不幸的是,与过滤器不同,limit子句不会传递回Mongo Spark连接器,因此它无法自动转换为聚合管道阶段。
然而,你总是可以提供自己的管道阶段,比如:
dfr = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
df = dfr.option("pipeline", "[{ $limit: 10 }]").load()