如何在队列中实现并发



我正在尝试实现一个应该同步的队列。要求是:

  1. 如果队列为空并且有新数据出现,则应将数据插入队列中,并且只要队列不变为空,就应开始数据处理。
  2. 如果 Queue 不为空,则表示队列中的数据仍在处理中。因此,如果出现一些新数据,则应将数据添加到队列中。

这是我的实现:

class QueueManager {
private var jobsQueue: LinkedList<String> = LinkedList()
fun handleIncomingJob() {
if (isEmpty()) {
addJ(jobList)
start()
} else {
addJ(jobList)
}
}
private fun isEmpty(): Boolean {
synchronized(this@QueueManager) {
val ret = jobsQueue.isEmpty()
return ret
}
}
private fun addJ(jobList: Array<SAPJob>) {
synchronized(this@QueueManager) {
jobsQueue.add(jobList)
}
}
private fun remove(): Array<SAPJob> {
synchronized(this@QueueManager) {
return jobsQueue.remove()
}
}
fun start() {
while (!isEmpty()) {
Thread.sleep(10000)
remove()
}
}
}

但是上面的代码不起作用。 当多个线程同时调用handleIncomingJob方法并且所有线程都发现队列为空时,它会失败。根据我的要求,只有第一个线程应该发现队列为空,向其添加数据并开始处理.并且所有其他线程应该发现,因为第一个线程将项目放入队列中,因此它不为空,因此他们应该将数据放入队列并退出。 但是根据上面的实现,所有线程都发现队列为空。我不知道是什么问题。我已经将所有队列方法同步。仍然不起作用

您的同步是多个函数的本地同步,但其中两个或多个函数构成了一个关键部分。换句话说,您不会在函数调用之间保持锁,这意味着线程可以读取不一致的状态。例如,processIncomingJob()函数中可能发生以下情况:

  1. 线程A调用isEmpty(),结果是true
  2. 线程A移动到if块中。
  3. 在线程A可以调用另一个线程之前addJ(...)B调用isEmpty(),并且由于线程A尚未实际添加作业,因此结果也是true
  4. 线程B移动到if块中。
  5. 最终,线程A和线程B都调用start()函数,从而导致您的问题。

使代码线程安全时,必须将整个工作单元封装在同步上下文中。"工作单元"的实际包含内容完全取决于代码的业务逻辑。请注意,您的start()函数也遇到了同样的问题。

您的代码示例还存在其他问题:

  • 您的jobsQueue被定义为LinkedList<String>但在其他地方似乎被视为LinkedList<Array<SAPJob>>
  • 什么是SAPJob
  • handleIncomingJob()的背景下,什么是jobList
  • start()里面,你打电话给remove()但对结果什么都不做。
  • 为什么start()公开?

由于这些问题,要完全理解您希望在代码中发生什么并不容易。但是,根据您对意图和问题的描述,我相信以下示例可以帮助您找到用例的解决方案:

import java.util.LinkedList
import java.util.Queue
typealias Job = () -> Unit
class JobProcessor {
private val lock = Any()
private val queue: Queue<Job> = LinkedList()
private var isExecuting = false
fun processJob(job: Job) {
val execute = synchronized(lock) {
queue.add(job)
!isExecuting.also { isExecuting = true }
}
if (execute) {
executeQueuedJobs()
}
}
private fun executeQueuedJobs() {
var keepExecuting: Boolean
do {
val job = synchronized(lock) { queue.remove() }
try {
job()
} catch (ex: Exception) {
Thread.currentThread().uncaughtExceptionHandler
.uncaughtException(Thread.currentThread(), ex)
}
keepExecuting = synchronized(lock) {
queue.isNotEmpty().also { isExecuting = it }
}
} while (keepExecuting)
}
}

将作业添加到队列时,添加作业的线程将仅执行作业,而当前没有其他线程正在执行以前的作业。执行作业的线程将继续这样做,直到当前作业完成后队列为空。请注意,在不保持锁定的情况下执行作业,以便允许其他作业在一个作业执行时排队。isExecuting的目的是处理线程正在执行作业但队列为空的情况。

最新更新