import Dispatch
class SynchronizedArray<T> {
private var array: [T] = []
private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent)
var get: [T] {
accessQueue.sync {
array
}
}
func append(newElement: T) {
accessQueue.async(flags: .barrier) {
self.array.append(newElement)
}
}
}
如果我运行以下代码,即使我同时读取,10000个元素也会按预期附加到数组中:
DispatchQueue.concurrentPerform(iterations: 10000) { i in
_ threadSafeArray.get
threadSafeArray.append(newElement: i)
}
但当我这样做的时候,只有它从未接近于添加10000个元素(上次我运行它时,我的计算机上只添加了92个元素(。
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
concurrent.async {
_ = threadSafeArray.get
threadSafeArray.append(newElement: i)
}
}
为什么前者有效,为什么后者无效?
很高兴您找到了线程爆炸的解决方案。请参阅关于线程爆炸的讨论WWDC 2015用GCD构建响应性和高效的应用程序,以及Swift 3中的WWDC 2016用GCD并发编程。
话虽如此,如今,在存在concurrentPerform
(或OperationQueue
及其maxConcurrentOperationCount
或Combine及其maxPublishers
(的情况下,DispatchSemaphore
有点反模式。所有这些都比调度信号量更优雅地管理并发程度。
所有这些都已经说过了,对你的信号量模式的一些观察:
-
使用此
DispatchSemaphore
模式时,通常将wait
放在concurrent.async { ... }
之前(因为,正如所写的,您得到的是九个并发操作,而不是八个,这有点误导(。 -
这里更深层次的问题是,你已经减少了计数问题,但它仍然存在。考虑:
let threadSafeArray = SynchronizedArray<Int>() let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent) let semaphore = DispatchSemaphore(value: 8) for i in 0..<10000 { semaphore.wait() concurrent.async { threadSafeArray.append(newElement: i) semaphore.signal() } } print(threadSafeArray.get.count)
当您离开
for
循环时,您仍然可以在concurrent
上有多达八个异步任务仍在运行,并且count
(相对于concurrent
队列未同步(仍然可以小于10000。您必须添加另一个concurrent.async(flags: .barrier) { ... }
,这只是添加了第二层同步。例如let semaphore = DispatchSemaphore(value: 8) for i in 0..<10000 { semaphore.wait() concurrent.async { threadSafeArray.append(newElement: i) semaphore.signal() } } concurrent.async(flags: .barrier) { print(threadSafeArray.get.count) }
或者,您可以使用
DispatchGroup
,这是用于确定一系列异步调度块何时完成的经典机制:let semaphore = DispatchSemaphore(value: 8) let group = DispatchGroup() for i in 0..<10000 { semaphore.wait() concurrent.async(group: group) { threadSafeArray.append(newElement: i) semaphore.signal() } } group.notify(queue: .main) { print(threadSafeArray.get.count) }
concurrentPerform
的使用消除了对这两种模式的任何一种的需要,因为在所有并发任务完成之前,它不会继续执行。(它还将根据设备上的核心数量自动优化并发程度。( -
FWIW,
SynchronizedArray
的一个更好的替代方案是根本不公开底层数组,只实现任何您想要公开的方法,集成必要的同步。它使呼叫站点更干净,并解决了许多问题。例如,假设您想公开下标运算符和
count
变量,您可以执行以下操作:class SynchronizedArray<T> { private var array: [T] private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent) init(_ array: [T] = []) { self.array = array } subscript(index: Int) -> T { get { reader { $0[index] } } set { writer { $0[index] = newValue } } } var count: Int { reader { $0.count } } func append(newElement: T) { writer { $0.append(newElement) } } func reader<U>(_ block: ([T]) throws -> U) rethrows -> U { try accessQueue.sync { try block(array) } } func writer(_ block: @escaping (inout [T]) -> Void) { accessQueue.async(flags: .barrier) { block(&self.array) } } }
这解决了各种问题。例如,您现在可以执行以下操作:
print(threadSafeArray.count) // get the count print(threadSafeArray[500]) // get the 500th item
你现在也可以做一些事情,比如:
let average = threadSafeArray.reader { array -> Double in let sum = array.reduce(0, +) return Double(sum) / Double(array.count) }
但是,归根结底,在处理集合(或任何可变对象(时,您总是不想公开可变对象本身,而是为常见操作(下标、
count
、removeAll
等(编写自己的同步方法,并可能为应用程序开发人员可能需要更广泛同步机制的情况公开读取器/写入器接口。(FWIW,对
SynchronizedArray
的更改同时适用于信号量或concurrentPerform
场景;只是信号量恰好在这种情况下显示了问题。( -
不用说,通常在每个线程上也会有更多的工作要做,因为尽管上下文切换开销不大,但在这里它可能足以抵消从并行处理中获得的任何优势。(但我知道这可能只是一个问题的概念演示,而不是一个拟议的实施。(只是给未来读者的一个参考。
我似乎遇到了线程爆炸,因为创建了82个线程,应用程序的线程用完了,我使用的解决方案是一个信号量来限制线程数量:
let semaphore = DispatchSemaphore(value: 8)
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
concurrent.async {
_ = threadSafeArray.get
threadSafeArray.append(newElement: i)
semaphore.signal()
}
semaphore.wait()
}
编辑:Rob的回答解释了上述代码的一些问题