如何使用GCD聚合DispatchQueue.concurrentPerform()中的数据



使用Grand Central Dispatch的ConcurrentPerformance()时,应该如何聚合数据?

我正在做下面代码中的操作,但当notify()块结束时,resultDictionary似乎丢失了所有数据。因此,我得到的只是函数返回的一个空字典。

我不知道为什么会发生这种情况,因为当我打印或设置断点时,我可以在块结束之前看到resultDictionary中有一些内容。

let getCVPDispatchQueue = DispatchQueue(label: "blarg",
qos: .userInitiated,
attributes: .concurrent)
let getCVPDispatchGroup = DispatchGroup()
var resultDictionary = dataIDToSRLParticleDictionary()
getCVPDispatchQueue.async { [weak self] in
guard let self = self else { return }
DispatchQueue.concurrentPerform(iterations: self.dataArray.count) { [weak self] (index) in
guard let self = self else { return }
let data = self.dataArray[index]
getCVPDispatchGroup.enter()
let theResult = data.runPartcleFilterForClosestParticleAndMaybeStopAudio()
switch theResult {
case .success(let CVParticle):
// If there was a CVP found, add it to the set.
if let theCVParticle = CVParticle {
self.dataIDsToCVPDictionary.addTodataIDToCVPDict(key: data.ID,
      value: theCVParticle)
}
case .failure(let error):
os_log(.error, log: self.logger, "rundatasProcessing error: %s", error.localizedDescription)
self._isActive = false
}
getCVPDispatchGroup.leave()
}
getCVPDispatchGroup.notify(queue: .main) { [weak self] in
guard let self = self else { return }
print("DONE with (self.dataIDsToCVPDictionary.getDictionary.count)")
resultDictionary = self.dataIDsToCVPDictionary.getDictionary
print("resultDictionary has (self.dataIDsToCVPDictionary.getDictionary.count)")
}
}
print("Before Return  with (resultDictionary.count)")
return resultDictionary
}

不确定这是否会有所帮助,但这是我为确保访问字典线程的安全而创建的简单类。

class DATASynchronizedIDToParticleDictionary {
var unsafeDictionary: DATAIDToDATAParticleDictionary = DATAIDToDATAParticleDictionary()
let accessQueue = DispatchQueue(label: "blarg2",
qos: .userInitiated,
attributes: .concurrent)
var getDictionary: DATAIDToDATAParticleDictionary {
get {
var dictionaryCopy: DATAIDToDATAParticleDictionary!
accessQueue.sync {
dictionaryCopy = unsafeDictionary
}
return dictionaryCopy
}
}
func addToDATAIDToCVPDict(key: String, value: DATAParticle) {
accessQueue.async(flags: .barrier) { [weak self] in
guard let self = self else { return }
self.unsafeDictionary[key] = value
}
}
func clearDictionary() {
accessQueue.async(flags: .barrier) { [weak self] in
guard let self = self else { return }
self.unsafeDictionary.removeAll()
}
}
}

你说:

我正在执行下面代码中的操作,但当notify()块结束时,resultDictionary似乎丢失了所有数据。因此,我得到的只是函数返回的一个空字典。

问题是您试图return一个异步计算的值。您可能希望转换到完成块模式。

顺便说一句,调度组是不必要的。具有讽刺意味的是,concurrentPerform是同步的(即,它直到并行化的for循环结束才继续)。因此,如果你知道在所有迭代完成之前,你不会到达concurrentPerform之后的行,那么使用notify是没有意义的。

我还建议不要让concurrentPerform循环更新属性。它会让你面临各种各样的问题。例如,如果主线程同时与该对象交互,该怎么办?当然,您可以同步您的访问,但它可能不完整。让它只更新局部变量,并让调用方在其完成处理程序块中进行属性更新,这可能更安全。显然,您可以继续更新属性(特别是如果您想更新UI以反映运行中的进度),但这会给代码增加一个可能不必要的额外褶皱。下面,我认为没有必要。

此外,虽然我理解所有这些[weak self]引用背后的意图,但它们确实不需要,尤其是在同步类DATASynchronizedIDToParticleDictionary中。我们经常使用weak引用来避免强引用循环。但如果你没有强有力的推荐信,它们只会增加开销,除非你有其他迫切的需求。

