使用Spark和kafka的Twitter流媒体:如何在MongoDB中存储数据



我正在使用这个python代码收集Twitter流数据https://github.com/sridharswamy/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka/blob/master/app.py

之后,我运行此代码来创建流上下文并将数据存储在MongoDB中。

def main():
  conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
  sc = SparkContext(conf=conf)
  ssc = StreamingContext(sc, 10)
  ssc.checkpoint("checkpoint")   
  kstream = KafkaUtils.createDirectStream(
  ssc, topics = ['topic1'], kafkaParams = {"metadata.broker.list": 
  'localhost:9092'})
  tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore"))
  #................insert in MonGODB.........................
  db.mynewcollection.insert_one(tweets)
  ssc.start()
  ssc.awaitTerminationOrTimeout(100)
  ssc.stop(stopGraceFully = True)
if __name__=="__main__":
  urllib3.contrib.pyopenssl.inject_into_urllib3()
  connection = pymongo.MongoClient('....',...)
  db = connection['twitter1']
  db.authenticate('..','...')
  main()

但是我收到此错误:

TypeError: document must be an instance of dict, bson.son.SON, bson.raw_bson.RawBSONDocument, or a type that inherits from collections.MutableMapping

我还尝试使用"foreachRDD"并创建函数"save">

tweets.foreachRDD(Save)

我将"插入"移到了这个函数

def Save(rdd):
if not rdd.isEmpty():
    db.mynewcollection.insert_one(rdd)

但它不起作用

TypeError: can't pickle _thread.lock objects

谁能帮助我知道如何在MongoDB中存储流数据

  • 发生第一个错误是因为您将分布式对象传递到 db.mynewcollection.insert_one 中。

  • 发生第二个错误是因为您在驱动程序上启动数据库连接,并且通常无法序列化连接对象。

虽然存在许多Spark/MongoDB连接器,但您应该查看(让Spark,Python和MongoDB协同工作(,但通用模式是使用foreachPartition。定义帮助程序

def insert_partition(xs):
    connection = pymongo.MongoClient('....',...)
    db = connection['twitter1']
    db.authenticate('..','...')
    db.mynewcollection.insert_many(xs)

然后:

def to_dict(s):
    return ... # Convert input to a format acceptable by `insert_many`, for example with json.loads
tweets.map(to_dict) 
    .foreachRDD(lambda rdd: rdd.foreachPartition(insert_partition))

最新更新