泛型和Rx



我正在尝试将此程序改为使用泛型。但是,在下面这几行代码处

foreach (var observer in observerList)
{                           
   // Compile error on the next line: 'observer' is an IObserver<T>, but OnNext expects a value of type T.
   observer.OnNext(observer);
}

我得到如下编译错误:

Cannot convert from System.IObserver<T> to T

程序:

/// <summary>
/// Pushes the items of an <see cref="IEnumerable{T}"/> to every subscribed observer
/// from a background task that is started by the constructor.
/// </summary>
/// <typeparam name="T">Type of the values delivered to the observers.</typeparam>
/// <remarks>
/// NOTE(review): the observer list is read by the worker task and written by
/// <see cref="Subscribe"/> without any visible synchronization; subscribing while
/// the worker is iterating the list may fail. Confirm whether callers rely on this.
/// </remarks>
public sealed class EnumerableObservable<T> : IObservable<T>, IDisposable
{
    private readonly IEnumerable<T> enumerable;

    /// <summary>
    /// Stores the source sequence and immediately starts the worker task that
    /// enumerates it and forwards each item to the current observers.
    /// </summary>
    /// <param name="enumerable">The sequence whose items are pushed to observers.</param>
    public EnumerableObservable(IEnumerable<T> enumerable)
    {
        this.enumerable = enumerable;
        this.cancellationSource = new CancellationTokenSource();
        this.cancellationToken = cancellationSource.Token;
        this.workerTask = Task.Factory.StartNew(() =>
        {
            foreach (var value in this.enumerable)
            {
                //if task cancellation triggers, raise the proper exception
                //to stop task execution
                cancellationToken.ThrowIfCancellationRequested();
                foreach (var observer in observerList)
                {
                    //forward the enumerated item (a T), not the observer itself
                    observer.OnNext(value);
                }
            }
        }, this.cancellationToken);
    }

    //the cancellation token source for starting stopping
    //inner observable working thread
    private readonly CancellationTokenSource cancellationSource;
    //the cancellation flag
    private readonly CancellationToken cancellationToken;
    //the running task that runs the inner running thread
    private readonly Task workerTask;
    //the observer list
    private readonly List<IObserver<T>> observerList = new List<IObserver<T>>();

    /// <summary>
    /// Registers an observer so that it receives subsequent items.
    /// </summary>
    /// <param name="observer">The observer to add to the notification list.</param>
    /// <returns>
    /// Always <c>null</c>: no unsubscription handle is implemented.
    /// </returns>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        observerList.Add(observer);
        //subscription lifecycle missing
        //for readability purpose
        return null;
    }

    /// <summary>
    /// Cancels the worker task, waits for it to stop, releases the task and the
    /// cancellation source, then signals completion to every observer.
    /// </summary>
    public void Dispose()
    {
        //trigger task cancellation
        //and wait for acknowledge
        if (!cancellationSource.IsCancellationRequested)
        {
            cancellationSource.Cancel();
            //wait for ANY terminal state: the task ends as RanToCompletion when the
            //source is finite and as Faulted when an observer throws, so waiting
            //only for IsCanceled would spin forever in those cases
            while (!workerTask.IsCompleted)
            {
                Thread.Sleep(100);
            }
        }
        cancellationSource.Dispose();
        workerTask.Dispose();
        foreach (var observer in observerList)
        {
            observer.OnCompleted();
        }
    }
}
/// <summary>
/// Observer that writes every notification it receives to the console,
/// each on its own line prefixed with "-> ".
/// </summary>
/// <typeparam name="T">Type of the observed values.</typeparam>
public sealed class ConsoleStringObserver<T> : IObserver<T>
{
    /// <summary>Writes the end-of-sequence marker.</summary>
    public void OnCompleted()
    {
        Console.WriteLine("-> END");
    }

    /// <summary>Writes the message of the reported exception.</summary>
    /// <param name="error">The exception reported by the observable.</param>
    public void OnError(Exception error)
    {
        Console.WriteLine("-> {0}", error.Message);
    }

    /// <summary>Writes the string form of the received value.</summary>
    /// <param name="value">The value pushed by the observable; may be null for reference types.</param>
    public void OnNext(T value)
    {
        // A null item is a legal notification for reference-type T; print an
        // empty payload instead of failing with NullReferenceException.
        Console.WriteLine("-> {0}", value == null ? string.Empty : value.ToString());
    }
}
/// <summary>
/// Console demo: streams random numbers to a console observer for two seconds.
/// </summary>
class Program
{
    /// <summary>
    /// Wires the endless source to an observable and a console observer, lets it
    /// run for two seconds, then tears everything down and waits for RETURN.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        const int runMilliseconds = 2000;

        // Calling the iterator method only builds the sequence object;
        // no value is produced until something enumerates it.
        IEnumerable<string> source = EnumerateValuesFromSomewhere();

        using (EnumerableObservable<string> publisher = new EnumerableObservable<string>(source))
        {
            using (IDisposable subscription = publisher.Subscribe(new ConsoleStringObserver<string>()))
            {
                // Let the publisher run for a while before leaving the scopes.
                Thread.Sleep(runMilliseconds);
            }
        }

        Console.WriteLine("Press RETURN to EXIT");
        Console.ReadLine();
    }

    /// <summary>
    /// Endless sequence of random non-negative integers rendered as strings,
    /// pausing 100 ms after each item is consumed.
    /// </summary>
    /// <returns>A lazily evaluated, never-ending sequence of numeric strings.</returns>
    static IEnumerable<string> EnumerateValuesFromSomewhere()
    {
        int seed = DateTime.Now.GetHashCode();
        Random generator = new Random(seed);

        for (;;) // never terminates on its own
        {
            int next = generator.Next();
            yield return next.ToString();

            // throttle the producer
            Thread.Sleep(100);
        }
    }
}

应该写成 observer.OnNext(value);

您的字段名为 observerList,它是一个 IObserver<T> 的列表。

private readonly List<IObserver<T>> observerList = new List<IObserver<T>>();

您在此循环中遍历列表中的所有元素,却把每个元素(观察者)本身当作参数传给了它自己的 OnNext。

foreach (var observer in observerList)
{                           
    // Problem line: the argument is the observer itself (an IObserver<T>) rather than a value of type T.
    observer.OnNext(observer);
}

observer.OnNext 期望的参数是一个 T 类型的值,而不是观察者本身。

例如,如果是IObserver<string>,则需要传递字符串

observer.OnNext("Hello world");

我不太清楚您的程序想要实现什么,但如果您说明一下,我可以进一步帮助您。

相关内容

  • 没有找到相关文章

最新更新