Rx Twitter stream stops when entering the third search topic



I am trying to rebuild Jonathan Worthington's Twitter example from this video: https://www.youtube.com/watch?v=_VdIQTtRkb8

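At first it works fine, but after a while the stream stops: on the third search topic I enter, the application no longer receives any tweets. I don't know why. It does not seem to depend on time, because if I let it run without changing the search topic, it keeps going. Can anyone help?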

Here is the code of the main window:

public ObservableCollection<string> PositiveTweets = new ObservableCollection<string>();
public ObservableCollection<string> NegativeTweets = new ObservableCollection<string>();
public MainWindow()
{
    InitializeComponent();
    FocusManager.SetFocusedElement(this, SearchTextBox);
    PositiveListBox.ItemsSource = PositiveTweets;
    NegativeListBox.ItemsSource = NegativeTweets;
    var keywords = Keywords.Create(new List<string> {"cool"}, new List<string> {"sucks"});
    var sa = new SentimentAnalysis(keywords);
    var topics = Observable
        .FromEventPattern<TextChangedEventArgs>(SearchTextBox, "TextChanged")
        .Select(e => ((TextBox) e.Sender).Text)
        .Throttle(TimeSpan.FromSeconds(1));
    var tweets = topics
        .Select(Twitter.AllTweetsAbout)
        .Switch()
        .Select(sa.Score)
        .Publish();
    tweets.Connect();
    var format = new Func<string, ScoredTweet, string>((topic, st) => String.Format("[user: @{0} | topic: {1} | score: {3}]\r\n{2}\r\n", st.Tweet.User, topic, st.Tweet.Text, st.Score));
    var addToList = new Action<string, ObservableCollection<string>>((item, list) =>
    {
        if (list.Count == 4)
            list.RemoveAt(3);
        list.Insert(0, item);
    });
    tweets
        .Where(x => x.Score >= 0)
        .Sample(TimeSpan.FromSeconds(1))
        .ObserveOnDispatcher()
        .Subscribe(x => addToList(format(x.Tweet.Topic, x), PositiveTweets));
    tweets
        .Where(x => x.Score < 0)
        .Sample(TimeSpan.FromSeconds(1))
        .ObserveOnDispatcher()
        .Subscribe(x => addToList(format(x.Tweet.Topic, x), NegativeTweets));
}

Here is the XAML:

<StackPanel Margin="10">
    <DockPanel>
        <Label DockPanel.Dock="Left" Content="Search:" Margin="0,0,10,0" FontSize="20"/>
        <TextBox Name="SearchTextBox" FontSize="20" Focusable="True"/>
    </DockPanel>
    <Label Content="positive" FontSize="20"/>
    <ListBox Name="PositiveListBox" Height="250" FontSize="16"/>
    <Label Content="negative" FontSize="20"/>
    <ListBox Name="NegativeListBox" Height="250" FontSize="16"/>
</StackPanel>

The IObservable is created as follows:

readonly static SingleUserAuthorizer Auth = new SingleUserAuthorizer
{
    CredentialStore = new InMemoryCredentialStore
    {
        ConsumerKey = ConfigurationManager.AppSettings["consumerKey"],
        ConsumerSecret = ConfigurationManager.AppSettings["consumerSecret"],
        OAuthToken = ConfigurationManager.AppSettings["authtoken"],
        OAuthTokenSecret = ConfigurationManager.AppSettings["authtokensecret"],
    }
};
public static IObservable<Tweet> AllTweetsAbout(string topic)
{
    return Observable.Create<Tweet>(o =>
    {
        var twitterCtx = new TwitterContext(Auth);
        var query = from s in twitterCtx.Streaming
            where s.Type == StreamingType.Filter &&
                    s.Track == topic
            select s;
        var disposed = false;
        query.StartAsync(s =>
        {
            if(disposed)
                s.CloseStream();
            else
                o.OnNext(Tweet.Parse(s.Content, topic));
            return Task.FromResult(true);
        });
        return Disposable.Create(() => disposed = true);
    });
}

And finally, the sentiment analysis:

public class ScoredTweet
{
    public Tweet Tweet { get; set; }
    public int Score { get; set; }
}
public class SentimentAnalysis
{
    private readonly Keywords _keywords;
    public SentimentAnalysis(Keywords keywords)
    {
        _keywords = keywords;
    }
    public ScoredTweet Score(Tweet tweet)
    {
        return new ScoredTweet
        {
            Tweet = tweet,
            Score = _keywords.Positive.Count(x => tweet.Text.Contains(x)) - _keywords.Negative.Count(x => tweet.Text.Contains(x))
        };
    }
}
public class Keywords
{
    public List<string> Positive { get; private set; }
    public List<string> Negative { get; private set; }
    public static Keywords Create(List<string> positive, List<string> negative)
    {
        return new Keywords
        {
            Positive = positive,
            Negative = negative
        };
    }
}

I recently added Rx support to LINQ to Twitter. Maybe this will help you:

    static async Task DoRxObservableStreamAsync(TwitterContext twitterCtx)
    {
        Console.WriteLine("\nStreamed Content: \n");
        int count = 0;
        var cancelTokenSrc = new CancellationTokenSource();
        try
        {
            var observable =
                await
                    (from strm in twitterCtx.Streaming
                                            .WithCancellation(cancelTokenSrc.Token)
                     where strm.Type == StreamingType.Filter &&
                           strm.Track == "twitter"
                     select strm)
                    .ToObservableAsync();
            observable.Subscribe(
                strm =>
                {
                    HandleStreamResponse(strm);
                    if (count++ >= 5)
                        cancelTokenSrc.Cancel();
                },
                ex => Console.WriteLine(ex.ToString()),
                () => Console.WriteLine("Completed"));
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Stream cancelled.");
        }
    }

You can also download the source code for this demo.

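I found a solution to the problem. The Twitter class must not be static, and the Twitter context must be created only once, when the Twitter class itself is created.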

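In the code I posted, the AllTweetsAbout method is static and a new Twitter context is created every time the method is called. This does not work, probably because the Twitter API somehow blocks the application when too many login operations happen within a given period of time, or something similar.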
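A minimal sketch of what that change might look like, reusing only the LINQ to Twitter calls that already appear in the question. It is an illustration under assumptions, not the exact code from the fix: the constructor parameter `auth` is a name introduced here, the method emits the raw `Content` string instead of calling the question's `Tweet.Parse` (which is not shown in the post), and depending on the library version an extra using directive for the authorizer namespace may be needed.

```csharp
using System;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
using LinqToTwitter;

// Sketch only: a non-static Twitter class that owns a single TwitterContext.
public class Twitter
{
    private readonly TwitterContext _twitterCtx;

    // 'auth' is an assumed parameter; the question's SingleUserAuthorizer
    // instance (the Auth field) could be passed in here.
    public Twitter(IAuthorizer auth)
    {
        // The context is created once, when the Twitter instance is created,
        // rather than on every call to AllTweetsAbout.
        _twitterCtx = new TwitterContext(auth);
    }

    // Emits the raw content of each streamed message for the given topic.
    public IObservable<string> AllTweetsAbout(string topic)
    {
        return Observable.Create<string>(o =>
        {
            // Same query as in the question, but against the shared context.
            var query = from s in _twitterCtx.Streaming
                        where s.Type == StreamingType.Filter &&
                              s.Track == topic
                        select s;

            var disposed = false;
            query.StartAsync(s =>
            {
                if (disposed)
                    s.CloseStream();      // subscription was disposed: stop streaming
                else
                    o.OnNext(s.Content);  // forward the raw message to the observer
                return Task.FromResult(true);
            });

            // Disposing only sets the flag; the stream is closed on the next callback.
            return Disposable.Create(() => disposed = true);
        });
    }
}
```

In the main window, one instance would be created up front (for example `var twitter = new Twitter(Auth);`) and the pipeline would become `topics.Select(topic => twitter.AllTweetsAbout(topic).Select(json => Tweet.Parse(json, topic))).Switch()`, so every topic change reuses the same context.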
