强制异步代码同步运行



我目前有一个类,它通过 gRPC 异步获取图像,然后把图像存储起来。但在处理事件时遇到了一个问题,可以用下面的代码复现:

import 'dart:async';
main() {
  final intStore = IntStore();

  // Prints the announcement and then starts the lookup without awaiting it,
  // so main moves straight on to the next request.
  void request(String announcement, String intName) {
    print(announcement);
    intStore.getInt(intName);
  }

  request("running getInt(A)", "A");
  request("running getInt(A)", "A");
  request("running getInt(B)", "B");
  request("running getInt(A)", "A");
  request("running getInt(C)", "C");
  request("running getInt(D)", "D");
}
/// Caches integers by name, fetching a value asynchronously on a cache miss.
///
/// The cache is only written once a fetch has finished, so several calls to
/// [getInt] for the same name that start before the first fetch completes
/// will each perform their own fetch.
class IntStore {
  /// Values fetched so far, keyed by name.
  final Map _store = <String, int>{};

  /// Logs the fetch of [intName] and waits for the asynchronous work to end.
  Future fetchInt(String intName) async {
    print("Fetching: $intName");
    await doSomeWorkAsynchronously(intName);
  }

  /// Returns the value stored under [intName], fetching it first if the
  /// cache does not contain it yet.
  Future<int> getInt(String intName) async {
    final isCached = _store.containsKey(intName);
    if (!isCached) {
      await fetchInt(intName);
      return _store[intName];
    }
    print("Cached: $intName");
    return _store[intName];
  }

  /// Simulates slow work: waits three seconds, then stores `3` under
  /// [intName] and logs that the fetch finished.
  Future doSomeWorkAsynchronously(String intName) async {
    const simulatedLatency = Duration(seconds: 3);
    await Future.delayed(simulatedLatency);
    _store[intName] = 3;
    print("Fetched: $intName");
  }
}

返回:

running getInt(A)
Fetching: A
running getInt(A)
Fetching: A
running getInt(B)
Fetching: B
running getInt(A)
Fetching: A
running getInt(C)
Fetching: C
running getInt(D)
Fetching: D
Fetched: A
Fetched: A
Fetched: B
Fetched: A
Fetched: C
Fetched: D

这里的问题是 fetchInt 中的工作被重复执行了多次,这相当低效。

我发现 lrn 对这个问题的回答很有帮助。我按照该回答实现后,得到了如下代码:

import 'dart:async';
main() {
  final intStore = IntStore();
  final mutex = AsyncMutex();

  // Prints the announcement and then queues the lookup on the shared mutex
  // without awaiting it, so main moves straight on to the next request.
  void request(String announcement, String intName) {
    print(announcement);
    mutex.run(() => intStore.getInt(intName));
  }

  request("running getInt(A)", "A");
  request("running getInt(A)", "A");
  request("running getInt(B)", "B");
  request("running getInt(A)", "A");
  request("running getInt(C)", "C");
  request("running getInt(D)", "D");
}
/// Caches integers by name, fetching a value asynchronously on a cache miss.
///
/// The cache is only written once a fetch has finished, so several calls to
/// [getInt] for the same name that start before the first fetch completes
/// will each perform their own fetch.
class IntStore {
  /// Values fetched so far, keyed by name.
  final Map _store = <String, int>{};

  /// Logs the fetch of [intName] and waits for the asynchronous work to end.
  Future fetchInt(String intName) async {
    print("Fetching: $intName");
    await doSomeWorkAsynchronously(intName);
  }

  /// Returns the value stored under [intName], fetching it first if the
  /// cache does not contain it yet.
  Future<int> getInt(String intName) async {
    final isCached = _store.containsKey(intName);
    if (!isCached) {
      await fetchInt(intName);
      return _store[intName];
    }
    print("Cached: $intName");
    return _store[intName];
  }

  /// Simulates slow work: waits three seconds, then stores `3` under
  /// [intName] and logs that the fetch finished.
  Future doSomeWorkAsynchronously(String intName) async {
    const simulatedLatency = Duration(seconds: 3);
    await Future.delayed(simulatedLatency);
    _store[intName] = 3;
    print("Fetched: $intName");
  }
}
/// Runs asynchronous operations one at a time, in the order they were
/// requested.
class AsyncMutex {
  /// Completes once the most recently requested operation has finished.
  ///
  /// This future always completes with a value, even when the operation it
  /// tracks failed, so chaining the next operation onto it can never produce
  /// an unhandled asynchronous error.
  Future<void> _next = Future<void>.value();

