关于多线程,我遇到了一个难题。我目前正在使用SinglaR进行实时服务。这个想法是连接的用户可以向另一个用户请求数据。以下是请求和响应函数的概要。
考虑以下代码:
private readonly ConcurrentBag _sharedObejcts= new ConcurrentBag();
请求:
[...]
var sharedObject = new MyObject();
_sharedObejcts.Add(sharedObject);
ForwardRequestFireAndForget();
try
{
await Task.Delay(30000, sharedObject.myCancellationToken);
}
catch
{
return sharedObject.ResponseProperty;
}
_myConcurrentBag.TryTake(sharedObject);
[...]
回应:
[...]
var result = DoSomePossiblyVeryLengthyTaskHere();
var sharedObject = ConcurrentBag
.Where(x)
.FirstOrDefault();
// The request has timed out so the object isn't there anymore.
if(sharedObject == null)
{
return someResponse;
}
sharedObject.ResponseProperty = result;
// triggers the cancellation source
sharedObject.Cancel();
return someOtherResponse;
[...]
因此,基本上会向服务器发出请求,然后转发到另一台主机,该功能会等待取消或超时。
其他主机调用response函数,该函数添加repsonseObject
并触发myCancellationToken
。
然而,我不确定这是否代表了一种比赛状态。理论上,当另一个线程仍然位于finally块上时,响应线程是否可以检索sharedObject
?这意味着,请求已经超时,任务只是没有时间从包中取出物体,这意味着数据不一致。
有什么有保证的方法可以确保Task.Delay()
调用后第一个被调用的是TryTake()
调用?
您不希望生产者取消消费者的等待。责任太多了。
相反,您真正想要的是生产者发送一个异步信号。这是通过TaskCompletionSource<T>
完成的。使用者可以添加具有不完整TCS的对象,然后使用者可以(异步(等待TCS完成(或超时(。然后生产商将其价值赋予TCS。
类似这样的东西:
class MyObject
{
public TaskCompletionSource<MyProperty> ResponseProperty { get; } = new TaskCompletionSource<MyProperty>();
}
// request (consumer):
var sharedObject = new MyObject();
_sharedObejcts.Add(sharedObject);
ForwardRequestFireAndForget();
var responseTask = sharedObject.ResponseProperty.Task;
if (await Task.WhenAny(Task.Delay(30000), responseTask) != responseTask)
return null;
_myConcurrentBag.TryTake(sharedObject);
return await responseTask;
// response (producer):
var result = DoSomePossiblyVeryLengthyTaskHere();
var sharedObject = ConcurrentBag
.Where(x)
.FirstOrDefault();
// The request has timed out so the object isn't there anymore.
if(sharedObject == null)
return someResponse;
sharedObject.ResponseProperty.TrySetResult(result);
return someOtherResponse;
上面的代码可以稍微清理一下;具体来说,让制片人有一个";生产者观点";共享对象的",并且消费者具有";消费者观点";,其中两个接口由相同类型实现。但是上面的代码应该会给你一个大致的想法。