好的,让我们深入研究一下代码。

  • 首先,我将使用通用泛型来退役专用DATASynchronizedIDToParticleDictionary

    class SynchronizedDictionary<Key: Hashable, Value> {
    private var _dictionary: [Key: Value]
    private let queue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".dictionary", qos: .userInitiated, attributes: .concurrent)
    init(_ dictionary: [Key: Value] = [:]) {
    _dictionary = dictionary
    }
    var dictionary: [Key: Value] {
    queue.sync { _dictionary }
    }
    subscript(key: Key) -> Value? {
    get { queue.sync                   { _dictionary[key] } }
    set { queue.async(flags: .barrier) { self._dictionary[key] = newValue } }
    }
    func removeAll() {
    queue.async(flags: .barrier) {
    self._dictionary.removeAll()
    }
    }
    }
    

    注意,我已经删除了不必要的weak引用。我还用一个更自然的下标运算符和一个更紧密地反映底层Dictionary类型接口的removeAll方法重命名了addToDATAIDToCVPDictclearDictionary。它产生了看起来更自然的代码。(因为这是通用的,我们可以将它用于任何需要这种低级别同步的字典。)

    无论如何,您现在可以声明字典的同步格式,如下所示:

    let particles = SynchronizedDictionary(dataIDToSRLParticleDictionary())
    

    当我想用一些值更新字典时,你可以这样做:

    particles[data.ID] = theCVParticle
    

    当我想检索实际的底层包装字典时,我可以这样做:

    let finalResult = particles.dictionary
    
  • 当我们这样做的时候,由于我们可能想跟踪需要同步的错误数组,我可能会添加一个数组等效类型:

    class SynchronizedArray<Value> {
    private var _array: [Value]
    private let queue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".array", qos: .userInitiated, attributes: .concurrent)
    init(_ dictionary: [Value] = []) {
    _array = dictionary
    }
    var array: [Value] {
    queue.sync { _array }
    }
    subscript(index: Int) -> Value {
    get { queue.sync                   { _array[index] } }
    set { queue.async(flags: .barrier) { self._array[index] = newValue } }
    }
    func append(_ value: Value) {
    queue.async(flags: .barrier) {
    self._array.append(value)
    }
    }
    func removeAll() {
    queue.async(flags: .barrier) {
    self._array.removeAll()
    }
    }
    }
    
  • 我们现在可以把注意力转向主要的程序。因此,与其返回一个值,不如给它一个@escaping完成处理程序。而且,如上所述,我们将取消不必要的调度组:

    func calculateAllClosestParticles(completion: @escaping ([String: CVParticle], [Error]) -> Void) {
    let queue = DispatchQueue(label: "blarg", qos: .userInitiated, attributes: .concurrent)
    let particles = SynchronizedDictionary(dataIDToSRLParticleDictionary())
    let errors = SynchronizedArray<Error>()
    queue.async {
    DispatchQueue.concurrentPerform(iterations: self.dataArray.count) { index in
    let data = self.dataArray[index]
    let result = data.runPartcleFilterForClosestParticleAndMaybeStopAudio()
    switch result {
    case .success(let cvParticle):
    // If there was a CVP found, add it to the set.
    if let cvParticle = cvParticle {
    particles[data.ID] = cvParticle
    }
    case .failure(let error):
    errors.append(error)
    }
    }
    DispatchQueue.main.async {
    completion(particles.dictionary, errors.array)
    }
    }
    }
    

    现在,我不知道字典的正确类型是什么,所以您可能需要调整completion的参数。你没有提供其余的程序,所以我可能在这里有一些细节错误。但不要迷失在细节中,只需注意小心避免concurrentPerform中的属性,并将结果传递回完成处理程序。

    你可以这样称呼它:

    calculateAllClosestParticles { dictionary, errors in
    guard errors.isEmpty else { return }
    // you can access the dictionary and updating the model and UI here
    self.someProperty = dictionary
    self.tableView.reloadData()
    }
    // but don't try to access the dictionary here, because the asynchronous code hasn't finished yet
    //
    
  • FWIW,虽然我使用了您在示例中使用的读写器模式,但根据我的经验,NSLock实际上更适合快速同步,尤其是当您使用concurrentPerform时,它可能会占用CPU上的所有核心,例如

    class SynchronizedDictionary<Key: Hashable, Value> {
    private var _dictionary: [Key: Value]
    private let lock = NSLock()
    init(_ dictionary: [Key: Value] = [:]) {
    _dictionary = dictionary
    }
    var dictionary: [Key: Value] {
    lock.synchronized { _dictionary }
    }
    subscript(key: Key) -> Value? {
    get { lock.synchronized { _dictionary[key] } }
    set { lock.synchronized { _dictionary[key] = newValue } }
    }
    func removeAll() {
    lock.synchronized {
    _dictionary.removeAll()
    }
    }
    }
    

    何处

    extension NSLocking {
    func synchronized<T>(_ closure: () throws -> T) rethrows -> T {
    lock()
    defer { unlock() }
    return try closure()
    }
    }
    

    最重要的是,如果不需要的话,你不想强制上下文切换进行同步

  • 在执行并发执行时,如果有许多dataPoints,并且每次调用runPartcleFilterForClosestParticleAndMaybeStopAudio所需的时间适中,则可能需要考虑"跨步",在每次迭代中执行多个数据点。这超出了这个问题的范围,但仅供参考。

不太确定我做了什么,但我移动了

resultDictionary = self.dataIDsToCVPDictionary.getDictionary

在第一个异步块之外,并且似乎允许为函数返回保留/保留数据。

相关内容

  • 没有找到相关文章

最新更新