学习python和线程.我认为我的代码可以无限运行.帮我找虫子



我现在已经开始学习python了,我绝对爱上它了。

我正在构建一个小规模的facebook数据刮板。基本上,它将使用Graph API并抓取指定数量的用户的名字。它在一个线程中工作得很好(或者没有线程,我猜)。

我使用在线教程得出以下多线程版本(更新代码):

import requests
import json
import time
import threading
import Queue
GraphURL = 'http://graph.facebook.com/'
first_names = {} # will store first names and their counts
queue = Queue.Queue()
def getOneUser(url):
    http_response = requests.get(url) # open the request URL
    if http_response.status_code == 200:
        data = http_response.text.encode('utf-8', 'ignore') # Get the text of response, and encode it
        json_obj = json.loads(data) # load it as a json object
        # name = json_obj['name']
        return json_obj['first_name']
        # last = json_obj['last_name']
    return None
class ThreadGet(threading.Thread):
    """ Threaded name scraper """
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
    def run(self):
        while True:
            #print 'thread startedn'
            url = GraphURL + str(self.queue.get())
            first = getOneUser(url) # get one user's first name
            if first is not None:
                if first_names.has_key(first): # if name has been encountered before
                    first_names[first] = first_names[first] + 1 # increment the count
                else:
                    first_names[first] = 1 # add the new name
            self.queue.task_done()
            #print 'thread endedn'
def main():
    start = time.time()
    for i in range(6):
        t = ThreadGet(queue)
        t.setDaemon(True)
        t.start()
    for i in range(100):
        queue.put(i)
    queue.join()
    for name in first_names.keys():
        print name + ': ' + str(first_names[name])
    print '----------------------------------------------------------------'
    print '================================================================'
    # Print top first names
    for key in first_names.keys():
        if first_names[key] > 2:
            print key + ': ' + str(first_names[key])
    print 'It took ' + str(time.time()-start) + 's'
main()

老实说,我不明白代码的某些部分,但我明白了主要思想。输出是空的。我的意思是shell里面什么都没有,所以我相信它会继续运行。

所以我要做的是用整数填充queue,这些整数是fb上的用户id。然后使用每个ID来构建api调用URL。getOneUser每次返回一个用户的名称。该task (ID)被标记为"完成",并继续前进。

上面的代码有什么问题?

您使用的first_names不是线程安全的。可以添加锁来保护增量。否则代码应该可以工作。你可能达到了一些facebook api限制,也就是说,你应该限制你的请求率。

可以通过使用线程池和计算主线程中的名称来简化代码:

#!/usr/bin/env python
import json
import urllib2
from collections import Counter
from multiprocessing.dummy import Pool # use threads
def get_name(url):
    try:
        return json.load(urllib2.urlopen(url))['first_name']
    except Exception:
        return None # error
urls = ('http://graph.facebook.com/%d' % i for i in xrange(100))
p = Pool(5) # 5 concurrent connections
first_names = Counter(p.imap_unordered(get_name, urls))
print first_names.most_common()

要查看得到的错误,可以添加logging:

#!/usr/bin/env python
import json
import logging
import urllib2
from collections import Counter
from multiprocessing.dummy import Pool # use threads
logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s %(threadName)s %(message)s")
def get_name(url):
    try:
        name = json.load(urllib2.urlopen(url))['first_name']
    except Exception as e:
        logging.debug('error: %s url: %s', e, url)
        return None # error
    else:
        logging.debug('done url: %s', url)
        return name
urls = ('http://graph.facebook.com/%d' % i for i in xrange(100))
p = Pool(5) # 5 concurrent connections
first_names = Counter(p.imap_unordered(get_name, urls))
print first_names.most_common()

限制每个给定时间段的请求数的一个简单方法是使用一个信号量:

#!/usr/bin/env python
import json
import logging
import time
import urllib2
from collections import Counter
from multiprocessing.dummy import Pool # use threads
from threading import _BoundedSemaphore as BoundedSemaphore, Timer
logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s %(threadName)s %(message)s")
class RatedSemaphore(BoundedSemaphore):
    """Limit to 1 request per `period / value` seconds (over long run)."""
    def __init__(self, value=1, period=1):
        BoundedSemaphore.__init__(self, value)
        t = Timer(period, self._add_token_loop,
                  kwargs=dict(time_delta=float(period) / value))
        t.daemon = True
        t.start()
    def _add_token_loop(self, time_delta):
        """Add token every time_delta seconds."""
        while True:
            try:
                BoundedSemaphore.release(self)
            except ValueError: # ignore if already max possible value
                pass
            time.sleep(time_delta) # ignore EINTR
    def release(self):
        pass # do nothing (only time-based release() is allowed)
def get_name(gid, rate_limit=RatedSemaphore(value=100, period=600)):
    url = 'http://graph.facebook.com/%d' % gid
    try:
        with rate_limit:
            name = json.load(urllib2.urlopen(url))['first_name']
    except Exception as e:
        logging.debug('error: %s url: %s', e, url)
        return None # error
    else:
        logging.debug('done url: %s', url)
        return name
p = Pool(5) # 5 concurrent connections
first_names = Counter(p.imap_unordered(get_name, xrange(200)))
print first_names.most_common()

在初始突发之后,它应该每6秒发出一个请求。

原来的run函数只处理队列中的一个项目。你总共只从队列中删除了5个条目。

通常run函数看起来像

run(self):
    while True:
         doUsefulWork()

。它们有一个循环,导致重复的工作被完成。

[Edit] OP编辑代码以包含此更改。

其他一些有用的东西可以尝试:

  • run函数中加入print语句,你会发现它只被调用了5次。
  • 删除queue.join()调用,这是导致模块阻塞的原因,然后您将能够探测队列的状态。
  • run的整个主体放入函数中。验证您可以以单线程方式使用该函数来获得所需的结果,然后
  • 尝试只使用一个工作线程,然后最后转到
  • 多个工作线程

最新更新