解决问题,请参阅post
的解决方案我需要帮助,以估算我的Tweepy程序使用位置过滤器调用Twitter流API的运行时间。
我踢开后,它已经运行了20多分钟,这比我预期的要长。我是Twitter Stream API的新手,并且仅与REST API合作了几天。在我看来,REST API将在几秒钟内给我50条推文,很容易。但是这个流请求需要更多的时间。我的程序尚未死于我或有任何错误。所以我不知道它是否有任何问题。如果是这样,请指出。
总而言之,如果您认为我的代码正确,您能否提供运行时间的估算值?如果您认为我的代码错了,您能帮我解决吗?
预先感谢您!
这是代码:
# Import Tweepy, sys, sleep, credentials.py
import tweepy, sys
from time import sleep
from credentials import *
# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
box = [-86.33,41.63,-86.20,41.74]
class CustomStreamListener(tweepy.StreamListener):
def on_error(self, status_code):
print >> sys.stderr, 'Encountered error with status code:', status_code
return True # Don't kill the stream
def on_timeout(self):
print >> sys.stderr, 'Timeout...'
return True # Don't kill the stream
stream = tweepy.streaming.Stream(auth, CustomStreamListener()).filter(locations=box).items(50)
stream
我尝试了该方法从http://docs.tweepy.org/en/v3.4.0/auth_tutorial.html#auth-tutorial显然对我不起作用...这是我的代码。您介意提供任何意见吗?让我知道您是否有一些工作代码。谢谢!
# Import Tweepy, sys, sleep, credentials.py
import tweepy, sys
from time import sleep
from credentials import *
# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
# Assign coordinates to the variable
box = [-74.0,40.73,-73.0,41.73]
import tweepy
#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):
def on_status(self, status):
print(status.text)
def on_error(self, status_code):
if status_code == 420:
#returning False in on_data disconnects the stream
return False
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())
myStream.filter(track=['python'], locations=(box), async=True)
这是错误消息:
Traceback (most recent call last):
File "test.py", line 26, in <module>
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())
TypeError: 'MyStreamListener' object is not callable
问题解决了!请参阅下面的解决方案
在另一轮调试之后,这是一个可能对同一主题感兴趣的人的解决方案:
# Import Tweepy, sys, sleep, credentials.py
try:
import json
except ImportError:
import simplejson as json
import tweepy, sys
from time import sleep
from credentials import *
# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
# Assign coordinates to the variable
box = [-74.0,40.73,-73.0,41.73]
import tweepy
#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):
def on_status(self, status):
print(status.text.encode('utf-8'))
def on_error(self, status_code):
if status_code == 420:
#returning False in on_data disconnects the stream
return False
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(api.auth, listener=myStreamListener)
myStream.filter(track=['NYC'], locations=(box), async=True)
核心问题:我认为您误会了这里的流。
tl; dr:您的代码正在工作,您只是没有用返回的数据做任何事情。
REST API调用是一个单一的信息呼叫。您提出了一个请求,Twitter还会发送一些信息,这些信息将分配给您的变量。
the StreamObject(您已创建为stream
)从Tweepy开设了与您的搜索参数的连接,并且Twitter,wect twitter therets threats tweets to to top。永远。
来自Tweepy Docs:
流媒体API与REST API完全不同,因为 REST API用于从Twitter中获取数据,但流api 将消息推入持续的会话。这允许流动API 实时下载更多的数据比使用其余的数据 API。
因此,您需要构建一个处理程序(streamListener
,在Tweepy's术语中),就像打印出推文一样。
附加
警告词,从苦涩的经验中 - 如果您要尝试将推文保存到数据库中:Twitter可以并且会将对象传输到您要比将它们保存到数据库的速度要快得多。这将导致您的流断开连接,因为这些推文在Twitter上备份,并且在一定级别的后盾性(不是实际的短语)上,他们只会断开您的连接。
我通过使用django-rq
将作业保存到Jobqueue进行处理 - 这样,我可以每秒钟处理数百条推文(在峰值),并且可以平滑。您可以在下面看到我如何做到这一点。如果您不使用Django作为框架,则Python-RQ也将起作用。read both
方法只是从Tweet读取并将其保存到Postgres数据库的函数。在我的具体情况下,我使用django_rq.enqueue
函数通过Django Orm进行了此操作。
__author__ = 'iamwithnail'
from django.core.management.base import BaseCommand, CommandError
from django.db.utils import DataError
from harvester.tools import read_both
import django_rq
class Command(BaseCommand):
args = '<search_string search_string>'
help = "Opens a listener to the Twitter stream, and tracks the given string or list"
"of strings, saving them down to the DB as they are received."
def handle(self, *args, **options):
try:
import urllib3.contrib.pyopenssl
urllib3.contrib.pyopenssl.inject_into_urllib3()
except ImportError:
pass
consumer_key = '***'
consumer_secret = '****'
access_token='****'
access_token_secret_var='****'
import tweepy
import json
# This is the listener, responsible for receiving data
class StdOutListener(tweepy.StreamListener):
def on_data(self, data):
decoded = json.loads(data)
try:
if decoded['lang'] == 'en':
django_rq.enqueue(read_both, decoded)
else:
pass
except KeyError,e:
print "Error on Key", e
except DataError, e:
print "DataError", e
return True
def on_error(self, status):
print status
l = StdOutListener()
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret_var)
stream = tweepy.Stream(auth, l)
stream.filter(track=args)
编辑:您的后续问题是通过错误打电话给听众引起的。
myStreamListener = MyStreamListener() #creates an instance of your class
在哪里有:
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())
使用()
时,您正在尝试将侦听器称为函数。所以应该是:
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)
实际上,可能会更简洁地写成:
myStream = tweepy.Stream(api.auth,myStreamListener)