使用 Tweepy 收听流媒体和搜索推文.如何停止以前的搜索并仅侦听新流



我正在使用Flask和Tweepy来搜索实时推文。在前端,我有一个用户文本输入,以及名为"搜索"的按钮。理想情况下,当用户在输入中输入搜索词并单击"搜索"按钮时,Tweepy 应该侦听新的搜索词并停止以前的搜索词流。 单击"搜索"按钮时,它将执行以下功能:

@app.route('/search', methods=['POST'])
# gets search-keyword and starts stream
def streamTweets():
    search_term = request.form['tweet']
    search_term_hashtag = '#' + search_term
    # instantiate listener
    listener = StdOutListener()
    # stream object uses listener we instantiated above to listen for data
    stream = tweepy.Stream(auth, listener)
    if stream is not None:
        print "Stream disconnected..."
        stream.disconnect()
    stream.filter(track=[search_term or search_term_hashtag], async=True)
    redirect('/stream') # execute '/stream' sse
    return render_template('index.html')

在上述代码的倒数第二行中执行的/stream路由如下所示:

@app.route('/stream')
def stream():
    # we will use Pub/Sub process to send real-time tweets to client
    def event_stream():
        # instantiate pubsub
        pubsub = red.pubsub()
        # subscribe to tweet_stream channel
        pubsub.subscribe('tweet_stream')
        # initiate server-sent events on messages pushed to channel
        for message in pubsub.listen():
            yield 'data: %snn' % message['data']
    return Response(stream_with_context(event_stream()), mimetype="text/event-stream")

我的代码工作正常,从某种意义上说,每当单击"搜索"按钮时,它都会启动一个新流并搜索给定的术语,但它不会停止之前的搜索。例如,如果我的第一个搜索词是"NYC",然后我想搜索另一个词,比如"洛杉矶",它会给我"NYC"和"洛杉矶"的结果,这不是我想要的。我只想搜索"洛杉矶"。我该如何解决这个问题?换句话说,如何停止上一个流?我查看了以前的其他线程,我知道我必须使用 stream.disconnect() ,但我不确定如何在我的代码中实现这一点。任何帮助或意见将不胜感激。非常感谢!!

下面是

一些代码,可以在创建新流时取消旧流。它的工作原理是将新流添加到全局列表,然后在创建新流时对列表中的所有流调用stream.disconnect()

diff --git a/app.py b/app.py
index 1e3ed10..f416ddc 100755
--- a/app.py
+++ b/app.py
@@ -23,6 +23,8 @@ auth.set_access_token(access_token, access_token_secret)
 app = Flask(__name__)
 red = redis.StrictRedis()
+# Add a place to keep track of current streams
+streams = []
 @app.route('/')
 def index():
@@ -32,12 +34,18 @@ def index():
 @app.route('/search', methods=['POST'])
 # gets search-keyword and starts stream
 def streamTweets():
+        # cancel old streams
+        for stream in streams:
+            stream.disconnect()
+
        search_term = request.form['tweet']
        search_term_hashtag = '#' + search_term
        # instantiate listener
        listener = StdOutListener()
        # stream object uses listener we instantiated above to listen for data
        stream = tweepy.Stream(auth, listener)
+        # add this stream to the global list
+        streams.append(stream)
        stream.filter(track=[search_term or search_term_hashtag],
                async=True) # make sure stream is non-blocking
        redirect('/stream') # execute '/stream' sse

这没有解决的是会话管理问题。使用当前设置,一个用户的搜索将影响所有用户的搜索。可以通过为用户提供一些标识符并将其流与其标识符一起存储来避免这种情况。最简单的方法可能是使用 Flask 的会话支持。你也可以按照皮埃尔的建议,用requestId来做到这一点。无论哪种情况,您都需要代码来通知用户何时关闭页面并关闭其流。

免责声明:我对Tweepy一无所知,但这似乎是一个设计问题。

您是否正在尝试将状态添加到 RESTful API?您可能遇到设计问题。正如 JRichardSnape 回答的那样,您的 API 不应该负责取消请求;它应该在前端完成。我这里的意思是在javascript/AJAX/等中调用此函数,为新函数添加另一个调用

@app.route('/cancelSearch', methods=['POST'])使用具有搜索词的"POST"。只要你没有状态,你就无法在异步调用中安全地做到这一点:想象一下,其他人同时进行相同的搜索,然后取消一个将取消两个(记住,你没有状态,所以你不知道你要取消谁(。也许你确实需要你的设计状态。

如果您必须继续使用它并且不介意打破"无状态"规则,请在您的请求中添加"状态"。在这种情况下,它并没有那么糟糕,因为您可以启动一个线程并使用 userId 命名它,然后在每次新搜索时杀死该线程

def streamTweets():
    search_term = request.form['tweet']
    userId = request.form['userId'] # If your limit is one request per user at a time. If multiple windows can be opened and you want to follow this limit, store userId in a cookie.
    #Look for any request currently running with this ID, and cancel them

或者,你可以返回一个requestId,然后你将其保留在前端可以调用cancelSearch?requestId=$requestId。在cancelSearch中,您必须找到待处理的请求(听起来像是tweepy,因为您没有使用自己的线程(并断开连接。

出于好奇,我只是观察了当你在谷歌上搜索时会发生什么,它使用了一个GET请求。看一看(调试工具 -> 网络;然后输入一些文本并查看自动填充(。谷歌使用随每个请求发送的令牌(每次你输入内容时((。这并不意味着它用于此目的,但这基本上就是我所描述的。如果您不需要会话,请使用唯一标识符

好吧,

我使用计时器方法解决了它 但我仍然在寻找pythonic的方式。

from streamer import StreamListener
def stream():
    hashtag = input
    #assign each user an ID ( for pubsub )
    StreamListener.userid = random_user_id
    def handler(signum, frame):
        print("Forever is over")
        raise Exception("end of time")
    def main_stream():
        stream = tweepy.Stream(auth, StreamListener())
        stream.filter(track=track,async=True)
        redirect(url_for('map_stream'))
    def close_stream():
        # this is for closing client list in redis but don't know it's working
        obj = redis.client_list(tweet_stream)
        redis_client_list = obj[0]['addr']
        redis.client_kill(redis_client_list)
        stream = tweepy.Stream(auth, StreamListener())
        stream.disconnect()
    import signal
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(300)
    try:
        main_stream()
    except Exception:
        close_stream()
        print("function terminate")

最新更新