RX UDPClient,从端口列表侦听来自设备的消息,并订阅要发布到Azure事件中心的数据



我有一些代码,它滚动通过端口列表,并设置一个包含可订阅的UDP缓冲区内容的流。

我是RX的新手,这是在回应另一个stackoverflow问题的帮助下构建的。

这个代码在早期测试中似乎还可以,并处理了我们可以发送的所有数据包,但现在它处于浸泡测试中,订阅似乎在一段时间后失败,它停止向事件中心发送事件。

我特别担心订阅代码中有一个异常,导致我丢失了信息,但没有被捕获。

这是代码,任何关于简化或改进的意见或建议都将不胜感激:

var receiveStream =
    _deviceTypeProvider.GetDeviceTypes().ToObservable().Trace("GetDeviceTypes")
        .SelectMany(devicePortMapping => Observable
            .Using(() => UdpListener(devicePortMapping),
                client =>
                    Observable
                        .FromAsync(client.ReceiveAsync)
                        .Repeat()).Trace("UdpListener"),
            (devicePortMapping, stream) =>
            {
                Log
                    .ForContext("Raw", stream.Buffer.ToPrintByteArray())
                    .Verbose("Received Incoming {DeviceType} Message from {Device} on Port {Port}",
                        devicePortMapping.DeviceType, stream.RemoteEndPoint.Address, devicePortMapping.Port);
                try
                {
                    var timeZoneOffset =
                        _deviceTimeZoneOffsetProvider.GetDeviceTimeZoneOffset(devicePortMapping.Port);
                    var tenant = _deviceTenantProvider.GetDeviceTenant(devicePortMapping.Port);
                    if (tenant == null || timeZoneOffset == null)
                    {
                        Log
                            .Error(
                                "Tenant or TimeOffset Missing for Port: {Port}, cannot continue processing this message",
                                devicePortMapping.Port);
                        return null;
                    }
                    var message =
                        new DeviceMessage(new Device(stream.RemoteEndPoint.Address.ToIPAddressNum(),
                            stream.RemoteEndPoint.Port, devicePortMapping.DeviceType, tenant.TenantId,
                            timeZoneOffset.Offset))
                        {
                            MessageUid = Guid.NewGuid(),
                            Timestamp = DateTime.UtcNow,
                            Raw = stream.Buffer,
                        };
                    message.Information("Received Incoming Message");
                    return message;
                }
                catch (Exception ex)
                {
                    Log.Error(ex, "Exception whilst receiving incoming message");
                    throw;
                }
            }).Trace("SelectMany").Select(Task.FromResult).Trace("Select");
if (_settings.TestModeEnabled)
{
    Log
        .Warning("Test Mode is Enabled");
    receiveStream = receiveStream
        .Select(async message =>
            await _testModeProvider.InjectTestModeAsync(await message)).Trace("TestMode");
}
_listener = receiveStream.Subscribe(async messageTask =>
{
    var message = await messageTask;
    if (message == null)
    {
        Log
            .Warning("Message is null, returning");
        return;
    }
    Log
        .ForContext("Raw", message.Raw.ToPrintByteArray(), true)
        .ForContext("Device", message.Device, true)
        .Verbose("Publishing Message {MessageUid} from {@Device}", message.MessageUid, message.Device);
    await _messagePublisher.Publish(message).ConfigureAwait(false);
}, error => { Log.Error(error, "Exception whilst publishing message"); });

以下是InjectTestMode方法:

 public async Task<DeviceMessage> InjectTestModeAsync(DeviceMessage deviceMessage)
    {
        try
        {
            var imei = GetImei(deviceMessage.Raw);
            if (string.IsNullOrEmpty(imei))
            {
                Log
                    .ForContext("DeviceMessage",deviceMessage,true)
                    .Error("Error while getting IMEI value from message raw data in Test Mode");
                return null;
            }
            //var dummyIpAddress = DummyIPfromIMEI(imei).ToIPAddressNum();
            var mapping = await _mappingService.GetIPMappingAsync(deviceMessage.Device.IPAddress);
            if (mapping == null)
            {
                Log
                 .ForContext("DeviceMessage", deviceMessage, true)
                 .Warning("Test Mode updated IP Address mapping with IPAddress: {IPAddress} for IMEI: {IMEI}", deviceMessage.Device.IPAddress.ToIPAddressString(), imei);
                await _mappingService.UpdateIPMappingAsync(deviceMessage.Device.IPAddress, imei);
            }

            // deviceMessage.Device.IPAddress = dummyIpAddress;
            return deviceMessage;
        }
        catch (Exception ex)
        {
            Log
                .ForContext("DeviceMessage",deviceMessage,true)
                .Error("Exception raised whilst injecting Test Mode", ex);
            return null;
        }
    }

