可观察到对不同线程上更改的阻塞集合没有反应



我有以下代码:

class Program
{
    static void Main(string[] args)
    {
        var watcher = new SNotifier(DumpToConsole);
        watcher.StartQueue();
        Console.ReadLine();
    }
    private static void DumpToConsole(IList<Timestamped<int>> currentCol)
    {
        Console.WriteLine("buffer time elapsed, current collection contents is: {0} items.", currentCol.Count);
        Console.WriteLine("holder has: {0}", currentCol.Count);
    }
}

SNotifier:

public class SNotifier
{
    private BlockingCollection<int> _holderQueue;
    private readonly Action<IList<Timestamped<int>>> _dumpAction;
    public SNotifier(Action<IList<Timestamped<int>>> dumpAction)
    {
        PopulateListWithStartValues();
        _dumpAction = dumpAction;
    }
    public void StartQueue()
    {
        PopulateQueueOnDiffThread();
        var observableCollection = _holderQueue.ToObservable();
        var myCollectionTimestamped = observableCollection.Timestamp();
        var bufferedTimestampedCollection = myCollectionTimestamped.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3));
        using (bufferedTimestampedCollection.Subscribe(_dumpAction))
        {
            Console.WriteLine("started observing collection");
        }
    }
    private void PopulateQueueOnDiffThread()
    {
        Action addToCollectionAction = AddToCollection;
        var t = new TaskFactory();
        t.StartNew(addToCollectionAction);
    }
    private static IEnumerable<int> GetInitialElements()
    {
        var random = new Random();
        var items = new List<int>();
        for (int i = 0; i < 10; i++)
            items.Add(random.Next(1, 10));
        return items;
    }
    private void AddToCollection()
    {
        while (true)
        {
            var newElement = new Random().Next(1, 10);
            _holderQueue.Add(newElement);
            Console.WriteLine("added {0}", newElement);
            Console.WriteLine("holder has: {0}", _holderQueue.Count);
            Thread.Sleep(1000);
        }
    }
    private void PopulateListWithStartValues()
    {
        _holderQueue = new BlockingCollection<int>();
        var elements = GetInitialElements();
        foreach (var i in elements)
            _holderQueue.Add(i);
    }
}

我需要运行 DumpToConsole() 方法以每 3 秒显示一次集合计数,而此集合在另一个线程上更改其内容。我的问题是DumpToConsole()只被调用一次。为什么?!我已经花了一整天的时间在这上面。由于我已经订阅了我的转储方法到可观察量,它应该"观察"集合更改并每 3 秒重新调用一次 DumpToConsole() 方法;这就是我需要的。

想法?谢谢

(附言传递给 SNotifier 类的操作是我在 SNotifier 中删除控制台相关内容的方式,我需要更好地重构它,它可以忽略,因为它与问题本身无关)

您正在BlockingCollection<int>上呼叫ToObservable()。此扩展方法只是获取集合上的IEnumerable<int>接口并将其转换为IObservable<int>。这具有在订阅时获取集合内容列表并通过可观察流将其转储的效果。

它不会在添加项时继续枚举项。

ToObservable()前面使用GetConsumingEnumerable()可以解决这个问题。

但是,需要谨慎,因为这也会从集合中删除项目,这可能不是可取的。

如果这是可以接受的,您可能希望在多个订阅者的情况下发布生成的可观察结果,以避免造成严重破坏。

如果您只是添加,则可以考虑扭转整个过程 - 使用主题来支持"添加"方法,并让一个订阅者填写列表(或 BlockingCollection 如果需要)来跟踪集合,然后第二个订阅者可以报告进度。

另一种方法是使用 ObservableCollection 并订阅其事件。

在最后两个建议中,您需要使"添加"线程安全,因为Subject<T>ObservableCollection<T>本身都不是线程安全的。

补遗

布兰登评论您正在StartQueue处置订阅,这让我意识到另一个问题——StartQueue永远不会回来!这是因为在IEnumerableToObservable()转换上对Subscribe的调用在枚举完成之前不会返回 - 因此它也保留了处置(因为IDisposableSubscribe的返回值),这就是为什么我也没有注意到指出using @Brandon!

有了以上两点,您需要进行以下其他更改。首先,删除订阅周围的using语句,隐式处置将取消订阅。当我们解决阻塞订阅调用时,这将导致订阅立即被取消。如果确实需要在某个时候显式取消订阅,则应保留IDisposable句柄。

其次,在ToObservable()后立即添加对SubscribeOn(Scheduler.Default)的调用,以防止Subscribe呼叫阻塞。

相关内容

  • 没有找到相关文章

最新更新