当项目数量未知时定义下一个起点



我有一个需要查询的web服务,它的值支持对其数据进行分页。由于我需要获取的数据量以及该服务的实现方式,我打算进行一系列并发的http web请求来积累这些数据。

假设我有线程的数量和页面大小,我如何分配每个线程来选择其不与其他线程重叠的起点?我已经很久没有学习并行编程了,我有点挣扎。我知道我可以用start = N/numThreads * threadNum这样的东西找到我的起点,但我不知道N。现在我只是旋转X个线程和每个循环,直到它们没有更多的数据。问题是它们往往会重叠,我最终会得到重复的数据。我需要唯一的数据,不要浪费请求。

现在我有这样的代码。这是许多尝试之一,我明白为什么这是错误的,但最好展示一些东西。目标是并行地从Web服务收集数据页面:

       int limit = pageSize;
        data = new List<RequestStuff>();
        List<Task> tasks = new List<Task>();
        for (int i = 0; i < numThreads; i++)
        {
            tasks.Add(Task.Factory.StartNew(() =>
                {
                    try
                    {
                        List<RequestStuff> someData;                                
                        do
                        {
                            int start;
                            lock(myLock)
                            {
                               start = data.Count;
                            }
                            someKeys = GetDataFromService(start, limit);
                            lock (myLock)
                            {
                                if (someData != null && someData.Count > 0)
                                {
                                    data.AddRange(someData);
                                }
                            }
                        } while (hasData);
                    }
                    catch (AggregateException ex)
                    {
                       //Exception things
                    }
                }));
        }
        Task.WaitAll(tasks.ToArray());

有什么灵感可以在没有种族条件的情况下解决这个问题吗?如果这很重要的话,我需要坚持使用.NET4。

我不确定是否有一种方法可以做到这一点而不浪费一些请求,除非你知道实际的限制。下面的代码可能有助于消除重复的数据,因为你只会在每个索引上查询一次:

    private int _index = -1; // -1 so first request starts at 0
    private bool _shouldContinue = true;
    public IEnumerable<RequestStuff> GetAllData()
    {
        var tasks = new List<Task<RequestStuff>>();
        while (_shouldContinue)
        {
            tasks.Add(new Task<RequestStuff>(() => GetDataFromService(GetNextIndex())));
        }
        Task.WaitAll(tasks.ToArray());
        return tasks.Select(t => t.Result).ToList();
    }
    private RequestStuff GetDataFromService(int id)
    {
        // Get the data
        // If there's no data returned set _shouldContinue to false
        // return the RequestStuff;
    }
    private int GetNextIndex()
    {
        return Interlocked.Increment(ref _index);
    }

它还可以通过添加取消令牌来取消任何您认为浪费的索引来进行改进,即,如果索引4不返回任何内容,则可以取消对4以上仍处于活动状态的索引的所有查询。

或者,如果你能对最大索引做出合理的猜测,你就可以在检索任何数据之前实现一种算法来精确定位限制。不过,如果你的猜测相当准确,这可能会更有效率。

您是否试图通过发出多个并发请求来强制远程服务的并行性?分页通常用于将返回的数据量限制为仅需要的数据量,但如果您需要所有数据,那么尝试先分页,然后再重建它似乎是一种糟糕的设计。你的代码变得不必要地复杂,难以维护,你可能只是将瓶颈从你控制的代码转移到其他地方,现在你已经引入了数据完整性问题(如果所有这些线程都访问你试图查询的数据的不同版本,会发生什么?)。通过增加呼叫的复杂性和数量,您也增加了出现问题的可能性(例如,其中一个连接断开)。

你能说明你试图解决的问题吗?也许我们可以帮助设计一个更好的解决方案?

最新更新