在 pyspark 流中展平来自 Dstream 的列表/数组



我正在使用带有火花流的 Kafka 主题,我需要计算数组中所有出现的值。 它类似于规范的字数统计示例,只是我的输入数据是字符串列表。 完全披露:我是所有火花的新手。

["#epstein", "#didnt", "#kill", "#himself"]
["#foo", "#didnt", "#bar"]

需要变成类似的东西

#epstein  1
#foo 1
#didnt 2
#kill 1
#himself 1
#bar 1

我可以走到这一步,从 kafka 消息的其余部分中提取主题标签数组,并可以将数组打印到控制台,但我无法弄清楚如何拆分/计数它。

zookeeper = '10.0.8.111:2181'
kafka_topic = 'twitter_short_json'
sc = SparkContext(appName="CountHashtags")
sc.setLogLevel("ERROR")
# sets the stream to run in 5 second increments
ssc = StreamingContext(sc, 5)
kafkaStream = KafkaUtils.createStream(ssc, zookeeper, 'streaming-group', {kafka_topic: 1})
# parse the Kafka stream as json, returns a DStream object
hashtagsDStream = kafkaStream.map(lambda x: x[1]) 
.map(lambda j: json.loads(j)) 
.map(lambda p: ((p['hashtags']),))
hashtagsDStream.pprint()

如果我有一个RDD,我可以像这样使用爆炸方法:

exploded = hashtagsDStream.withColumn("hashtags", explode(hashtagsDStream.hashtags))
exploded.registerTempTable('exploded_table')
sqlDF = sqlContext.sql('select count(*), hashtags from exploded_table group by hashtags order by 1 desc').show()

但是 DStream 没有 withColumn 方法,所以我被困在如何对数组中的实际主题标签值进行计数上。

DStreamRDD流。您可以调用hashtagsDStream.foreachRDD(rdd ... )并在那里写下您想对您将收到的每个rdd执行的操作。

最新更新