我有一些代码,它滚动通过端口列表,并设置一个包含可订阅的UDP缓冲区内容的流。
我是RX的新手,这是在回应另一个stackoverflow问题的帮助下构建的。
这个代码在早期测试中似乎还可以,并处理了我们可以发送的所有数据包,但现在它处于浸泡测试中,订阅似乎在一段时间后失败,它停止向事件中心发送事件。
我特别担心订阅代码中有一个异常,导致我丢失了信息,但没有被捕获。
这是代码,任何关于简化或改进的意见或建议都将不胜感激:
var receiveStream =
_deviceTypeProvider.GetDeviceTypes().ToObservable().Trace("GetDeviceTypes")
.SelectMany(devicePortMapping => Observable
.Using(() => UdpListener(devicePortMapping),
client =>
Observable
.FromAsync(client.ReceiveAsync)
.Repeat()).Trace("UdpListener"),
(devicePortMapping, stream) =>
{
Log
.ForContext("Raw", stream.Buffer.ToPrintByteArray())
.Verbose("Received Incoming {DeviceType} Message from {Device} on Port {Port}",
devicePortMapping.DeviceType, stream.RemoteEndPoint.Address, devicePortMapping.Port);
try
{
var timeZoneOffset =
_deviceTimeZoneOffsetProvider.GetDeviceTimeZoneOffset(devicePortMapping.Port);
var tenant = _deviceTenantProvider.GetDeviceTenant(devicePortMapping.Port);
if (tenant == null || timeZoneOffset == null)
{
Log
.Error(
"Tenant or TimeOffset Missing for Port: {Port}, cannot continue processing this message",
devicePortMapping.Port);
return null;
}
var message =
new DeviceMessage(new Device(stream.RemoteEndPoint.Address.ToIPAddressNum(),
stream.RemoteEndPoint.Port, devicePortMapping.DeviceType, tenant.TenantId,
timeZoneOffset.Offset))
{
MessageUid = Guid.NewGuid(),
Timestamp = DateTime.UtcNow,
Raw = stream.Buffer,
};
message.Information("Received Incoming Message");
return message;
}
catch (Exception ex)
{
Log.Error(ex, "Exception whilst receiving incoming message");
throw;
}
}).Trace("SelectMany").Select(Task.FromResult).Trace("Select");
if (_settings.TestModeEnabled)
{
Log
.Warning("Test Mode is Enabled");
receiveStream = receiveStream
.Select(async message =>
await _testModeProvider.InjectTestModeAsync(await message)).Trace("TestMode");
}
_listener = receiveStream.Subscribe(async messageTask =>
{
var message = await messageTask;
if (message == null)
{
Log
.Warning("Message is null, returning");
return;
}
Log
.ForContext("Raw", message.Raw.ToPrintByteArray(), true)
.ForContext("Device", message.Device, true)
.Verbose("Publishing Message {MessageUid} from {@Device}", message.MessageUid, message.Device);
await _messagePublisher.Publish(message).ConfigureAwait(false);
}, error => { Log.Error(error, "Exception whilst publishing message"); });
以下是InjectTestMode方法:
public async Task<DeviceMessage> InjectTestModeAsync(DeviceMessage deviceMessage)
{
try
{
var imei = GetImei(deviceMessage.Raw);
if (string.IsNullOrEmpty(imei))
{
Log
.ForContext("DeviceMessage",deviceMessage,true)
.Error("Error while getting IMEI value from message raw data in Test Mode");
return null;
}
//var dummyIpAddress = DummyIPfromIMEI(imei).ToIPAddressNum();
var mapping = await _mappingService.GetIPMappingAsync(deviceMessage.Device.IPAddress);
if (mapping == null)
{
Log
.ForContext("DeviceMessage", deviceMessage, true)
.Warning("Test Mode updated IP Address mapping with IPAddress: {IPAddress} for IMEI: {IMEI}", deviceMessage.Device.IPAddress.ToIPAddressString(), imei);
await _mappingService.UpdateIPMappingAsync(deviceMessage.Device.IPAddress, imei);
}
// deviceMessage.Device.IPAddress = dummyIpAddress;
return deviceMessage;
}
catch (Exception ex)
{
Log
.ForContext("DeviceMessage",deviceMessage,true)
.Error("Exception raised whilst injecting Test Mode", ex);
return null;
}
}
这是UdpListener方法:
private UdpClient UdpListener(DeviceTypeMap deviceTypeMap)
{
Log.Information("Listening for Device Type: {DeviceType} messages on Port: {Port}", deviceTypeMap.DeviceType,
deviceTypeMap.Port);
var udpClient = new UdpClient();
var serverEndpoint = new IPEndPoint(IPAddress.Any, deviceTypeMap.Port);
udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
udpClient.Client.Bind(serverEndpoint);
return udpClient;
}
[更新]-2015年11月16日
所以我做了一些研究,似乎我有不止一个代码的味道,所以我更新了代码,它现在正在运行,但我想我会分享这个代码和修改后的代码的意图,看看是否有人能提出一个更优雅的解决方案。
意图
监听几个UDP端口,并将流量连同一些基于接收端口的元数据一起发送到Azure事件中心。
此元将取决于系统是否处于"测试模式"。
的实施
对于每个端口和设备类型,从ReceiveAsync事件创建一个UDPListener和Observable Collection,将这些Observable Collections组合成一个集合,然后可以从将数据发布到EventHubs的组件订阅该集合。
问题
InjectMode是异步的,如果出现问题,它可能会返回null,这似乎会扼杀序列。我认为这可能应该是某种可观察的扩展方法,它允许从序列中修改或删除设备消息,但我无法理解这一点。
最初,EventHub的发布是在订阅中进行的,直到我读到订阅中不应该使用异步,因为它会生成一个坏的async void
。所有的研究似乎都指向将其推入SelectMany,这没有任何意义,因为这是观察到的序列的目的地,而不是过程的一部分,但我同意了。订阅实际上变得多余了。
我不确定是否需要所有的试接球盖帽,但我确信我有一个问题,打乱了顺序。正如Enigmativity所指出的,我捕获了所有异常,然后记录并重新抛出,这些日志条目中从未出现任何内容。
Retry().Do()
感觉不太好,我可以像许多其他帖子中建议的那样让SelectMany()
工作,所以我别无选择。
这是现在正在运行的代码:
public void Start()
{
Log.Information("InboundUdpListener is starting");
var receiveStream =
_deviceTypeProvider.GetDeviceTypes().ToObservable().Trace("GetDeviceTypes")
.SelectMany(devicePortMapping => Observable
.Using(() => UdpListener(devicePortMapping),
client =>
Observable
.FromAsync(client.ReceiveAsync)
.Repeat()).Trace("UdpListener"),
(devicePortMapping, stream) =>
{
Log
.ForContext("Raw", stream.Buffer.ToPrintByteArray())
.Verbose("Received Incoming {DeviceType} Message from {Device} on Port {Port}",
devicePortMapping.DeviceType, stream.RemoteEndPoint.Address, devicePortMapping.Port);
try
{
var timeZoneOffset =
_deviceTimeZoneOffsetProvider.GetDeviceTimeZoneOffset(devicePortMapping.Port);
var tenant = _deviceTenantProvider.GetDeviceTenant(devicePortMapping.Port);
if (tenant == null || timeZoneOffset == null)
{
Log
.Error(
"Tenant or TimeOffset Missing for Port: {Port}, cannot continue processing this message",
devicePortMapping.Port);
return null;
}
var message =
new DeviceMessage(new Device(stream.RemoteEndPoint.Address.ToIPAddressNum(),
stream.RemoteEndPoint.Port, devicePortMapping.DeviceType, tenant.TenantId,
timeZoneOffset.Offset))
{
MessageUid = Guid.NewGuid(),
Timestamp = DateTime.UtcNow,
Raw = stream.Buffer,
};
message.Information("Received Incoming Message");
return message;
}
catch (Exception ex)
{
Log.Error(ex, "Exception whilst receiving incoming message");
throw;
}
}).Trace("SelectMany");
receiveStream = receiveStream.Retry().Do(async message =>
{
try
{
if (_testModeEnabled && message != null)
{
message = await _testModeProvider.InjectTestModeAsync(message);
}
if (message != null)
{
await _messagePublisher.Publish(message);
}
}
catch (Exception ex)
{
Log.Error(ex, "Exception whilst publishing incoming message");
throw;
}
}).Trace("Publish");
_listener = receiveStream.Retry().Subscribe(OnMessageReceive, OnError, OnComplete);
Log.Information("InboundUdpListener is started");
}
有人能看到这个代码的任何问题或提出任何改进建议吗。我真的很感谢你的帮助。
[李评论后更新]
我完全同意这是一场混乱,为了表明我愿意接受人们的帮助,这是我的下一次尝试
public void Start()
{
_listener = _deviceTypeProvider.GetDeviceTypes().ToObservable()
.SelectMany(CreateUdpListener, CreateMessage)
.SelectMany(InjectTestMode)
.SelectMany(PublishMessage)
.Retry()
.Subscribe(OnMessageReceive, OnError, OnComplete);
}
private IObservable<UdpReceiveResult> CreateUdpListener(DeviceTypeMap deviceType)
{
return Observable.Using(() => UdpListener(deviceType),
client => Observable.FromAsync(client.ReceiveAsync).Repeat());
}
private DeviceMessage CreateMessage(DeviceTypeMap deviceTypeMap, UdpReceiveResult receiveResult)
{
var timeZoneOffset =
_deviceTimeZoneOffsetProvider.GetDeviceTimeZoneOffset(deviceTypeMap.Port);
var tenant = _deviceTenantProvider.GetDeviceTenant(deviceTypeMap.Port);
if (tenant == null || timeZoneOffset == null)
{
Log
.Error(
"Tenant or TimeOffset Missing for Port: {Port}, cannot continue processing this message",
deviceTypeMap.Port);
return null;
}
var message =
new DeviceMessage(new Device(receiveResult.RemoteEndPoint.Address.ToIPAddressNum(),
receiveResult.RemoteEndPoint.Port, deviceTypeMap.DeviceType, tenant.TenantId,
timeZoneOffset.Offset))
{
MessageUid = Guid.NewGuid(),
Timestamp = DateTime.UtcNow,
Raw = receiveResult.Buffer,
};
message.Information("Received Incoming Message");
return message;
}
private async Task<DeviceMessage> InjectTestMode(DeviceMessage message)
{
if (_testModeEnabled && message != null)
{
message = await _testModeProvider.InjectTestModeAsync(message);
}
return message;
}
private async Task<DeviceMessage> PublishMessage(DeviceMessage message)
{
await _messagePublisher.Publish(message);
return message;
}
private void OnComplete()
{
throw new NotImplementedException();
}
private void OnError(Exception ex)
{
throw new NotImplementedException();
}
private void OnMessageReceive(object o)
{
throw new NotImplementedException();
}
[最终更新]
这就是我们最终的结局;一个IOobservable>
var listeners = Observable.Defer(() => _deviceTypeProvider.GetDeviceTypes()
.ToObservable()
.Select(UdpListener)
.SelectMany(listener =>
{
return Observable.Defer(() => Observable
.FromAsync(listener.UdpClient.ReceiveAsync)
.Where(x => x.Buffer.Length > 0)
.Repeat()
.Select(result => CreateMessage(listener.DeviceType, result))
.SelectMany(InjectTestMode)
.OfType<DeviceMessage>()
.Do(async message => await PublishMessage(message)))
.Retry();
})).Retry();
_listener = listeners.Subscribe(OnMessageReceive, OnError, OnComplete);
所以我可以发布一些代码并具有建设性,我认为Start
方法中的查询应该是这样的:
_listener = _deviceTypeProvider.GetDeviceTypes().ToObservable()
.SelectMany(CreateUdpListener, CreateMessage)
.Retry()
.Subscribe(OnMessageReceive, OnError, OnComplete);
现在,可怜的家伙来支持这个代码有机会理解它。:-)
我认为您忽略了RX序列在异常时终止的事实。若要在发生异常时继续序列,则需要使用Observable.Catch
。
看看这个http://www.introtorx.com/content/v1.0.10621.0/11_AdvancedErrorHandling.html