这是UdpListener方法:

private UdpClient UdpListener(DeviceTypeMap deviceTypeMap)
    {
        Log.Information("Listening for Device Type: {DeviceType} messages on Port: {Port}", deviceTypeMap.DeviceType,
            deviceTypeMap.Port);
        var udpClient = new UdpClient();
        var serverEndpoint = new IPEndPoint(IPAddress.Any, deviceTypeMap.Port);
        udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
        udpClient.Client.Bind(serverEndpoint);
        return udpClient;
    }

[更新]-2015年11月16日

所以我做了一些研究,似乎我有不止一个代码的味道,所以我更新了代码,它现在正在运行,但我想我会分享这个代码和修改后的代码的意图,看看是否有人能提出一个更优雅的解决方案。

意图

监听几个UDP端口,并将流量连同一些基于接收端口的元数据一起发送到Azure事件中心。

此元将取决于系统是否处于"测试模式"。

的实施

对于每个端口和设备类型,从ReceiveAsync事件创建一个UDPListener和Observable Collection,将这些Observable Collections组合成一个集合,然后可以从将数据发布到EventHubs的组件订阅该集合。

问题

InjectMode是异步的,如果出现问题,它可能会返回null,这似乎会扼杀序列。我认为这可能应该是某种可观察的扩展方法,它允许从序列中修改或删除设备消息,但我无法理解这一点。

最初,EventHub的发布是在订阅中进行的,直到我读到订阅中不应该使用异步,因为它会生成一个坏的async void。所有的研究似乎都指向将其推入SelectMany,这没有任何意义,因为这是观察到的序列的目的地,而不是过程的一部分,但我同意了。订阅实际上变得多余了。

我不确定是否需要所有的试接球盖帽,但我确信我有一个问题,打乱了顺序。正如Enigmativity所指出的,我捕获了所有异常,然后记录并重新抛出,这些日志条目中从未出现任何内容。

Retry().Do()感觉不太好,我可以像许多其他帖子中建议的那样让SelectMany()工作,所以我别无选择。

这是现在正在运行的代码:

public void Start()
{
    Log.Information("InboundUdpListener is starting");
    var receiveStream =
        _deviceTypeProvider.GetDeviceTypes().ToObservable().Trace("GetDeviceTypes")
            .SelectMany(devicePortMapping => Observable
                .Using(() => UdpListener(devicePortMapping),
                    client =>
                        Observable
                            .FromAsync(client.ReceiveAsync)
                            .Repeat()).Trace("UdpListener"),
                (devicePortMapping, stream) =>
                {
                    Log
                        .ForContext("Raw", stream.Buffer.ToPrintByteArray())
                        .Verbose("Received Incoming {DeviceType} Message from {Device} on Port {Port}",
                            devicePortMapping.DeviceType, stream.RemoteEndPoint.Address, devicePortMapping.Port);
                    try
                    {
                        var timeZoneOffset =
                            _deviceTimeZoneOffsetProvider.GetDeviceTimeZoneOffset(devicePortMapping.Port);
                        var tenant = _deviceTenantProvider.GetDeviceTenant(devicePortMapping.Port);
                        if (tenant == null || timeZoneOffset == null)
                        {
                            Log
                                .Error(
                                    "Tenant or TimeOffset Missing for Port: {Port}, cannot continue processing this message",
                                    devicePortMapping.Port);
                            return null;
                        }
                        var message =
                            new DeviceMessage(new Device(stream.RemoteEndPoint.Address.ToIPAddressNum(),
                                stream.RemoteEndPoint.Port, devicePortMapping.DeviceType, tenant.TenantId,
                                timeZoneOffset.Offset))
                            {
                                MessageUid = Guid.NewGuid(),
                                Timestamp = DateTime.UtcNow,
                                Raw = stream.Buffer,
                            };
                        message.Information("Received Incoming Message");
                        return message;
                    }
                    catch (Exception ex)
                    {
                        Log.Error(ex, "Exception whilst receiving incoming message");
                        throw;
                    }
                }).Trace("SelectMany");
    receiveStream = receiveStream.Retry().Do(async message =>
    {
        try
        {
            if (_testModeEnabled && message != null)
            {
                message = await _testModeProvider.InjectTestModeAsync(message);
            }
            if (message != null)
            {
                await _messagePublisher.Publish(message);
            }
        }
        catch (Exception ex)
        {
            Log.Error(ex, "Exception whilst publishing incoming message");
            throw;
        }
    }).Trace("Publish");
    _listener = receiveStream.Retry().Subscribe(OnMessageReceive, OnError, OnComplete);
    Log.Information("InboundUdpListener is started");
}