  /// Request [operation] to be run exclusively.
  ///
  /// Waits for all previously requested operations to complete,
  /// then runs the operation and completes the returned future with the
  /// result.
  ///
  /// If [operation] throws or its future completes with an error, the
  /// returned future completes with that error. Operations requested later
  /// still run.
  Future<T> run<T>(Future<T> operation()) {
    final previous = _next;
    // A separate completer marks the end of this operation, so a failure is
    // delivered only through the returned future and never through the queue.
    final released = Completer<void>();
    _next = released.future;
    return previous
        .then<T>((_) => Future<T>.sync(operation))
        .whenComplete(released.complete);
  }
}

返回

running getInt(A)
running getInt(A)
running getInt(B)
running getInt(A)
running getInt(C)
running getInt(D)
Fetching: A
Fetched: A
Cached: A
Fetching: B
Fetched: B
Cached: A
Fetching: C
Fetched: C
Fetching: D
Fetched: D

这里的问题是:每个 fetchInt 调用即使确有必要,也会相互阻塞,这比之前的效率更低。于是我修改了实现以提高效率:

import 'dart:async';
main() {
  final intStore = IntStore();

  // Prints the announcement and then starts the lookup without awaiting it,
  // so main moves straight on to the next request.
  void request(String announcement, String intName) {
    print(announcement);
    intStore.getInt(intName);
  }

  request("running getInt(A)", "A");
  request("running getInt(A)", "A");
  request("running getInt(B)", "B");
  request("running getInt(A)", "A");
  request("running getInt(C)", "C");
  request("running getInt(D)", "D");
}
/// Caches integers by name and funnels the fetches for each name through a
/// per-name [AsyncMutex].
class IntStore {
  /// Values fetched so far, keyed by name.
  final Map _store = <String, int>{};

  /// One mutex per requested name, created on the first request for it.
  final Map _mutexStore = <String, AsyncMutex>{};

  /// Fetches [intName] unless the cache already holds a value for it.
  Future<void> fetchInt(String intName) async {
    if (_store.containsKey(intName)) return;
    print("Fetching: $intName");
    await doSomeWorkAsynchronously(intName);
  }

  /// Returns the value stored under [intName] after running [fetchInt]
  /// through the mutex that belongs to that name.
  Future<int> getInt(String intName) async {
    final hasMutex = _mutexStore.containsKey(intName);
    if (hasMutex) {
      print("Mutex already here: $intName");
    } else {
      print("Creating Mutex: $intName");
      _mutexStore[intName] = AsyncMutex();
    }
    await _mutexStore[intName].run<void>(() => fetchInt(intName));
    print("Passing: $intName");
    return _store[intName];
  }

  /// Simulates slow work: waits three seconds, then stores `3` under
  /// [intName] and logs that the fetch finished.
  Future doSomeWorkAsynchronously(String intName) async {
    const simulatedLatency = Duration(seconds: 3);
    await Future.delayed(simulatedLatency);
    _store[intName] = 3;
    print("Fetched: $intName");
  }
}
/// Runs asynchronous operations one at a time, in the order they were
/// requested.
class AsyncMutex {
  /// Completes once the most recently requested operation has finished.
  ///
  /// This future always completes with a value, even when the operation it
  /// tracks failed, so chaining the next operation onto it can never produce
  /// an unhandled asynchronous error.
  Future<void> _next = Future<void>.value();

  /// Request [operation] to be run exclusively.
  ///
  /// Waits for all previously requested operations to complete,
  /// then runs the operation and completes the returned future with the
  /// result.
  ///
  /// If [operation] throws or its future completes with an error, the
  /// returned future completes with that error. Operations requested later
  /// still run.
  Future<T> run<T>(Future<T> operation()) {
    final previous = _next;
    // A separate completer marks the end of this operation, so a failure is
    // delivered only through the returned future and never through the queue.
    final released = Completer<void>();
    _next = released.future;
    return previous
        .then<T>((_) => Future<T>.sync(operation))
        .whenComplete(released.complete);
  }
}

返回

running getInt(A)
Creating Mutex: A
running getInt(A)
Mutex already here: A
running getInt(B)
Creating Mutex: B
running getInt(A)
Mutex already here: A
running getInt(C)
Creating Mutex: C
running getInt(D)
Creating Mutex: D
Fetching: A
Fetching: B
Fetching: C
Fetching: D
Fetched: A
因此,它会尽可能快地返回每个 int,保持不同调用之间的并发性,同时阻止对同一个 int 的重复调用。

最新更新