如何在IObservable中等待一个具有特定属性值的对象



为了澄清,我有一个方法:

public static IObservable<Node> GetNodes()
    {            
        var computers = GetComputersInLan();
        return computers.Select(computerAddress => GetNode(computerAddress));
    }

GetComputersInLan方法返回IObservable的IPAddress

private static IObservable<IPAddress> GetComputersInLan()
    {
        var tasks = new List<Task<PingReply>>();
        for (int i = 1; i < 255; i++)
        {
            Ping p = new Ping();
            ipBytes[3] = (byte)(++ipBytes[3]);
            IPAddress address = new IPAddress(ipBytes);
            tasks.Add(p.SendPingAsync(address, 2000));
        }
        return tasks.ToObservable().Where(x => x.Result.Status == IPStatus.Success).Select(y => y.Result.Address);
    }

GetNode方法构造一个Node。

private static Node GetNode(IPAddress ipAddress)
    {
       return new Node(ipAddress, (IHandler)Activator.GetObject(typeof(Handler), "tcp://" + ipAddress + ":1337/handler"));
    }
public class Node
{
    private IHandler Handler { get; set; }
    public IPAddress Address { get; set; }
    public int AvailableCores { get; set; }
    public async Task<TResult> Invoke<TResult>(Func<TResult> method)
    {
        AvailableCores--;
        var result = await Task.Run<TResult>(() => Handler.Invoke(method));
        AvailableCores++;
        return result;
    }
}

Handler是一台远程计算机,AvailableCores代表它的cpu内核。

我想要的是等待方法GetNodes返回第一个节点,有超过0个可用性。

await GetNodes().FirstAsync(node => node.AvailableCore > 0)

但是发生的事情是,在对方法Invoke进行足够的调用之后,而不是等待内核可用,它会触发一个异常"序列不包含元素"。

这是该方法的预期行为。FirstAsync将只检查您传递给它的项目的当前状态,要么返回第一个匹配,要么抛出您遇到的异常,如果没有匹配。

您必须自己处理等待的情况,直到内核可用为止。您可以尝试FirstOrDefaultAsync返回null,而不是在所有内核都繁忙时抛出异常。在此基础上,您将需要一些方案来检测内核何时可用于下一个工作单元,无论是事件还是轮询。

相关内容

  • 没有找到相关文章