我们有一个方法Execute
,它像这样并行调用
ListOfLists.Select(ids => Observable.FromAsync(() => Execute(request, ids))).Merge(10);
我们有
protected Task<string> Execute(HttpRequestType request, IEnumerable<TItem> ids)
{
return SomeFunction(() => CreateRequest(request, ids.ToList()));
}
CreateRequest(request, ids.ToList((( 返回一个HttpRequestMessage
,SomeFunction
Func<HttpRequestMessage>
。
和
private async Task<T> SomeFunction(Func<HttpRequestMessage> func)
{
var request = func();
var retryCount = 0;
T result = null;
for (; retryCount < MaxRetries; retryCount++)
{
try
{
result = DoSomethingWithRequest(request);
if(result != null) break;
}
catch
{
//log here
}
finally
{
request = func();
}
}
return result;
}
现在我们已经看到,当我们ListOfLists
中有超过 10 个列表(因此任何时候只有 10 个列表执行,其余列表等待(并且DoSomethingWithRequest
间歇性地失败几次时,对SomeFunction
的一些调用是重复的,一些 id 列表被丢弃。上面的代码中是否有导致这种情况的原因?
请原谅标题不那么描述。
谢谢 希德。
编辑:
private HttpRequestMessage CreateRequest(HttpRequestType request, List<string> ids)
{
if (request == null) return null;
request.SomeProperty = toList;
return ConvertoToHttpRequestMessage(request); //This just does some serialization and adds a fresh request Id and headers
}
看起来您传递给Execute
函数的request
实例存在争用条件。
// The request instance must be created before this line, and you're passing the same
// instance to each call of Execute.
ListOfLists.Select(ids => Observable.FromAsync(() => Execute(request, ids))).Merge(10);
CreateRequest
不是在创建一个新的HttpRequestType
实例,它只是修改传递给Execute
的实例。由于每个线程Execute
在同一个HttpRequestType
实例上运行,因此它们只是相互覆盖。
因此,可能会发生类似以下情况:
Thread A
启动,ids
参数等于[1, 2, 3]
。Thread A
输入SomeFunction
并呼叫func
。为Thread A
捕获的toList
参数是[1, 2, 3]
,因此request.SomeProperty
设置为[1, 2, 3]
然后在某处的标头中创建带有[1, 2, 3]
的HttpRequest
并返回给SomeFunction
。DoSomethingWithRequestFails
Thread A
.
与此同时,Thread B
已经开始。ids
参数等于[4, 5, 6]
。Thread B
输入SomeFunction
并呼叫func
。Thread B
request.SomeProperty
设置为[4, 5, 6]
然后调用ConvertToHttpRequestMessage
。
现在,在Thread B
有机会创建HttpRequest
之前,Thread A
(失败(以SomeFunction
进入finally
块并再次调用func
。Thread A
集合request.SomeProperty
回到[1, 2, 3]
,并且由于Thread A
和Thread B
都在变异同一个HttpRequestType
实例,因此Thread B
现在也[1, 2, 3]
request.SomeProperty
。
Thread A
和Thread B
都会创建一个标头中包含[1, 2, 3]
的HttpRequest
。[1, 2, 3]
ID 列表是重复的,并且永远不会发送列表[4, 5, 6]
。
尝试将toList
从CreateRequest
传递到ConvertToHttpRequestMessage
,而不仅仅是HttpRequestType
,或者为每次调用Execute
创建一个新的HttpRequestType
实例。
我没有看到你从哪里开始并行执行。
如果您使用System.Parallel
则可以发送ParallelOptions
。下面是一个Parallel.ForEach()
调用示例
List<string> myList = new List<string>();
Parallel.ForEach(
myList,
new ParallelOptions()
{
MaxDegreeOfParallelism = 1337 // Here we allow 1337 parallel executions
},
(i) => { /* do something */ });