使用Grand Central Dispatch的ConcurrentPerformance()时,应该如何聚合数据?
我正在做下面代码中的操作,但当notify()块结束时,resultDictionary似乎丢失了所有数据。因此,我得到的只是函数返回的一个空字典。
我不知道为什么会发生这种情况,因为当我打印或设置断点时,我可以在块结束之前看到resultDictionary中有一些内容。
let getCVPDispatchQueue = DispatchQueue(label: "blarg",
qos: .userInitiated,
attributes: .concurrent)
let getCVPDispatchGroup = DispatchGroup()
var resultDictionary = dataIDToSRLParticleDictionary()
getCVPDispatchQueue.async { [weak self] in
guard let self = self else { return }
DispatchQueue.concurrentPerform(iterations: self.dataArray.count) { [weak self] (index) in
guard let self = self else { return }
let data = self.dataArray[index]
getCVPDispatchGroup.enter()
let theResult = data.runPartcleFilterForClosestParticleAndMaybeStopAudio()
switch theResult {
case .success(let CVParticle):
// If there was a CVP found, add it to the set.
if let theCVParticle = CVParticle {
self.dataIDsToCVPDictionary.addTodataIDToCVPDict(key: data.ID,
value: theCVParticle)
}
case .failure(let error):
os_log(.error, log: self.logger, "rundatasProcessing error: %s", error.localizedDescription)
self._isActive = false
}
getCVPDispatchGroup.leave()
}
getCVPDispatchGroup.notify(queue: .main) { [weak self] in
guard let self = self else { return }
print("DONE with (self.dataIDsToCVPDictionary.getDictionary.count)")
resultDictionary = self.dataIDsToCVPDictionary.getDictionary
print("resultDictionary has (self.dataIDsToCVPDictionary.getDictionary.count)")
}
}
print("Before Return with (resultDictionary.count)")
return resultDictionary
}
不确定这是否会有所帮助,但这是我为确保访问字典线程的安全而创建的简单类。
class DATASynchronizedIDToParticleDictionary {
var unsafeDictionary: DATAIDToDATAParticleDictionary = DATAIDToDATAParticleDictionary()
let accessQueue = DispatchQueue(label: "blarg2",
qos: .userInitiated,
attributes: .concurrent)
var getDictionary: DATAIDToDATAParticleDictionary {
get {
var dictionaryCopy: DATAIDToDATAParticleDictionary!
accessQueue.sync {
dictionaryCopy = unsafeDictionary
}
return dictionaryCopy
}
}
func addToDATAIDToCVPDict(key: String, value: DATAParticle) {
accessQueue.async(flags: .barrier) { [weak self] in
guard let self = self else { return }
self.unsafeDictionary[key] = value
}
}
func clearDictionary() {
accessQueue.async(flags: .barrier) { [weak self] in
guard let self = self else { return }
self.unsafeDictionary.removeAll()
}
}
}
你说:
我正在执行下面代码中的操作,但当
notify()
块结束时,resultDictionary
似乎丢失了所有数据。因此,我得到的只是函数返回的一个空字典。
问题是您试图return
一个异步计算的值。您可能希望转换到完成块模式。
顺便说一句,调度组是不必要的。具有讽刺意味的是,concurrentPerform
是同步的(即,它直到并行化的for
循环结束才继续)。因此,如果你知道在所有迭代完成之前,你不会到达concurrentPerform
之后的行,那么使用notify
是没有意义的。
我还建议不要让concurrentPerform
循环更新属性。它会让你面临各种各样的问题。例如,如果主线程同时与该对象交互,该怎么办?当然,您可以同步您的访问,但它可能不完整。让它只更新局部变量,并让调用方在其完成处理程序块中进行属性更新,这可能更安全。显然,您可以继续更新属性(特别是如果您想更新UI以反映运行中的进度),但这会给代码增加一个可能不必要的额外褶皱。下面,我认为没有必要。
此外,虽然我理解所有这些[weak self]
引用背后的意图,但它们确实不需要,尤其是在同步类DATASynchronizedIDToParticleDictionary
中。我们经常使用weak
引用来避免强引用循环。但如果你没有强有力的推荐信,它们只会增加开销,除非你有其他迫切的需求。
好的,让我们深入研究一下代码。
-
首先,我将使用通用泛型来退役专用
DATASynchronizedIDToParticleDictionary
class SynchronizedDictionary<Key: Hashable, Value> { private var _dictionary: [Key: Value] private let queue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".dictionary", qos: .userInitiated, attributes: .concurrent) init(_ dictionary: [Key: Value] = [:]) { _dictionary = dictionary } var dictionary: [Key: Value] { queue.sync { _dictionary } } subscript(key: Key) -> Value? { get { queue.sync { _dictionary[key] } } set { queue.async(flags: .barrier) { self._dictionary[key] = newValue } } } func removeAll() { queue.async(flags: .barrier) { self._dictionary.removeAll() } } }
注意,我已经删除了不必要的
weak
引用。我还用一个更自然的下标运算符和一个更紧密地反映底层Dictionary
类型接口的removeAll
方法重命名了addToDATAIDToCVPDict
和clearDictionary
。它产生了看起来更自然的代码。(因为这是通用的,我们可以将它用于任何需要这种低级别同步的字典。)无论如何,您现在可以声明字典的同步格式,如下所示:
let particles = SynchronizedDictionary(dataIDToSRLParticleDictionary())
当我想用一些值更新字典时,你可以这样做:
particles[data.ID] = theCVParticle
当我想检索实际的底层包装字典时,我可以这样做:
let finalResult = particles.dictionary
-
当我们这样做的时候,由于我们可能想跟踪需要同步的错误数组,我可能会添加一个数组等效类型:
class SynchronizedArray<Value> { private var _array: [Value] private let queue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".array", qos: .userInitiated, attributes: .concurrent) init(_ dictionary: [Value] = []) { _array = dictionary } var array: [Value] { queue.sync { _array } } subscript(index: Int) -> Value { get { queue.sync { _array[index] } } set { queue.async(flags: .barrier) { self._array[index] = newValue } } } func append(_ value: Value) { queue.async(flags: .barrier) { self._array.append(value) } } func removeAll() { queue.async(flags: .barrier) { self._array.removeAll() } } }
-
我们现在可以把注意力转向主要的程序。因此,与其返回一个值,不如给它一个
@escaping
完成处理程序。而且,如上所述,我们将取消不必要的调度组:func calculateAllClosestParticles(completion: @escaping ([String: CVParticle], [Error]) -> Void) { let queue = DispatchQueue(label: "blarg", qos: .userInitiated, attributes: .concurrent) let particles = SynchronizedDictionary(dataIDToSRLParticleDictionary()) let errors = SynchronizedArray<Error>() queue.async { DispatchQueue.concurrentPerform(iterations: self.dataArray.count) { index in let data = self.dataArray[index] let result = data.runPartcleFilterForClosestParticleAndMaybeStopAudio() switch result { case .success(let cvParticle): // If there was a CVP found, add it to the set. if let cvParticle = cvParticle { particles[data.ID] = cvParticle } case .failure(let error): errors.append(error) } } DispatchQueue.main.async { completion(particles.dictionary, errors.array) } } }
现在,我不知道字典的正确类型是什么,所以您可能需要调整
completion
的参数。你没有提供其余的程序,所以我可能在这里有一些细节错误。但不要迷失在细节中,只需注意小心避免concurrentPerform
中的属性,并将结果传递回完成处理程序。你可以这样称呼它:
calculateAllClosestParticles { dictionary, errors in guard errors.isEmpty else { return } // you can access the dictionary and updating the model and UI here self.someProperty = dictionary self.tableView.reloadData() } // but don't try to access the dictionary here, because the asynchronous code hasn't finished yet //
-
FWIW,虽然我使用了您在示例中使用的读写器模式,但根据我的经验,
NSLock
实际上更适合快速同步,尤其是当您使用concurrentPerform
时,它可能会占用CPU上的所有核心,例如class SynchronizedDictionary<Key: Hashable, Value> { private var _dictionary: [Key: Value] private let lock = NSLock() init(_ dictionary: [Key: Value] = [:]) { _dictionary = dictionary } var dictionary: [Key: Value] { lock.synchronized { _dictionary } } subscript(key: Key) -> Value? { get { lock.synchronized { _dictionary[key] } } set { lock.synchronized { _dictionary[key] = newValue } } } func removeAll() { lock.synchronized { _dictionary.removeAll() } } }
何处
extension NSLocking { func synchronized<T>(_ closure: () throws -> T) rethrows -> T { lock() defer { unlock() } return try closure() } }
最重要的是,如果不需要的话,你不想强制上下文切换进行同步
-
在执行并发执行时,如果有许多
dataPoints
,并且每次调用runPartcleFilterForClosestParticleAndMaybeStopAudio
所需的时间适中,则可能需要考虑"跨步",在每次迭代中执行多个数据点。这超出了这个问题的范围,但仅供参考。
不太确定我做了什么,但我移动了
resultDictionary = self.dataIDsToCVPDictionary.getDictionary
在第一个异步块之外,并且似乎允许为函数返回保留/保留数据。