How can I wait until all threads are done, if every thread needs to resume work whenever any thread finds some new information?



A seemingly simple synchronization problem

TL;DR

Several threads depend on each other. Whenever one of them finds some new information, all of them need to process it. How can I determine that all threads are ready?

Background

I have (almost) finished parallelizing a function Foo(input) that solves a problem which is known to be P-complete and can be thought of as some kind of search. Unsurprisingly, so far nobody has managed to exploit parallelism beyond two threads for this problem. However, I have a promising idea and managed to implement it completely, except for this seemingly simple problem.

Information is exchanged between the threads implicitly via a shared database g of some graph-like type G, so that the threads immediately have all information and do not actually need to notify each other explicitly. More precisely, every time a thread finds a piece of information i, it calls a thread-safe function g.addInformation(i), which, among other things, enqueues i at the end of some array. One aspect of my new implementation is that threads can use i during their search even before i has been enqueued at the end of that array. However, every thread additionally needs to process i after it has been enqueued in that array. Enqueueing i may happen after the thread that added i has returned from g.addInformation(i), because another thread may take over the responsibility of enqueueing i.

Each thread s calls a function s.processAllInformation() in order to process all information in that array of g, in order. A call to s.processAllInformation() is a no-op, i.e. does nothing, if that thread has already processed all information or there is no (new) information.
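For illustration only, here is a minimal sketch of how such a shared append-only array and a per-worker cursor might look. The names infoDB, items and next, as well as the mutex-based locking, are my own placeholders and assumptions, not the actual implementation (which also lets threads use information before it is enqueued):

// Hypothetical sketch only: a shared append-only slice guarded by a mutex,
// plus a per-worker cursor into it. Names and locking strategy are assumptions.
import "sync"

type infoType int // placeholder for the real information type

type infoDB struct {
    mu    sync.Mutex
    items []infoType // the array every thread eventually has to process
}

// addInformation enqueues i so that every worker will eventually see it.
func (db *infoDB) addInformation(i infoType) {
    db.mu.Lock()
    db.items = append(db.items, i)
    db.mu.Unlock()
}

type cursor struct {
    db   *infoDB
    next int // index of the first entry this worker has not processed yet
}

// processAllInformation is a no-op if this worker has already seen everything.
func (c *cursor) processAllInformation() {
    for {
        c.db.mu.Lock()
        if c.next >= len(c.db.items) {
            c.db.mu.Unlock()
            return // nothing new to do
        }
        i := c.db.items[c.next]
        c.next++
        c.db.mu.Unlock()
        _ = i // process i here
    }
}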

As soon as a thread has processed all information, it should wait for all other threads to finish. If any other thread finds a new piece of information i, it should resume work. That is, every time some thread calls g.addInformation(i), all threads that had already finished processing all previously known information need to resume their work and process that (and any other) newly added information.

My problem

Every solution I could think of does not work and suffers from a variation of the same problem: one thread finishes processing all information and then sees that all other threads are ready, too. Hence, this thread leaves. But then another thread notices that some new information has been added, resumes work and finds new information. The thread that has already left then never processes the new information.
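Schematically, the failing pattern looks roughly like this (pseudocode-style Go, only to illustrate the interleaving; the helper names are the ones used in the code below):

// Schematic of the lost-wakeup race described above (not actual code).
// Thread A:
for {
    s.processAllInformation()
    if s.allThreadsReady() { // A observes "everybody is ready" ...
        return // ... and leaves for good.
    }
}
// Meanwhile, thread B notices information added just after A's check,
// resumes work and adds even more information via g.addInformation(i).
// A has already returned and never processes it.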

A solution to this problem is probably quite simple, but I cannot think of one. Ideally, a solution should not rely on time-consuming operations during the call to g.addInformation(i) whenever new information is found, because of how many times per second this is expected to happen (1 or 2 million times per second, see below).

More background

In my initial sequential application, the function Foo(input) is called roughly 100k times per second on modern hardware, and my application spends 80% to 90% of its time executing Foo(input). Actually, all calls to Foo(input) depend on each other, and we search iteratively for something in a very large space. Solving a reasonably sized problem typically takes about one or two hours with the sequential version of the application.

Every time Foo(input) is called, zero to several hundred new pieces of information are found. On average, during the execution of my application, 1 or 2 million pieces of information are found per second, i.e. we find 10 to 20 new pieces of information per call to Foo(input). All of these statistics probably have a very high standard deviation (which I haven't measured yet, though).

Currently I am writing a prototype of the parallel version of Foo(input) in Go, and I would prefer answers in Go. The sequential application is written in C (actually it is C++, but it is written like a program in C), so answers in C or C++ (or pseudocode) are no problem. I haven't benchmarked my prototype yet, because wrong code is so much slower than slow code.

Code

This code example is meant for clarification. Since I haven't solved the problem, feel free to consider any changes to the code. (I also appreciate helpful remarks that are unrelated to the problem.)

The global situation

We have some type G, and Foo() is a method of G. If g is an object of type G and g.Foo(input) is called, g creates workers s[1], ..., s[g.numThreads] that receive a pointer to g, so that these workers can access g's member variables and are able to call g.addInformation(i) whenever they find a new piece of information. Then, the method FooInParallel() is called in parallel for each of the workers s[j].

type G struct {
    s          []worker
    numThreads int
    // some data that the workers need access to
}

func (g *G) initializeWith(input InputType) {
    // Some code...
}

func (g *G) Foo(input InputType) int {
    // Initialize data structures:
    g.initializeWith(input)
    // Initialize workers:
    g.s = make([]worker, g.numThreads)
    for j := range g.s {
        g.s[j] = newWorker(g) // workers get a pointer to g
    }
    // Note: This wait group doesn't solve the problem. See remark below.
    wg := new(sync.WaitGroup)
    wg.Add(g.numThreads)

    // Actual computation in parallel:
    for j := 0; j < g.numThreads-1; j++ {
        // Start g.numThreads - 1 go-routines in parallel.
        go g.s[j].FooInParallel(wg)
    }
    // The last worker runs in this go-routine, so that we have
    // g.numThreads go-routines in total.
    g.s[g.numThreads-1].FooInParallel(wg)
    wg.Wait()
    // ... (return value omitted in this sketch)
}

// This function is thread-safe in so far as several
// workers can concurrently add information.
//
// The function is optimized for heavy contention; most
// threads can leave almost immediately. One thread
// cleans up any mess they leave behind (and even in
// bad cases that is not too much).
func (g *G) addInformation(i infoType) {
    // Step 1: Make the information available to all threads.
    // Step 2: Enqueue the information at the end of some array.
    // Step 3: Possibly, call g.notifyAll().
}

// If a new piece of information has been added, we must ensure
// that every thread that had finished resumes work
// and processes any newly added information.
func (g *G) notifyAll() {
    // TODO:
    // This is what I fail to accomplish. I include
    // my most successful attempt in the corresponding
    // section. It doesn't work, though.
}

// If a thread has finished processing all information,
// it must ensure that all threads have finished and
// that no new information has been added since.
func (g *G) allThreadsReady() bool {
    // TODO:
    // This is what I fail to accomplish. I include
    // my most successful attempt in the corresponding
    // section. It doesn't work, though.
}

Remark: The only purpose of the wait group is to ensure that Foo(input) is not called again before the last worker has returned. However, you can completely ignore this.

The local situation

Each worker contains a pointer to the global data structure and searches either for a treasure or for new information until it has processed all information that has been enqueued by this or any other thread. If it finds a new piece of information i, it calls the function g.addInformation(i) and continues its search. If it finds a treasure, it sends the treasure through a channel it has received as an argument and returns. If all threads are ready with processing all information, each of them can send a dummy treasure to the channel and return. However, determining whether all threads are ready is exactly my problem.

type worker struct {
    // Each worker contains a pointer to g
    // such that it has access to its member
    // variables and is able to call the
    // function g.addInformation(i) as soon
    // as it finds some information i.
    g *G
    // Also contains some other stuff.
}

func (s *worker) FooInParallel(wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        _ = s.processAllInformation() // the possible treasure is ignored in this sketch

        // The following is the problem. Feel free to make any
        // changes to the following block.
        s.notifyAll()
        for !s.needsToResumeWork() {
            if s.allThreadsReady() {
                return
            }
        }
    }
}

func (s *worker) notifyAll() {
    // TODO:
    // This is what I fail to accomplish. I include
    // my most successful attempt in the corresponding
    // section. It doesn't work, though.
    // An example:
    // Step 1: Possibly, do something else first.
    // Step 2: Call g.notifyAll().
}

func (s *worker) needsToResumeWork() bool {
    // TODO:
    // This is what I fail to accomplish. I include
    // my most successful attempt in the corresponding
    // section. It doesn't work, though.
}

func (s *worker) allThreadsReady() bool {
    // TODO:
    // This is what I fail to accomplish. I include
    // my most successful attempt in the corresponding
    // section. It doesn't work, though.
    // If all threads are ready, return true.
    // Otherwise, return false.
    // Alternatively, spin as long as no new information
    // has been added, and return false as soon as some
    // new information has been added, or true if no new
    // information has been added and all other threads
    // are ready.
    //
    // However, this doesn't really matter, because a
    // function call to processAllInformation is cheap
    // if no new information is available.
}

// A call to this function is cheap if no new work has
// been added since the last function call.
func (s *worker) processAllInformation() treasureType {
    // Access member variables of g and search
    // for information or treasures.
    // If a new piece of information i is found, call the
    // function g.addInformation(i).
    // Return once all information that has been enqueued to
    // g has been processed by this thread.
}
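The sketch above omits the treasure channel mentioned in the prose. Here is a hedged variant of the worker loop showing how it might be threaded through; the parameter name treasures and the helpers isRealTreasure and dummyTreasure are assumptions of mine, not part of the actual code:

// Hypothetical variant carrying the treasure channel described above.
// The parameter `treasures` and the helpers isRealTreasure/dummyTreasure
// are assumptions, not part of the actual code.
func (s *worker) fooInParallelWithChannel(wg *sync.WaitGroup, treasures chan<- treasureType) {
    defer wg.Done()
    for {
        t := s.processAllInformation()
        if isRealTreasure(t) { // assumed helper: did the search succeed?
            treasures <- t // report the treasure and stop
            return
        }
        s.notifyAll()
        for !s.needsToResumeWork() {
            if s.allThreadsReady() {
                treasures <- dummyTreasure() // assumed helper: a sentinel value
                return
            }
        }
    }
}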

My best attempt to solve the problem

Well, by now I am tired, so I might need to double-check my solution later. However, even the attempts I believed to be correct do not work. So, in order to give you an idea of what I have been trying so far (among many other things), I share it immediately.

I tried the following. Each worker contains a member variable needsToResumeWorkFlag that is atomically set to 1 whenever new information is added. Setting this member variable to 1 several times does no harm; what is important is that the thread resumes work after the last piece of information has been added.

In order to reduce the work of the thread calling g.addInformation(i) whenever a piece of information i is found, the thread that enqueues the information (which is not necessarily the thread that called g.addInformation(i)) afterwards sets the member variable notifyAllFlag of g to 1, indicating that all threads need to be notified about the latest information.

Whenever a thread has finished processing all enqueued information, it calls the function g.notifyAll(), which checks whether the member variable notifyAllFlag is set to 1. If so, it tries to atomically compare g.allInformedFlag with 1 and swap it to 0. If it could not write g.allInformedFlag, it assumes that some other thread has taken over the responsibility of informing all threads. If this operation succeeds, this thread has taken over the responsibility of notifying all threads, and it proceeds to do so by setting the member variable needsToResumeWorkFlag to 1 for every thread. Afterwards, it atomically sets g.numThreadsReady and g.notifyAllFlag to zero, and g.allInformedFlag to 1.

type G struct {
    s               []worker
    numThreads      int
    numThreadsReady *uint32 // initialize to 0 somewhere appropriate
    notifyAllFlag   *uint32 // initialize to 0 somewhere appropriate
    allInformedFlag *uint32 // initialize to 1 somewhere appropriate (1 is not a typo)
    // some data that the workers need access to
}

// This function is thread-safe in so far as several
// workers can concurrently add information.
//
// The function is optimized for heavy contention; most
// threads can leave almost immediately. One thread
// cleans up any mess they leave behind (and even in
// bad cases that is not too much).
func (g *G) addInformation(i infoType) {
    // Step 1: Make the information available to all threads.
    // Step 2: Enqueue the information at the end of some array.
    // Since the responsibility to enqueue a piece of information may
    // be passed to another thread, it is important that the
    // last step is executed by the thread which enqueues the
    // information(s), in order to ensure that the information
    // has successfully been enqueued.
    // Step 3:
    atomic.StoreUint32(g.notifyAllFlag, 1) // all threads need to be notified
}

// If a new piece of information has been added, we must ensure
// that every thread that had finished resumes work
// and processes any newly added information.
func (g *G) notifyAll() {
    if atomic.LoadUint32(g.notifyAllFlag) == 1 {
        // Somebody needs to notify all threads.
        if atomic.CompareAndSwapUint32(g.allInformedFlag, 1, 0) {
            // This thread has taken over the responsibility to inform
            // all other threads. All threads are hindered from accessing
            // their member variable s.needsToResumeWorkFlag.
            for j := range g.s {
                atomic.StoreUint32(g.s[j].needsToResumeWorkFlag, 1)
            }
            atomic.StoreUint32(g.notifyAllFlag, 0)
            atomic.StoreUint32(g.numThreadsReady, 0)
            atomic.StoreUint32(g.allInformedFlag, 1)
        } else {
            // Some other thread has taken responsibility to inform
            // all threads.
        }
    }
}

Whenever a thread has finished processing all enqueued information, it checks whether it needs to resume work by atomically comparing its member variable needsToResumeWorkFlag with 1 and swapping it to 0. However, since one of the threads takes over the responsibility of notifying all other threads, it cannot do so immediately.

First, it has to call the function g.notifyAll(), and then it has to check whether the latest thread to call g.notifyAll() has finished notifying all threads. Hence, after calling g.notifyAll(), it has to spin until g.allInformedFlag is 1, before it checks whether its member variable s.needsToResumeWorkFlag is 1, in which case it atomically sets it to 0 and resumes work. (I suppose there is a bug here, but I also tried several other things at this point without success.) If s.needsToResumeWorkFlag is already zero, it atomically increments g.numThreadsReady by 1, if it hasn't done so before. (Recall that g.numThreadsReady is reset during the function call g.notifyAll().) Then it atomically checks whether g.numThreadsReady is equal to g.numThreads, in which case it can leave (after sending a dummy treasure to the channel). Otherwise, we start over again, until this thread has either been notified (possibly by itself) or all threads are ready.

type worker struct {
    // Each worker contains a pointer to g
    // such that it has access to its member
    // variables and is able to call the
    // function g.addInformation(i) as soon
    // as it finds some information i.
    g *G
    // If new work has been added, the thread
    // is notified by setting the uint32
    // at which needsToResumeWorkFlag points to 1.
    needsToResumeWorkFlag *uint32 // initialize to 0 somewhere appropriate
    // Also contains some other stuff.
}

func (s *worker) FooInParallel(wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        _ = s.processAllInformation() // the possible treasure is ignored in this sketch
        numReadyIncremented := false
        for !s.needsToResumeWork() {
            if !numReadyIncremented {
                atomic.AddUint32(s.g.numThreadsReady, 1)
                numReadyIncremented = true
            }
            if s.g.allThreadsReady() {
                return
            }
        }
    }
}

func (s *worker) needsToResumeWork() bool {
    s.notifyAll()
    for {
        if atomic.LoadUint32(s.g.allInformedFlag) == 1 {
            return atomic.CompareAndSwapUint32(s.needsToResumeWorkFlag, 1, 0)
        }
    }
}

func (s *worker) notifyAll() {
    s.g.notifyAll()
}

func (g *G) allThreadsReady() bool {
    return atomic.LoadUint32(g.numThreadsReady) == uint32(g.numThreads)
}

As mentioned before, my solution does not work.

I found a solution myself. We exploit the fact that a call to s.processAllInformation() does nothing if no new information has been added (and that it is cheap in that case). The trick is to use an atomic variable as a lock around both letting a thread notify all threads, if necessary, and checking whether it has been notified itself. A thread that cannot acquire the lock simply calls s.processAllInformation() again. A thread then uses the notification to check whether it has to increment the counter of ready threads, rather than to see whether it needs to go back to work.

The global situation

type G struct {
    s               []worker
    numThreads      int
    numThreadsReady *uint32 // initialize to 0 somewhere appropriate
    notifyAllFlag   *uint32 // initialize to 0 somewhere appropriate
    allCanGoFlag    *uint32 // initialize to 0 somewhere appropriate
    lock            *uint32 // initialize to 0 somewhere appropriate
    // some data that the workers need access to
}

// This function is thread-safe in so far as several
// workers can concurrently add information.
//
// The function is optimized for heavy contention; most
// threads can leave almost immediately. One thread
// cleans up any mess they leave behind (and even in
// bad cases that is not too much).
func (g *G) addInformation(i infoType) {
    // Step 1: Make the information available to all threads.
    // Step 2: Enqueue the information at the end of some array.
    // Since the responsibility to enqueue a piece of information may
    // be passed to another thread, it is important that the
    // last step is executed by the thread which enqueues the
    // information(s), in order to ensure that the information
    // has successfully been enqueued.
    // Step 3:
    atomic.StoreUint32(g.notifyAllFlag, 1) // all threads need to be notified
}

// If a new piece of information has been added, we must ensure
// that every thread that had finished resumes work
// and processes any newly added information.
//
// This function is not thread-safe. Make sure not to
// have several threads call this function concurrently
// if these calls are not guarded by some lock.
func (g *G) notifyAll() {
    if atomic.LoadUint32(g.notifyAllFlag) == 1 {
        for j := range g.s {
            atomic.StoreUint32(g.s[j].needsToResumeWorkFlag, 1)
        }
        atomic.StoreUint32(g.notifyAllFlag, 0)
        atomic.StoreUint32(g.numThreadsReady, 0)
    }
}

The local situation

type worker struct {
    // Each worker contains a pointer to g
    // such that it has access to its member
    // variables and is able to call the
    // function g.addInformation(i) as soon
    // as it finds some information i.
    g *G
    // If new work has been added, the thread
    // is notified by setting the uint32
    // at which needsToResumeWorkFlag points to 1.
    needsToResumeWorkFlag   *uint32 // initialize to 0 somewhere appropriate
    incrementedNumReadyFlag *uint32 // initialize to 0 somewhere appropriate
    // Also contains some other stuff.
}

func (s *worker) FooInParallel(wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        _ = s.processAllInformation() // the possible treasure is ignored in this sketch
        if atomic.LoadUint32(s.g.allCanGoFlag) == 1 {
            return
        }
        if atomic.CompareAndSwapUint32(s.g.lock, 0, 1) { // If possible, lock.
            s.g.notifyAll() // It is essential that this is also guarded by the lock.
            if atomic.LoadUint32(s.needsToResumeWorkFlag) == 1 {
                atomic.StoreUint32(s.needsToResumeWorkFlag, 0)
                // Some new information was found, and this thread can't be sure
                // whether it has already processed it. Since the counter for
                // how many threads are ready has been reset, we must increment
                // that counter after the next call to processAllInformation() in the
                // following iteration.
                atomic.StoreUint32(s.incrementedNumReadyFlag, 0)
            } else {
                // Increment the number of ready threads by one, if this thread had not
                // done so before (since the last newly found information).
                if atomic.CompareAndSwapUint32(s.incrementedNumReadyFlag, 0, 1) {
                    atomic.AddUint32(s.g.numThreadsReady, 1)
                }
                // If all threads are ready, give them all a signal.
                if atomic.LoadUint32(s.g.numThreadsReady) == uint32(s.g.numThreads) {
                    atomic.StoreUint32(s.g.allCanGoFlag, 1)
                }
            }
            atomic.StoreUint32(s.g.lock, 0) // Unlock.
        }
    }
}

Later I may add some ordering for the threads' access to the lock in case of heavy contention, but for now this will do.
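One option for that ordering (an idea of mine, not something I have implemented) would be a simple ticket lock built from two atomic counters, so that threads enter the critical section in FIFO order instead of racing on a single compare-and-swap:

// Minimal ticket-lock sketch (an assumption on my part, not part of the
// solution above): threads take a ticket and spin until it is their turn,
// which yields FIFO ordering under heavy contention.
import (
    "runtime"
    "sync/atomic"
)

type ticketLock struct {
    next  uint32 // next ticket to hand out
    owner uint32 // ticket that currently owns the lock
}

func (l *ticketLock) Lock() {
    t := atomic.AddUint32(&l.next, 1) - 1 // take a ticket
    for atomic.LoadUint32(&l.owner) != t {
        runtime.Gosched() // yield while waiting for our turn
    }
}

func (l *ticketLock) Unlock() {
    atomic.AddUint32(&l.owner, 1) // hand the lock to the next ticket
}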
