与Python中的Kafka生产商一起发送数据的问题(Jupyter Notebook)



我正在尝试使用Kafka,Python和Twitter创建大数据分析。我有一个数据流的推文,我只采用它们的主题标签。我的问题与生产商Kafka一起在Python中使用。我无法将我想要的数据发送到我创建的主题中,因为我看不到与生产者一起发送变量的内容的任何选项。

在https://kafka-python.readthedocs.io/en/master/usage.html中,我只能看到使用b'some_string'发送精确字符串的选项。但是我想发送我从Twitter流中获取的标签。我对Python了解不多,所以请原谅我是否明显解决方案。

导入:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
import kafka
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer

流上下文:

ssc = StreamingContext(sc,60)

键:

consumer_key="consumer_key"
consumer_secret="consumer_secret"
access_token="access_token"
access_token_secret="access_token_secret"

tweepy:

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

生产者:

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

代码:

class MyStreamListener(tweepy.StreamListener):
    def on_status(self, status):
        for hashtag in status.entities['hashtags']:
            prueba = b'hashtag["text"]'
            producer.send('topic', prueba)
            return True
    def on_error(self, status_code):
        if status_code == 420:
            #returning False in on_data disconnects the stream
            return False

sparlistener:

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=MyStreamListener())

推文流:

myStream.filter(track=['some_text'])

事实是,生产者仅发送prueba的文字字符串,即"(hashtag["text"])"。我不是要发送确切的内容,而是要发送内容。

预先感谢。

producer.send('topic', hashtag)怎么样?您还需要确保将数据编码为raw字节,这就是Kafka存储的内容。如果标签是一个简单的字符串,则可以执行producer.send('topic', hashtag.encode('utf-8'))。如果是dict或更复杂的数据结构,则可能需要在编码字节之前使用json.dumps。希望这会有所帮助!

最新更新