我面临Rx.Window 问题
下面是我的代码。请注意,有些工作项没有到达内部订阅。
var subscription
= m_BufferBlock
.AsObservable()
.Synchronize()
.Where(InValue => InValue.Region == Region)
.Do(W => logger.Debug("Side Effect => " + W.ToString()))
在此之后工作项丢失;没有撞到窗户,我看不出来在订阅内收到
.Window(TimeSpan.FromMilliseconds(10000))
.SubscribeOn(Scheduler.Default)
.Subscribe(window =>
{
window
.ToList()
.SubscribeOn(Scheduler.Default)
.Subscribe(workItems =>
{
foreach (WorkItem W in workItems)
{
// Some work items do not reach this line
logger.Debug("Came inside subscriber => " + W);
}
if (workItems.Count > 0)
{
ProcessWorkItems(workItems.ToList<WorkItem>());
}
});
});
工作项类
public class WorkItem
{
public int Region { get; set; }
public int Priority { get; set; }
public string PortfolioId { get; set; }
public string SecurityId { get; set; }
public string Status { get; set; }
public string Message { get; set; }
public Int64 Guid { get; set; }
public DateTime RequestedDateTime { get; set; }
public WorkItemType WorkItemType { get; set; }
public long RecordCount { get; set; }
public WorkItem()
{
PortfolioId = string.Empty;
SecurityId = string.Empty;
Message = string.Empty;
Status = string.Empty;
Region = 0;
WorkItemType = WorkItemType.REALTIME;
RequestedDateTime = DateTime.Now;
RecordCount = 0;
}
public override string ToString()
{
return string.Format("WorkItemType : {0} Region : {1} PortfolioId : {2} SecurityId : {3} Guid : {4} Priority : {5} Status : {6} Message : {7}",
WorkItemType.ToString(), Region, PortfolioId, SecurityId, Guid, Priority, Status, Message);
}
}
我正在使用容量无限的BufferBlock。。。
任何帮助都将不胜感激。。。
不要执行window.ToList().SubscribeOn(...).Subscribe(...)
。相反,请执行window.ToList().Subscribe(...)
或使用Chris使用SelectMany的版本。
内窗可观测值热。这意味着您必须在返回之前订阅它们,否则您将丢失窗口中的项目。但是SubscribeOn在订阅中引入了延迟,而订阅调用是在另一个线程或任务上调度的。这意味着你在实际订阅窗口之前就从回调中返回了,这是一场比赛,看订阅是否在项目到达窗口之前运行。如果某个项目在预定的订阅调用运行之前到达,则该项目将丢失,因为订阅尚未设置。
因此,正如James所指出的,SubscribeOn不仅没有用处,而且实际上是问题的原因。
您是否完全确定正在丢失元素?(如果你是,我几乎可以肯定这不是Window
中的错误)
我问的原因是,您的Rx查询是以这样一种方式编写的,即在记录窗口n中捕获的项目之前,完全有可能写出在窗口n+1的Do()
中发送的日志输出。
我想知道再往下读一点日志是否能解决问题?
为了检查这一点,我们可以稍微修改您的查询以获得窗口号,并将其包含在您的日志输出中,如下所示:
var subscription
= m_BufferBlock
.AsObservable()
.Synchronize()
.Where(InValue => InValue.Region == Region)
.Do(W => logger.Debug("Side Effect => " + W.ToString()))
.Window(TimeSpan.FromMilliseconds(10000))
.Select((window, index) => Tuple.Create(window,index))
.SubscribeOn(Scheduler.Default)
.Subscribe(window =>
{
window.Item1
.ToList()
.SubscribeOn(Scheduler.Default)
.Subscribe(workItems =>
{
foreach (WorkItem W in workItems)
{
// Some work items do not reach this line
logger.Debug("Came inside window " + window.Item2 + " and subscriber => " + W);
}
if (workItems.Count > 0)
{
ProcessWorkItems(workItems.ToList<WorkItem>());
}
});
});
我应该补充一点,以您现有的方式嵌套订阅不是最佳实践。它带来了线性(你在这里受到的影响)、性能、灵活性和可读性方面的问题。你最好把事情"放在monad中"(即组成可观察性),并尽可能长时间地等待最后的订阅。
看看Chris是如何用SelectMany
代替嵌套订阅来翻译您的查询的,看看如何实现这一点。此外,你的SubscribeOn
可能在这里一无所获——看看这个问题,看看SubscribeOn
给了你什么。
Window
将为每个窗口周期生成一个Observable,无论该窗口期间是否生成任何项。因此,您最终可能会看到不包含任何事件的窗口。当你在这些窗口上使用ToList时,它们会产生一个可观察的结果,它会生成一个空列表。
var subscription = m_BufferBlock
.AsObservable()
.Synchronize()
.Where(item => item.Region == Region)
.Do(item => logger.Debug("Side Effect => " + item))
.Window(TimeSpan.FromMilliseconds(10000))
.SelectMany(window => window.ToList())
.Where(workItems => workItems.Count > 0)
.Subscribe(ProcessWorkItems);
这也是可行的。
var subscription = m_BufferBlock
.AsObservable()
.Synchronize()
.Where(item => item.Region == Region)
.Do(item => logger.Debug("Side Effect => " + item))
.Buffer(TimeSpan.FromMilliseconds(10000))
.Where(workItems => workItems.Count > 0)
.Subscribe(ProcessWorkItems);