使用多个线程使用Twitter4J从Twitter获取数据



我有一组关键字(超过600),我想使用流api与它们跟踪推文。Twitter API限制了您可以跟踪的关键字的数量,将其跟踪到200。因此,我决定使用几个线程来使用几个线程,并使用几个OAuth代币。这就是我的方式:

String[] dbKeywords = KeywordImpl.listKeywords();
    List<String[]> keywords = ditributeKeywords(dbKeywords);
    for (String[] subList : keywords) {
        StreamCrawler streamCrawler = new StreamCrawler();
        streamCrawler.setKeywords(subList);
        Thread crawlerThread = new Thread(streamCrawler);
        crawlerThread.start();
    }

这就是单词在线程之间分布的方式。每个线程收到不超过200个字。这是StreamCrawler的实现:

public class StreamCrawler extends Crawler implements Runnable {
...
    private String[] keywords;
    public void setKeywords(String[] keywords) {
    this.keywords = keywords;
}
@Override
public void run() {
    TwitterStream twitterStream = getTwitterInstance();
    StatusListener listener = new StatusListener() {
        ArrayDeque<Tweet> tweetbuffer = new ArrayDeque<Tweet>();
        ArrayDeque<TwitterUser> userbuffer = new ArrayDeque<TwitterUser>();

        @Override
        public void onException(Exception arg0) {
            System.out.println(arg0);
        }
        @Override
        public void onDeletionNotice(StatusDeletionNotice arg0) {
            System.out.println(arg0);
        }
        @Override
        public void onScrubGeo(long arg0, long arg1) {
            System.out.println(arg1);
        }
        @Override
        public void onStatus(Status status) {
                 ...Doing something with message
        }
        @Override
        public void onTrackLimitationNotice(int arg0) {
            System.out.println(arg0);
            try {
                Thread.sleep(5 * 60 * 1000);
                System.out.println("Will sleep for 5 minutes!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void onStallWarning(StallWarning arg0) {
            System.out.println(arg0);
        }
    };
    FilterQuery fq = new FilterQuery();
    String keywords[] = getKeywords();
    System.out.println(keywords.length);
    System.out.println("Listening for " + Arrays.toString(keywords));
    fq.track(keywords);
    twitterStream.addListener(listener);
    twitterStream.filter(fq);
}
private long getCurrentThreadId() {
    return Thread.currentThread().getId();
}
private TwitterStream getTwitterInstance() {
    TwitterConfiguration configuration = null;
    TwitterStream twitterStream = null;
    while (configuration == null) {
        configuration = TokenFactory.getAvailableToken();
        if (configuration != null) {
            System.out
                    .println("Token was obtained " + getCurrentThreadId());
            System.out.println(configuration.getTwitterAccount());
            setToken(configuration);
            ConfigurationBuilder cb = new ConfigurationBuilder();
            cb.setDebugEnabled(true);
            cb.setOAuthConsumerKey(configuration.getConsumerKey());
            cb.setOAuthConsumerSecret(configuration.getConsumerSecret());
            cb.setOAuthAccessToken(configuration.getAccessToken());
            cb.setOAuthAccessTokenSecret(configuration.getAccessSecret());
            twitterStream = new TwitterStreamFactory(cb.build())
                    .getInstance();
        } else {
            // If there is no available configuration, wait for 2 minutes
            // and try again
            try {
                System.out
                        .println("There were no available tokens, sleeping for 2 minutes.");
                Thread.sleep(2 * 60 * 1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    return twitterStream;
    }
}

所以我的问题是,当我启动时,例如2个线程,我会收到通知,他们俩都在打开流并获得它。但是实际上,只有首先是真正获得流和分别调用OnStatus方法。第二个线程中使用的数组不是空的。TwitterConfiguration也是有效且独特的。因此,我不明白这种行为的原因。为什么唯一的第一个线程返回推文?

据我所知,您试图从同一ip中同时连接到公共流端端点(又称常规流或stream.twitter.com)。
更具体地说,我认为您希望从同一IP中获得两个主动连接。

尽管Twitter流媒体文档并未清楚地说到与公共终点的一个站立连接,但Twitter员工在开发网站上澄清了这一点https://dev.twitter.com/discussions/7542

对于一般流,您只能从同一IP中建立一个连接。

这意味着您使用两个不同的Twitter应用程序/帐户连接到公共流并不重要。只要您从相同的IP地址连接,您就只能与公共流的一个站立连接。您说您都连接了这两个流,并且此行为的答案由Twitter员工给出:https://dev.twitter.com/discussions/14935

您可能会发现有时stream.twitter.com可以让您在这里或那里进行更多的开放连接,但不应依靠这种行为。

例如,如果您尝试在第二个线程中尝试连接到用户流(Twitter4J Twitterstream user()方法),那么您将真正开始同时获得过滤器&amp;用户流。

关于200个轨道关键字限制,twitter4j.org javadoc可能已经过时了。这是Twitter API文档所说的

默认访问级别最多允许400个轨道关键字,5,000个关注用户ID和25 0.1-360度位置框。如果您需要提高对流API的访问,则应探索我们Twitter数据的合作伙伴提供者...

因此,如果您需要超越400,则可能需要向Twitter询问提高Twitter帐户应用程序的轨道访问级别,或与Twitter数据的认证合作伙伴提供者一起工作。

您不一定需要的另一件事是启动新的线程以获取流,因为Twitter4J滤波器(或用户)"内部创建一个线程,该线程操纵TwitterStream并连续调用适当的侦听器方法"(从一个示例引用"YAMATO YUSUKE的代码)。

我希望这有帮助。(我无法发布更多链接,因为我得到了"您至少需要10个声誉来发布2个以上的链接")

最新更新