我提出了这个解决方案。(尚未测试)通过网络上的大量反弹。
Private Function ObserveUDP() As IObservable(Of bytes())
Dim f = Function(observer)
Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
Dim client = New UdpClient(endpoint)
Dim obs = observable.*emphasized text*Generate(Of Task(Of UdpReceiveResult), UdpReceiveResult) _
( Nothing _
, Function(task As Task(Of UdpReceiveResult)) task Is Nothing Or Not task.IsCompleted() _
, Function(task As Task(Of UdpReceiveResult)) client.ReceiveAsync() _
, Function(task As Task(Of UdpReceiveResult)) task.Result)
Dim observable = obs.Select(Function(r) r.Buffer)
dim handle = observable.Subscribe(observer)
Dim df = Sub()
client.Close()
handle.Dispose()
End Sub
Return Disposable.Create(df)
End Function
Return observable.Create(f)
End Function
我的要求是确保UDP客户端在删除订阅时关闭。我很确定上面的代码很接近,但我认为它不太正确。任何意见都将不胜感激。
*编辑*
实际上,上面的例子是完全错误的,只会同步创建大量的任务对象但不要等待他们。经过一番尝试和错误之后,我为展开一个被一次又一次召唤的无尽。有什么意见吗?
''' initializer - a function that initializes and returns the state object
''' generator - a function that asynchronously using await generates each value
''' finalizer - a function for cleaning up the state object when the sequence is unsubscribed
Private Function ObservableAsyncSeq(Of T, I)( _
initializer As Func(Of I), _
generator As Func(Of I, Task(Of T)), _
finalizer As Action(Of I)) As IObservable(Of T)
Dim q = Function(observer As IObserver(Of T))
Dim go = True
Try
Dim r = Async Sub()
Dim ii As I = initializer()
While go
Dim result = Await generator(ii)
observer.OnNext(result)
End While
finalizer(ii)
observer.OnCompleted()
End Sub
Task.Run(r)
Catch ex As Exception
observer.OnError(ex)
End Try
' Disposable for stopping the sequence as per
' the observable contract
Return Sub() go = False
End Function
Return Observable.Create(q)
End Function
以及UDP 的使用示例
Private Function ObserveMeasurementPoints2() As IObservable(Of ProcessedDate)
Dim initializer = Function()
Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
Return New UdpClient(endpoint)
End Function
Dim finalizer = Function(client As UdpClient)
client.Close()
End Function
Dim generator = Function(client As UdpClient) As Task(Of UdpReceiveResult)
Return client.ReceiveAsync()
End Function
Return ObservableAsyncSeq(initializer, generator, finalizer).Select(Function(r) ProcessBytes(r.Buffer))
End Function
您可以使用Observable.Using
作为所提到的Enigmativity,也可以简单地使用接受IDisposable
作为返回参数的常规Observable.Create
方法-这足以安全处理。
使用迭代器或异步是非常好的。我列出了一种更Rx-ish的方法:
Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
Return Observable.Using(Of T, UdpClient)(
Function() New UdpClient(endpoint),
Function(udpClient) _
Observable.Defer(Function() udpClient.ReceiveAsync().ToObservable()) _
.Repeat() _
.Select(Function(result) processor(result.Buffer))
)
End Function
传统方式:
Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
Return Observable.Using(
Function() New UdpClient(endpoint),
Function(udpClient) Observable.Defer( _
Observable.FromAsyncPattern(
AddressOf udpClient.BeginReceive,
Function(iar)
Dim remoteEp = TryCast(iar.AsyncState, IPEndPoint)
Return udpClient.EndReceive(iar, remoteEp)
End Function)
).Repeat() _
.Select(processor)
)
End Function
测试:
Shared Sub Main()
Using UdpStream(New IPEndPoint(IPAddress.Loopback, 13200),
Function(bytes) String.Join(",", bytes)
).Subscribe(AddressOf Console.WriteLine)
Console.ReadLine()
End Using
Console.WriteLine("Done")
Console.ReadKey()
End Sub
看看Observable.Using
-它专门用于创建一个可观察对象,该可观察对象使用一次性资源生成其值,完成后自动处理资源。
你会发现CCD_ 5具有相同的CCD_;Dispose
方法实现,因此如果调用Dispose
,则不需要调用Close
。
来自反射器:
void IDisposable.Dispose()
{
this.Dispose(true);
}
public void Close()
{
this.Dispose(true);
}
这是Using
:的签名
Public Shared Function Using(Of TSource, TResource As IDisposable)(
ByVal resourceFactory As Func(Of TResource),
ByVal observableFactory As Func(Of TResource, IObservable(Of TSource)))
As IObservable(Of TSource)
我以前没有使用过UDPClient,但您似乎正在使用Tasks(Cardinality=1)尝试接收数据流(Cardinalty=many)。这似乎解决了这个问题——你重复了你的询问。这意味着您的查询将执行此
- 创建UDPClient
- 调用数据请求
- 接收它获得的第一个数据
- 在序列上推送数据
- 关闭序列
- 处置UDPClient
- 创建UDPClient(返回步骤1)
- 调用数据请求
- 接收它获得的第一个数据
- 。。。。直到您断开连接
我认为您应该能够通过拉入字节流来读取套接字/网络连接。我在我的博客文章中向你展示了如何做到这一点:
http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#CreatingYourOwnIterator
这样,您只需打开一个连接,并在收到字节时推送字节。
快速搜索后,我还发现有人担心UDPClient实现的可靠性。http://www.codeproject.com/Articles/1938/Issues-with-UdpClient-Receive
HTH-
Lee
using System;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace MyLib
{
public static class ObservableExtensions
{
//TODO: Could potentially upgrade to using tasks/Await-LC
public static IObservable<byte> ToObservable(
this Stream source,
int buffersize,
IScheduler scheduler)
{
var bytes = Observable.Create<byte>(o =>
{
var initialState = new StreamReaderState(source, buffersize);
var currentStateSubscription = new SerialDisposable();
Action<StreamReaderState, Action<StreamReaderState>> iterator =
(state, self) =>
currentStateSubscription.Disposable = state.ReadNext()
.Subscribe(
bytesRead =>
{
for (int i = 0; i < bytesRead; i++)
{
o.OnNext(state.Buffer[i]);
}
if (bytesRead > 0)
self(state);
else
o.OnCompleted();
},
o.OnError);
var scheduledWork = scheduler.Schedule(initialState, iterator);
return new CompositeDisposable(currentStateSubscription, scheduledWork);
});
return Observable.Using(() => source, _ => bytes);
}
private sealed class StreamReaderState
{
private readonly int _bufferSize;
private readonly Func<byte[], int, int, IObservable<int>> _factory;
public StreamReaderState(Stream source, int bufferSize)
{
_bufferSize = bufferSize;
_factory = Observable.FromAsyncPattern<byte[], int, int, int>(
source.BeginRead,
source.EndRead);
Buffer = new byte[bufferSize];
}
public IObservable<int> ReadNext()
{
return _factory(Buffer, 0, _bufferSize);
}
public byte[] Buffer { get; set; }
}
}
}