有人能看到这个代码的任何问题或提出任何改进建议吗。我真的很感谢你的帮助。

[李评论后更新]

我完全同意这是一场混乱,为了表明我愿意接受人们的帮助,这是我的下一次尝试

    public void Start()
    {
        _listener = _deviceTypeProvider.GetDeviceTypes().ToObservable()
            .SelectMany(CreateUdpListener, CreateMessage)
            .SelectMany(InjectTestMode)
            .SelectMany(PublishMessage)
            .Retry()
            .Subscribe(OnMessageReceive, OnError, OnComplete);
    }
    private IObservable<UdpReceiveResult> CreateUdpListener(DeviceTypeMap deviceType)
    {
        return Observable.Using(() => UdpListener(deviceType),
            client => Observable.FromAsync(client.ReceiveAsync).Repeat());
    }
    private DeviceMessage CreateMessage(DeviceTypeMap deviceTypeMap, UdpReceiveResult receiveResult)
    {
        var timeZoneOffset =
            _deviceTimeZoneOffsetProvider.GetDeviceTimeZoneOffset(deviceTypeMap.Port);
        var tenant = _deviceTenantProvider.GetDeviceTenant(deviceTypeMap.Port);
        if (tenant == null || timeZoneOffset == null)
        {
            Log
                .Error(
                    "Tenant or TimeOffset Missing for Port: {Port}, cannot continue processing this message",
                    deviceTypeMap.Port);
            return null;
        }
        var message =
            new DeviceMessage(new Device(receiveResult.RemoteEndPoint.Address.ToIPAddressNum(),
                receiveResult.RemoteEndPoint.Port, deviceTypeMap.DeviceType, tenant.TenantId,
                timeZoneOffset.Offset))
            {
                MessageUid = Guid.NewGuid(),
                Timestamp = DateTime.UtcNow,
                Raw = receiveResult.Buffer,
            };
        message.Information("Received Incoming Message");
        return message;
    }
    private async Task<DeviceMessage> InjectTestMode(DeviceMessage message)
    {
        if (_testModeEnabled && message != null)
        {
            message = await _testModeProvider.InjectTestModeAsync(message);
        }
        return message;
    }
    private async Task<DeviceMessage> PublishMessage(DeviceMessage message)
    {
        await _messagePublisher.Publish(message);
        return message;
    }
    private void OnComplete()
    {
        throw new NotImplementedException();
    }
    private void OnError(Exception ex)
    {
        throw new NotImplementedException();
    }
    private void OnMessageReceive(object o)
    {
        throw new NotImplementedException();
    }

[最终更新]

这就是我们最终的结局;一个IOobservable>

var listeners = Observable.Defer(() => _deviceTypeProvider.GetDeviceTypes()
                .ToObservable()
                .Select(UdpListener)
                .SelectMany(listener =>
                {
                    return Observable.Defer(() => Observable
                        .FromAsync(listener.UdpClient.ReceiveAsync)
                        .Where(x => x.Buffer.Length > 0)
                        .Repeat()
                        .Select(result => CreateMessage(listener.DeviceType, result))
                        .SelectMany(InjectTestMode)
                        .OfType<DeviceMessage>()
                        .Do(async message => await PublishMessage(message)))
                        .Retry();
                })).Retry();
            _listener = listeners.Subscribe(OnMessageReceive, OnError, OnComplete);

所以我可以发布一些代码并具有建设性,我认为Start方法中的查询应该是这样的:

_listener = _deviceTypeProvider.GetDeviceTypes().ToObservable()
        .SelectMany(CreateUdpListener, CreateMessage)
        .Retry()
        .Subscribe(OnMessageReceive, OnError, OnComplete);

现在,可怜的家伙来支持这个代码有机会理解它。:-)

我认为您忽略了RX序列在异常时终止的事实。若要在发生异常时继续序列,则需要使用Observable.Catch

看看这个http://www.introtorx.com/content/v1.0.10621.0/11_AdvancedErrorHandling.html

相关内容

  • 没有找到相关文章

最新更新